gophernotes/gophernotes.go

212 lines
6.0 KiB
Go
Raw Permalink Normal View History

package main
import (
2016-01-23 13:48:33 -06:00
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
2016-07-24 06:15:04 -05:00
2016-07-29 06:10:43 -05:00
zmq "github.com/alecthomas/gozmq"
2016-07-29 07:11:58 -05:00
"github.com/pkg/errors"
)
var logger *log.Logger
2016-01-23 13:48:33 -06:00
// ConnectionInfo stores the contents of the kernel connection file created by Jupyter.
type ConnectionInfo struct {
2016-07-25 06:15:06 -05:00
SignatureScheme string `json:"signature_scheme"`
Transport string `json:"transport"`
StdinPort int `json:"stdin_port"`
ControlPort int `json:"control_port"`
IOPubPort int `json:"iopub_port"`
HBPort int `json:"hb_port"`
ShellPort int `json:"shell_port"`
Key string `json:"key"`
IP string `json:"ip"`
}
2016-01-23 13:48:33 -06:00
// SocketGroup holds the sockets needed to communicate with the kernel, and
// the key for message signing.
type SocketGroup struct {
2016-07-24 06:15:04 -05:00
ShellSocket *zmq.Socket
ControlSocket *zmq.Socket
StdinSocket *zmq.Socket
IOPubSocket *zmq.Socket
Key []byte
}
// PrepareSockets sets up the ZMQ sockets through which the kernel will communicate.
2016-07-29 07:11:58 -05:00
func PrepareSockets(connInfo ConnectionInfo) (SocketGroup, error) {
2016-01-23 13:48:33 -06:00
2016-07-29 07:11:58 -05:00
// Initialize the Socket Group.
context, sg, err := createSockets()
if err != nil {
return sg, errors.Wrap(err, "Could not initialize context and Socket Group")
}
2016-01-23 13:48:33 -06:00
2016-07-29 07:11:58 -05:00
// Bind the sockets.
2016-07-24 06:15:04 -05:00
address := fmt.Sprintf("%v://%v:%%v", connInfo.Transport, connInfo.IP)
sg.ShellSocket.Bind(fmt.Sprintf(address, connInfo.ShellPort))
sg.ControlSocket.Bind(fmt.Sprintf(address, connInfo.ControlPort))
sg.StdinSocket.Bind(fmt.Sprintf(address, connInfo.StdinPort))
sg.IOPubSocket.Bind(fmt.Sprintf(address, connInfo.IOPubPort))
2016-01-23 13:48:33 -06:00
// Message signing key
2016-07-24 06:15:04 -05:00
sg.Key = []byte(connInfo.Key)
2016-01-23 13:48:33 -06:00
// Start the heartbeat device
2016-07-29 07:11:58 -05:00
HBSocket, err := context.NewSocket(zmq.REP)
if err != nil {
return sg, errors.Wrap(err, "Could not get the Heartbeat device socket")
}
2016-07-24 06:15:04 -05:00
HBSocket.Bind(fmt.Sprintf(address, connInfo.HBPort))
go zmq.Device(zmq.FORWARDER, HBSocket, HBSocket)
2016-01-23 13:48:33 -06:00
2016-07-29 07:11:58 -05:00
return sg, nil
}
// createSockets initializes the sockets for the socket group based on values from zmq.
func createSockets() (*zmq.Context, SocketGroup, error) {
context, err := zmq.NewContext()
if err != nil {
return context, SocketGroup{}, errors.Wrap(err, "Could not create zmq Context")
}
var sg SocketGroup
sg.ShellSocket, err = context.NewSocket(zmq.ROUTER)
if err != nil {
return context, sg, errors.Wrap(err, "Could not get Shell Socket")
}
sg.ControlSocket, err = context.NewSocket(zmq.ROUTER)
if err != nil {
return context, sg, errors.Wrap(err, "Could not get Control Socket")
}
sg.StdinSocket, err = context.NewSocket(zmq.ROUTER)
if err != nil {
return context, sg, errors.Wrap(err, "Could not get Stdin Socket")
}
sg.IOPubSocket, err = context.NewSocket(zmq.PUB)
if err != nil {
return context, sg, errors.Wrap(err, "Could not get IOPub Socket")
}
return context, sg, nil
}
// HandleShellMsg responds to a message on the shell ROUTER socket.
func HandleShellMsg(receipt MsgReceipt) {
2016-01-24 00:16:23 -06:00
switch receipt.Msg.Header.MsgType {
2016-01-23 13:48:33 -06:00
case "kernel_info_request":
SendKernelInfo(receipt)
case "execute_request":
HandleExecuteRequest(receipt)
case "shutdown_request":
HandleShutdownRequest(receipt)
default:
2016-01-24 00:16:23 -06:00
logger.Println("Unhandled shell message:", receipt.Msg.Header.MsgType)
2016-01-23 13:48:33 -06:00
}
}
// KernelInfo holds information about the igo kernel, for kernel_info_reply messages.
type KernelInfo struct {
2016-07-24 06:15:04 -05:00
ProtocolVersion []int `json:"protocol_version"`
Language string `json:"language"`
}
// KernelStatus holds a kernel state, for status broadcast messages.
type KernelStatus struct {
2016-01-23 13:48:33 -06:00
ExecutionState string `json:"execution_state"`
}
2016-01-23 13:48:33 -06:00
// SendKernelInfo sends a kernel_info_reply message.
func SendKernelInfo(receipt MsgReceipt) {
2016-01-23 13:48:33 -06:00
reply := NewMsg("kernel_info_reply", receipt.Msg)
reply.Content = KernelInfo{[]int{4, 0}, "go"}
2016-07-24 06:15:04 -05:00
receipt.SendResponse(receipt.Sockets.ShellSocket, reply)
}
2016-01-24 00:16:23 -06:00
// ShutdownReply encodes a boolean indication of stutdown/restart
type ShutdownReply struct {
2016-01-23 13:48:33 -06:00
Restart bool `json:"restart"`
}
2016-01-23 13:48:33 -06:00
// HandleShutdownRequest sends a "shutdown" message
func HandleShutdownRequest(receipt MsgReceipt) {
2016-01-23 13:48:33 -06:00
reply := NewMsg("shutdown_reply", receipt.Msg)
content := receipt.Msg.Content.(map[string]interface{})
restart := content["restart"].(bool)
reply.Content = ShutdownReply{restart}
2016-07-24 06:15:04 -05:00
receipt.SendResponse(receipt.Sockets.ShellSocket, reply)
2016-01-23 13:48:33 -06:00
logger.Println("Shutting down in response to shutdown_request")
os.Exit(0)
}
2016-07-29 16:52:27 -05:00
// RunKernel is the main entry point to start the kernel.
2016-07-24 06:15:04 -05:00
func RunKernel(connectionFile string, logwriter io.Writer) {
2016-01-23 13:48:33 -06:00
logger = log.New(logwriter, "gophernotes ", log.LstdFlags)
2016-07-29 16:52:27 -05:00
// Set up the "Session" with the replpkg.
2016-01-23 13:48:33 -06:00
SetupExecutionEnvironment()
2016-07-24 06:15:04 -05:00
var connInfo ConnectionInfo
bs, err := ioutil.ReadFile(connectionFile)
2016-01-23 13:48:33 -06:00
if err != nil {
log.Fatalln(err)
}
2016-07-29 07:11:58 -05:00
if err = json.Unmarshal(bs, &connInfo); err != nil {
2016-01-23 13:48:33 -06:00
log.Fatalln(err)
}
2016-07-24 06:15:04 -05:00
logger.Printf("%+v\n", connInfo)
2016-01-23 13:48:33 -06:00
2016-07-29 16:52:27 -05:00
// Set up the ZMQ sockets through which the kernel will communicate.
2016-07-29 07:11:58 -05:00
sockets, err := PrepareSockets(connInfo)
if err != nil {
log.Fatalln(err)
}
2016-01-23 13:48:33 -06:00
pi := zmq.PollItems{
2016-07-24 06:15:04 -05:00
zmq.PollItem{Socket: sockets.ShellSocket, Events: zmq.POLLIN},
zmq.PollItem{Socket: sockets.StdinSocket, Events: zmq.POLLIN},
zmq.PollItem{Socket: sockets.ControlSocket, Events: zmq.POLLIN},
2016-01-23 13:48:33 -06:00
}
2016-07-29 16:52:27 -05:00
// Start a message receiving loop.
2016-01-23 13:48:33 -06:00
var msgparts [][]byte
for {
2016-07-29 07:11:58 -05:00
if _, err = zmq.Poll(pi, -1); err != nil {
2016-01-23 13:48:33 -06:00
log.Fatalln(err)
}
switch {
case pi[0].REvents&zmq.POLLIN != 0: // shell socket
msgparts, _ = pi[0].Socket.RecvMultipart(0)
msg, ids, err := WireMsgToComposedMsg(msgparts, sockets.Key)
if err != nil {
2016-07-29 16:52:27 -05:00
log.Println(err)
2016-01-23 13:48:33 -06:00
return
}
HandleShellMsg(MsgReceipt{msg, ids, sockets})
case pi[1].REvents&zmq.POLLIN != 0: // stdin socket - not implemented.
pi[1].Socket.RecvMultipart(0)
case pi[2].REvents&zmq.POLLIN != 0: // control socket - treat like shell socket.
2016-07-29 16:52:27 -05:00
msgparts, err = pi[2].Socket.RecvMultipart(0)
if err != nil {
log.Println(err)
return
}
2016-01-23 13:48:33 -06:00
msg, ids, err := WireMsgToComposedMsg(msgparts, sockets.Key)
if err != nil {
2016-07-29 16:52:27 -05:00
log.Println(err)
2016-01-23 13:48:33 -06:00
return
}
HandleShellMsg(MsgReceipt{msg, ids, sockets})
}
}
}