mirror of
https://github.com/IHaskell/IHaskell.git
synced 2025-04-16 11:26:08 +00:00
Add withDynamic to IPython.Kernel.ZeroMQ
This commit is contained in:
parent
1c2265b64b
commit
54f6e8a3ad
@ -5,18 +5,28 @@
|
||||
-- The "ZeroMQ" module abstracts away the low-level 0MQ based interface with IPython, replacing it
|
||||
-- instead with a Haskell Channel based interface. The `serveProfile` function takes a IPython
|
||||
-- profile specification and returns the channel interface to use.
|
||||
module IHaskell.IPython.ZeroMQ (ZeroMQInterface(..), ZeroMQStdin(..), serveProfile, serveStdin) where
|
||||
module IHaskell.IPython.ZeroMQ
|
||||
( ZeroMQInterface(..)
|
||||
, ZeroMQStdin(..)
|
||||
, serveProfile
|
||||
, serveStdin
|
||||
, ZeroMQDynamicPorts(..)
|
||||
, withDynamic
|
||||
) where
|
||||
|
||||
import Control.Concurrent
|
||||
import Control.Exception
|
||||
import Control.Monad
|
||||
import Data.Aeson
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString.Char8 as Char
|
||||
import Control.Concurrent
|
||||
import Control.Monad
|
||||
import System.IO.Unsafe
|
||||
import Data.Aeson (encode)
|
||||
import System.ZMQ4 hiding (stdin)
|
||||
import Data.Char
|
||||
import Data.Digest.Pure.SHA as SHA
|
||||
import Data.Monoid ((<>))
|
||||
import qualified Data.Text.Encoding as Text
|
||||
import System.IO.Unsafe
|
||||
import System.ZMQ4 hiding (stdin)
|
||||
|
||||
import IHaskell.IPython.Types
|
||||
import IHaskell.IPython.Message.Parser
|
||||
@ -27,7 +37,7 @@ import IHaskell.IPython.Message.Writer
|
||||
-- should functionally serve as high-level sockets which speak Messages instead of ByteStrings.
|
||||
data ZeroMQInterface =
|
||||
Channels
|
||||
{
|
||||
{
|
||||
-- | A channel populated with requests from the frontend.
|
||||
shellRequestChannel :: Chan Message
|
||||
-- | Writing to this channel causes a reply to be sent to the frontend.
|
||||
@ -81,6 +91,79 @@ serveProfile profile debug = do
|
||||
|
||||
return channels
|
||||
|
||||
data ZeroMQDynamicPorts
|
||||
= ZeroMQDynamicPorts { dynHbPort :: !Port
|
||||
, dynControlPort :: !Port
|
||||
, dynShellPort :: !Port
|
||||
, dynIOPubPort :: !Port
|
||||
, dynSignatureKey :: !ByteString
|
||||
}
|
||||
|
||||
instance ToJSON ZeroMQDynamicPorts where
|
||||
toJSON ports =
|
||||
object [ "ip" .= ("127.0.0.1" :: String)
|
||||
, "transport" .= TCP
|
||||
, "control_port" .= dynControlPort ports
|
||||
, "hb_port" .= dynHbPort ports
|
||||
, "shell_port" .= dynShellPort ports
|
||||
, "iopub_port" .= dynIOPubPort ports
|
||||
, "key" .= Text.decodeUtf8 (dynSignatureKey ports)
|
||||
]
|
||||
|
||||
parsePort :: String -> Int
|
||||
parsePort s = read num
|
||||
where num = reverse (takeWhile isNumber (reverse s))
|
||||
|
||||
bindLocalDynamicPort :: Socket a -> IO Int
|
||||
bindLocalDynamicPort socket = do
|
||||
bind socket $ "tcp://127.0.0.1:*"
|
||||
parsePort <$> lastEndpoint socket
|
||||
|
||||
-- | Start responding on all ZeroMQ channels used to communicate with IPython
|
||||
-- | with dynamic ports.
|
||||
-- Profide the callback with the dynamic ports chosen and a ZeroMQInterface.
|
||||
withDynamic :: ByteString
|
||||
-- ^ HMAC encryption key
|
||||
-> Bool -- ^ Print debug output
|
||||
-> (ZeroMQDynamicPorts -> ZeroMQInterface -> IO a)
|
||||
-- ^ Callback that takes the interface to the sockets.
|
||||
-> IO a
|
||||
withDynamic key debug callback = do
|
||||
-- Create all channels which will be used for higher level communication.
|
||||
shellReqChan <- newChan
|
||||
shellRepChan <- newChan
|
||||
controlReqChan <- dupChan shellReqChan
|
||||
controlRepChan <- dupChan shellRepChan
|
||||
iopubChan <- newChan
|
||||
let channels = Channels shellReqChan shellRepChan controlReqChan controlRepChan iopubChan key
|
||||
|
||||
-- Create the context in a separate thread that never finishes. If withContext or withSocket
|
||||
-- complete, the context or socket become invalid.
|
||||
withContext $ \context -> do
|
||||
-- Serve on all sockets.
|
||||
withSocket context Rep $ \heartbeat_socket -> do
|
||||
withSocket context Router $ \controlport_socket -> do
|
||||
withSocket context Router $ \shellport_socket -> do
|
||||
withSocket context Pub $ \iopub_socket -> do
|
||||
|
||||
dyn_hb_port <- bindLocalDynamicPort heartbeat_socket
|
||||
dyn_control_port <- bindLocalDynamicPort controlport_socket
|
||||
dyn_shell_port <- bindLocalDynamicPort shellport_socket
|
||||
dyn_iopub_port <- bindLocalDynamicPort iopub_socket
|
||||
|
||||
_ <- forkIO $ forever $ heartbeat channels heartbeat_socket
|
||||
_ <- forkIO $ forever $ control debug channels controlport_socket
|
||||
_ <- forkIO $ forever $ shell debug channels shellport_socket
|
||||
_ <- forkIO $ forever $ checked_iopub debug channels iopub_socket
|
||||
|
||||
let ports = ZeroMQDynamicPorts { dynHbPort = dyn_hb_port
|
||||
, dynControlPort = dyn_control_port
|
||||
, dynShellPort = dyn_shell_port
|
||||
, dynIOPubPort = dyn_iopub_port
|
||||
, dynSignatureKey = key
|
||||
}
|
||||
callback ports channels
|
||||
|
||||
serveStdin :: Profile -> IO ZeroMQStdin
|
||||
serveStdin profile = do
|
||||
reqChannel <- newChan
|
||||
@ -152,6 +235,20 @@ iopub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
|
||||
iopub debug channels socket =
|
||||
readChan (iopubChannel channels) >>= sendMessage debug (hmacKey channels) socket
|
||||
|
||||
-- | Send messages via the iopub channel. | This reads messages from the ZeroMQ iopub interface
|
||||
-- channel | and then writes the messages to the socket.
|
||||
checked_iopub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
|
||||
checked_iopub debug channels socket = do
|
||||
msg <- readChan (iopubChannel channels)
|
||||
let error_handler :: ZMQError -> IO ()
|
||||
error_handler e
|
||||
-- Ignore errors if we cannot send.
|
||||
-- We may want to cascade this back to channel.
|
||||
| errno e == 38 = return ()
|
||||
| otherwise = throwIO e
|
||||
catch (sendMessage debug (hmacKey channels) socket msg)
|
||||
error_handler
|
||||
|
||||
-- | Receive and parse a message from a socket.
|
||||
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message
|
||||
receiveMessage debug socket = do
|
||||
|
Loading…
x
Reference in New Issue
Block a user