package main import ( "bufio" "context" "encoding/json" "errors" "fmt" "io" "io/ioutil" "log" "os" "os/exec" "runtime" "strings" "sync" "time" "github.com/go-zeromq/zmq4" "github.com/cosmos72/gomacro/ast2" "github.com/cosmos72/gomacro/base" basereflect "github.com/cosmos72/gomacro/base/reflect" interp "github.com/cosmos72/gomacro/fast" mp "github.com/cosmos72/gomacro/go/parser" "github.com/cosmos72/gomacro/xreflect" // compile and link files generated in imports/ _ "github.com/gopherdata/gophernotes/imports" ) // ExecCounter is incremented each time we run user code in the notebook. var ExecCounter int // ConnectionInfo stores the contents of the kernel connection // file created by Jupyter. type ConnectionInfo struct { SignatureScheme string `json:"signature_scheme"` Transport string `json:"transport"` StdinPort int `json:"stdin_port"` ControlPort int `json:"control_port"` IOPubPort int `json:"iopub_port"` HBPort int `json:"hb_port"` ShellPort int `json:"shell_port"` Key string `json:"key"` IP string `json:"ip"` } // Socket wraps a zmq socket with a lock which should be used to control write access. type Socket struct { Socket zmq4.Socket Lock *sync.Mutex } // SocketGroup holds the sockets needed to communicate with the kernel, // and the key for message signing. type SocketGroup struct { ShellSocket Socket ControlSocket Socket StdinSocket Socket IOPubSocket Socket HBSocket Socket Key []byte } // KernelLanguageInfo holds information about the language that this kernel executes code in. type kernelLanguageInfo struct { Name string `json:"name"` Version string `json:"version"` MIMEType string `json:"mimetype"` FileExtension string `json:"file_extension"` PygmentsLexer string `json:"pygments_lexer"` CodeMirrorMode string `json:"codemirror_mode"` NBConvertExporter string `json:"nbconvert_exporter"` } // HelpLink stores data to be displayed in the help menu of the notebook. type helpLink struct { Text string `json:"text"` URL string `json:"url"` } // KernelInfo holds information about the igo kernel, for kernel_info_reply messages. type kernelInfo struct { ProtocolVersion string `json:"protocol_version"` Implementation string `json:"implementation"` ImplementationVersion string `json:"implementation_version"` LanguageInfo kernelLanguageInfo `json:"language_info"` Banner string `json:"banner"` HelpLinks []helpLink `json:"help_links"` } // shutdownReply encodes a boolean indication of shutdown/restart. type shutdownReply struct { Restart bool `json:"restart"` } // isCompleteReply holds information about the statement is complete or not, for is_complete_reply messages. type isCompleteReply struct { Status string `json:"status"` Indent string `json:"indent"` } const ( kernelStarting = "starting" kernelBusy = "busy" kernelIdle = "idle" ) // RunWithSocket invokes the `run` function after acquiring the `Socket.Lock` and releases the lock when done. func (s *Socket) RunWithSocket(run func(socket zmq4.Socket) error) error { s.Lock.Lock() defer s.Lock.Unlock() return run(s.Socket) } type Kernel struct { ir *interp.Interp display *interp.Import // map name -> HTMLer, JSONer, Renderer... // used to convert interpreted types to one of these interfaces render map[string]xreflect.Type } // runKernel is the main entry point to start the kernel. func runKernel(connectionFile string) { // Create a new interpreter for evaluating notebook code. ir := interp.New() // Throw out the error/warning messages that gomacro outputs writes to these streams. ir.Comp.Stdout = ioutil.Discard ir.Comp.Stderr = ioutil.Discard // Inject the "display" package to render HTML, JSON, PNG, JPEG, SVG... from interpreted code // maybe a dot-import is easier to use? display := importPackage(ir, "display", "display") // Inject the stub "Display" function. declare a variable // instead of a function, because we want to later change // its value to the closure that holds a reference to msgReceipt ir.DeclVar("Display", nil, stubDisplay) // Parse the connection info. var connInfo ConnectionInfo connData, err := ioutil.ReadFile(connectionFile) if err != nil { log.Fatal(err) } if err = json.Unmarshal(connData, &connInfo); err != nil { log.Fatal(err) } // Set up the ZMQ sockets through which the kernel will communicate. sockets, err := prepareSockets(connInfo) if err != nil { log.Fatal(err) } // TODO connect all channel handlers to a WaitGroup to ensure shutdown before returning from runKernel. // Start up the heartbeat handler. startHeartbeat(sockets.HBSocket, &sync.WaitGroup{}) // TODO gracefully shutdown the heartbeat handler on kernel shutdown by closing the chan returned by startHeartbeat. type msgType struct { Msg zmq4.Msg Err error } var ( shell = make(chan msgType) stdin = make(chan msgType) ctl = make(chan msgType) quit = make(chan int) ) defer close(quit) poll := func(msgs chan msgType, sck zmq4.Socket) { defer close(msgs) for { msg, err := sck.Recv() select { case msgs <- msgType{Msg: msg, Err: err}: case <-quit: return } } } go poll(shell, sockets.ShellSocket.Socket) go poll(stdin, sockets.StdinSocket.Socket) go poll(ctl, sockets.ControlSocket.Socket) kernel := Kernel{ ir, display, nil, } kernel.initRenderers() // Start a message receiving loop. for { select { case v := <-shell: // Handle shell messages. if v.Err != nil { log.Println(v.Err) continue } msg, ids, err := WireMsgToComposedMsg(v.Msg.Frames, sockets.Key) if err != nil { log.Println(err) return } kernel.handleShellMsg(msgReceipt{msg, ids, sockets}) case <-stdin: // TODO Handle stdin socket. continue case v := <-ctl: if v.Err != nil { log.Println(v.Err) return } msg, ids, err := WireMsgToComposedMsg(v.Msg.Frames, sockets.Key) if err != nil { log.Println(err) return } kernel.handleShellMsg(msgReceipt{msg, ids, sockets}) } } } func importPackage(ir *interp.Interp, path string, alias string) *interp.Import { packages, err := ir.ImportPackagesOrError( map[string]interp.PackageName{ path: interp.PackageName(alias), }) if err != nil { log.Print(err) } return packages[path] } // prepareSockets sets up the ZMQ sockets through which the kernel // will communicate. func prepareSockets(connInfo ConnectionInfo) (SocketGroup, error) { // Initialize the socket group. var ( sg SocketGroup err error ctx = context.Background() ) // Create the shell socket, a request-reply socket that may receive messages from multiple frontend for // code execution, introspection, auto-completion, etc. sg.ShellSocket.Socket = zmq4.NewRouter(ctx) sg.ShellSocket.Lock = &sync.Mutex{} // Create the control socket. This socket is a duplicate of the shell socket where messages on this channel // should jump ahead of queued messages on the shell socket. sg.ControlSocket.Socket = zmq4.NewRouter(ctx) sg.ControlSocket.Lock = &sync.Mutex{} // Create the stdin socket, a request-reply socket used to request user input from a front-end. This is analogous // to a standard input stream. sg.StdinSocket.Socket = zmq4.NewRouter(ctx) sg.StdinSocket.Lock = &sync.Mutex{} // Create the iopub socket, a publisher for broadcasting data like stdout/stderr output, displaying execution // results or errors, kernel status, etc. to connected subscribers. sg.IOPubSocket.Socket = zmq4.NewPub(ctx) sg.IOPubSocket.Lock = &sync.Mutex{} // Create the heartbeat socket, a request-reply socket that only allows alternating recv-send (request-reply) // calls. It should echo the byte strings it receives to let the requester know the kernel is still alive. sg.HBSocket.Socket = zmq4.NewRep(ctx) sg.HBSocket.Lock = &sync.Mutex{} // Bind the sockets. address := fmt.Sprintf("%v://%v:%%v", connInfo.Transport, connInfo.IP) err = sg.ShellSocket.Socket.Listen(fmt.Sprintf(address, connInfo.ShellPort)) if err != nil { return sg, fmt.Errorf("could not listen on shell-socket: %w", err) } err = sg.ControlSocket.Socket.Listen(fmt.Sprintf(address, connInfo.ControlPort)) if err != nil { return sg, fmt.Errorf("could not listen on control-socket: %w", err) } err = sg.StdinSocket.Socket.Listen(fmt.Sprintf(address, connInfo.StdinPort)) if err != nil { return sg, fmt.Errorf("could not listen on stdin-socket: %w", err) } err = sg.IOPubSocket.Socket.Listen(fmt.Sprintf(address, connInfo.IOPubPort)) if err != nil { return sg, fmt.Errorf("could not listen on iopub-socket: %w", err) } err = sg.HBSocket.Socket.Listen(fmt.Sprintf(address, connInfo.HBPort)) if err != nil { return sg, fmt.Errorf("could not listen on hbeat-socket: %w", err) } // Set the message signing key. sg.Key = []byte(connInfo.Key) return sg, nil } // handleShellMsg responds to a message on the shell ROUTER socket. func (kernel *Kernel) handleShellMsg(receipt msgReceipt) { // Tell the front-end that the kernel is working and when finished notify the // front-end that the kernel is idle again. if err := receipt.PublishKernelStatus(kernelBusy); err != nil { log.Printf("Error publishing kernel status 'busy': %v\n", err) } defer func() { if err := receipt.PublishKernelStatus(kernelIdle); err != nil { log.Printf("Error publishing kernel status 'idle': %v\n", err) } }() ir := kernel.ir switch receipt.Msg.Header.MsgType { case "kernel_info_request": if err := sendKernelInfo(receipt); err != nil { log.Fatal(err) } case "is_complete_request": if err := kernel.handleIsCompleteRequest(receipt); err != nil { log.Fatal(err) } case "complete_request": if err := handleCompleteRequest(ir, receipt); err != nil { log.Fatal(err) } case "execute_request": if err := kernel.handleExecuteRequest(receipt); err != nil { log.Fatal(err) } case "shutdown_request": handleShutdownRequest(receipt) default: log.Println("Unhandled shell message: ", receipt.Msg.Header.MsgType) } } // sendKernelInfo sends a kernel_info_reply message. func sendKernelInfo(receipt msgReceipt) error { return receipt.Reply("kernel_info_reply", kernelInfo{ ProtocolVersion: ProtocolVersion, Implementation: "gophernotes", ImplementationVersion: Version, Banner: fmt.Sprintf("Go kernel: gophernotes - v%s", Version), LanguageInfo: kernelLanguageInfo{ Name: "go", Version: runtime.Version(), FileExtension: ".go", }, HelpLinks: []helpLink{ {Text: "Go", URL: "https://golang.org/"}, {Text: "gophernotes", URL: "https://github.com/gopherdata/gophernotes"}, }, }, ) } // checkComplete checks whether the `code` is complete or not. func checkComplete(code string, ir *interp.Interp) (status, indent string) { status, indent = "unknown", "" if len(code) == 0 { return status, indent } readline := base.MakeBufReadline(bufio.NewReader(strings.NewReader(code))) for { _, _, err := base.ReadMultiline(readline, base.ReadOptions(0), "") if err == io.EOF { return "complete", indent } else if err == io.ErrUnexpectedEOF { return "incomplete", indent } else if err != nil { return "invalid", indent } } var parser mp.Parser g := ir.Comp parser.Configure(g.ParserMode, g.MacroChar) parser.Init(g.Fileset, g.Filepath, g.Line, []byte(code)) _, err := parser.Parse() if err != nil { status = "invalid" } else { status = "complete" } return status, indent } // handleIsCompleteRequest sends a is_complete_reply message. func (kernel *Kernel) handleIsCompleteRequest(receipt msgReceipt) error { // Extract the data from the request. reqcontent := receipt.Msg.Content.(map[string]interface{}) code := reqcontent["code"].(string) status, indent := checkComplete(code, kernel.ir) return receipt.Reply("is_complete_reply", isCompleteReply{ Status: status, Indent: indent, }, ) } // handleExecuteRequest runs code from an execute_request method, // and sends the various reply messages. func (kernel *Kernel) handleExecuteRequest(receipt msgReceipt) error { // Extract the data from the request. reqcontent := receipt.Msg.Content.(map[string]interface{}) code := reqcontent["code"].(string) silent := reqcontent["silent"].(bool) if !silent { ExecCounter++ } // Prepare the map that will hold the reply content. content := make(map[string]interface{}) content["execution_count"] = ExecCounter // Tell the front-end what the kernel is about to execute. if err := receipt.PublishExecutionInput(ExecCounter, code); err != nil { log.Printf("Error publishing execution input: %v\n", err) } // Redirect the standard out from the REPL. oldStdout := os.Stdout rOut, wOut, err := os.Pipe() if err != nil { return err } os.Stdout = wOut // Redirect the standard error from the REPL. oldStderr := os.Stderr rErr, wErr, err := os.Pipe() if err != nil { return err } os.Stderr = wErr var writersWG sync.WaitGroup writersWG.Add(2) jupyterStdOut := JupyterStreamWriter{StreamStdout, &receipt} jupyterStdErr := JupyterStreamWriter{StreamStderr, &receipt} outerr := OutErr{&jupyterStdOut, &jupyterStdErr} // Forward all data written to stdout/stderr to the front-end. go func() { defer writersWG.Done() io.Copy(&jupyterStdOut, rOut) }() go func() { defer writersWG.Done() io.Copy(&jupyterStdErr, rErr) }() // inject the actual "Display" closure that displays multimedia data in Jupyter ir := kernel.ir displayPlace := ir.ValueOf("Display") displayPlace.Set(xreflect.ValueOf(receipt.PublishDisplayData)) defer func() { // remove the closure before returning displayPlace.Set(xreflect.ValueOf(stubDisplay)) }() // eval vals, types, executionErr := doEval(ir, outerr, code) // Close and restore the streams. wOut.Close() os.Stdout = oldStdout wErr.Close() os.Stderr = oldStderr // Wait for the writers to finish forwarding the data. writersWG.Wait() if executionErr == nil { // if the only non-nil value should be auto-rendered graphically, render it data := kernel.autoRenderResults(vals, types) content["status"] = "ok" content["user_expressions"] = make(map[string]string) if !silent && len(data.Data) != 0 { // Publish the result of the execution. if err := receipt.PublishExecutionResult(ExecCounter, data); err != nil { log.Printf("Error publishing execution result: %v\n", err) } } } else { content["status"] = "error" content["ename"] = "ERROR" content["evalue"] = executionErr.Error() content["traceback"] = nil if err := receipt.PublishExecutionError(executionErr.Error(), []string{executionErr.Error()}); err != nil { log.Printf("Error publishing execution error: %v\n", err) } } // Send the output back to the notebook. return receipt.Reply("execute_reply", content) } // doEval evaluates the code in the interpreter. This function captures an uncaught panic // as well as the values of the last statement/expression. func doEval(ir *interp.Interp, outerr OutErr, code string) (val []interface{}, typ []xreflect.Type, err error) { // Capture a panic from the evaluation if one occurs and store it in the `err` return parameter. defer func() { if r := recover(); r != nil { var ok bool if err, ok = r.(error); !ok { err = errors.New(fmt.Sprint(r)) } } }() code = evalSpecialCommands(ir, outerr, code) // Prepare and perform the multiline evaluation. compiler := ir.Comp // Don't show the gomacro prompt. compiler.Options &^= base.OptShowPrompt // Don't swallow panics as they are recovered above and handled with a Jupyter `error` message instead. compiler.Options &^= base.OptTrapPanic // Reset the error line so that error messages correspond to the lines from the cell. compiler.Line = 0 // Parse the input code (and don't perform gomacro's macroexpansion). // These may panic but this will be recovered by the deferred recover() above so that the error // may be returned instead. nodes := compiler.ParseBytes([]byte(code)) srcAst := ast2.AnyToAst(nodes, "doEval") // If there is no srcAst then we must be evaluating nothing. The result must be nil then. if srcAst == nil { return nil, nil, nil } // Compile the ast. compiledSrc := ir.CompileAst(srcAst) // Evaluate the code. results, types := ir.RunExpr(compiledSrc) // Convert results from xreflect.Value to interface{} values := make([]interface{}, len(results)) for i, result := range results { values[i] = basereflect.ValueInterface(result) } return values, types, nil } // handleShutdownRequest sends a "shutdown" message. func handleShutdownRequest(receipt msgReceipt) { content := receipt.Msg.Content.(map[string]interface{}) restart := content["restart"].(bool) reply := shutdownReply{ Restart: restart, } if err := receipt.Reply("shutdown_reply", reply); err != nil { log.Fatal(err) } log.Println("Shutting down in response to shutdown_request") os.Exit(0) } // startHeartbeat starts a go-routine for handling heartbeat ping messages sent over the given `hbSocket`. The `wg`'s // `Done` method is invoked after the thread is completely shutdown. To request a shutdown the returned `shutdown` channel // can be closed. func startHeartbeat(hbSocket Socket, wg *sync.WaitGroup) (shutdown chan struct{}) { quit := make(chan struct{}) // Start the handler that will echo any received messages back to the sender. wg.Add(1) go func() { defer wg.Done() type msgType struct { Msg zmq4.Msg Err error } msgs := make(chan msgType) go func() { defer close(msgs) for { msg, err := hbSocket.Socket.Recv() select { case msgs <- msgType{msg, err}: case <-quit: return } } }() timeout := time.NewTimer(500 * time.Second) defer timeout.Stop() for { timeout.Reset(500 * time.Second) select { case <-quit: return case <-timeout.C: continue case v := <-msgs: hbSocket.RunWithSocket(func(echo zmq4.Socket) error { if v.Err != nil { log.Fatalf("Error reading heartbeat ping bytes: %v\n", v.Err) return v.Err } // Send the received byte string back to let the front-end know that the kernel is alive. if err := echo.Send(v.Msg); err != nil { log.Printf("Error sending heartbeat pong bytes: %b\n", err) return err } return nil }) } } }() return quit } // find and execute special commands in code, remove them from returned string func evalSpecialCommands(ir *interp.Interp, outerr OutErr, code string) string { lines := strings.Split(code, "\n") stop := false for i, line := range lines { line = strings.TrimSpace(line) if len(line) != 0 { switch line[0] { case '%': evalSpecialCommand(ir, outerr, line) lines[i] = "" case '$': evalShellCommand(ir, outerr, line) lines[i] = "" default: // if a line is NOT a special command, // stop processing special commands stop = true } } if stop { break } } return strings.Join(lines, "\n") } // execute special command. line must start with '%' func evalSpecialCommand(ir *interp.Interp, outerr OutErr, line string) { const help string = ` available special commands (%): %cd [path] %go111module {on|off} %help execute shell commands ($): $command [args...] example: $ls -l ` args := strings.SplitN(line, " ", 2) cmd := args[0] arg := "" if len(args) > 1 { arg = args[1] } switch cmd { case "%cd": if arg == "" { home, err := os.UserHomeDir() if err != nil { panic(fmt.Errorf("error getting user home directory: %v", err)) } arg = home } err := os.Chdir(arg) if err != nil { panic(fmt.Errorf("error setting current directory to %q: %v", arg, err)) } case "%go111module": if arg == "on" { ir.Comp.CompGlobals.Options |= base.OptModuleImport } else if arg == "off" { ir.Comp.CompGlobals.Options &^= base.OptModuleImport } else { panic(fmt.Errorf("special command %s: expecting a single argument 'on' or 'off', found: %q", cmd, arg)) } case "%help": outerr.out.Write([]byte(help)) default: panic(fmt.Errorf("unknown special command: %q\n%s", line, help)) } } // execute shell command. line must start with '$' func evalShellCommand(ir *interp.Interp, outerr OutErr, line string) { args := strings.Fields(line[1:]) if len(args) <= 0 { return } var writersWG sync.WaitGroup writersWG.Add(2) cmd := exec.Command(args[0], args[1:]...) stdout, err := cmd.StdoutPipe() if err != nil { panic(fmt.Errorf("Command.StdoutPipe() failed: %v", err)) } stderr, err := cmd.StderrPipe() if err != nil { panic(fmt.Errorf("Command.StderrPipe() failed: %v", err)) } go func() { defer writersWG.Done() io.Copy(outerr.out, stdout) }() go func() { defer writersWG.Done() io.Copy(outerr.err, stderr) }() err = cmd.Start() if err != nil { panic(fmt.Errorf("error starting command '%s': %v", line[1:], err)) } err = cmd.Wait() if err != nil { panic(fmt.Errorf("error waiting for command '%s': %v", line[1:], err)) } writersWG.Wait() }