Merge pull request #534 from GaloisInc/jhx-dynamic

Add withDynamic to IPython.Kernel.ZeroMQ
This commit is contained in:
Andrew Gibiansky 2015-07-22 23:43:55 -07:00
commit 4afec8b0c3

View File

@ -5,18 +5,28 @@
-- The "ZeroMQ" module abstracts away the low-level 0MQ based interface with IPython, replacing it
-- instead with a Haskell Channel based interface. The `serveProfile` function takes a IPython
-- profile specification and returns the channel interface to use.
module IHaskell.IPython.ZeroMQ (ZeroMQInterface(..), ZeroMQStdin(..), serveProfile, serveStdin) where
module IHaskell.IPython.ZeroMQ
( ZeroMQInterface(..)
, ZeroMQStdin(..)
, serveProfile
, serveStdin
, ZeroMQEphemeralPorts
, withEphemeralPorts
) where
import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.Aeson
import qualified Data.ByteString.Lazy as LBS
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char
import Control.Concurrent
import Control.Monad
import System.IO.Unsafe
import Data.Aeson (encode)
import System.ZMQ4 hiding (stdin)
import Data.Char
import Data.Digest.Pure.SHA as SHA
import Data.Monoid ((<>))
import qualified Data.Text.Encoding as Text
import System.ZMQ4 as ZMQ4 hiding (stdin)
import Text.Read (readMaybe)
import IHaskell.IPython.Types
import IHaskell.IPython.Message.Parser
@ -27,7 +37,7 @@ import IHaskell.IPython.Message.Writer
-- should functionally serve as high-level sockets which speak Messages instead of ByteStrings.
data ZeroMQInterface =
Channels
{
{
-- | A channel populated with requests from the frontend.
shellRequestChannel :: Chan Message
-- | Writing to this channel causes a reply to be sent to the frontend.
@ -50,6 +60,22 @@ data ZeroMQStdin =
, stdinReplyChannel :: Chan Message
}
-- | Create new channels for a ZeroMQInterface
newZeroMQInterface :: ByteString -> IO ZeroMQInterface
newZeroMQInterface key = do
shellReqChan <- newChan
shellRepChan <- newChan
controlReqChan <- dupChan shellReqChan
controlRepChan <- dupChan shellRepChan
iopubChan <- newChan
return $! Channels { shellRequestChannel = shellReqChan
, shellReplyChannel = shellRepChan
, controlRequestChannel = controlReqChan
, controlReplyChannel = controlRepChan
, iopubChannel = iopubChan
, hmacKey = key
}
-- | Start responding on all ZeroMQ channels used to communicate with IPython | via the provided
-- profile. Return a set of channels which can be used to | communicate with IPython in a more
-- structured manner.
@ -57,14 +83,7 @@ serveProfile :: Profile -- ^ The profile specifying which ports and t
-> Bool -- ^ Print debug output
-> IO ZeroMQInterface -- ^ The Message-channel based interface to the sockets.
serveProfile profile debug = do
-- Create all channels which will be used for higher level communication.
shellReqChan <- newChan
shellRepChan <- newChan
controlReqChan <- dupChan shellReqChan
controlRepChan <- dupChan shellRepChan
iopubChan <- newChan
let channels = Channels shellReqChan shellRepChan controlReqChan controlRepChan iopubChan
(signatureKey profile)
channels <- newZeroMQInterface (signatureKey profile)
-- Create the context in a separate thread that never finishes. If withContext or withSocket
-- complete, the context or socket become invalid.
@ -81,6 +100,80 @@ serveProfile profile debug = do
return channels
-- | Describes ports used when creating an ephemeral ZeroMQ session.
-- Used to generate the ipython JSON config file.
data ZeroMQEphemeralPorts
= ZeroMQEphemeralPorts { ephHbPort :: !Port
, ephControlPort :: !Port
, ephShellPort :: !Port
, ephIOPubPort :: !Port
, ephSignatureKey :: !ByteString
}
instance ToJSON ZeroMQEphemeralPorts where
toJSON ports =
object [ "ip" .= ("127.0.0.1" :: String)
, "transport" .= TCP
, "control_port" .= ephControlPort ports
, "hb_port" .= ephHbPort ports
, "shell_port" .= ephShellPort ports
, "iopub_port" .= ephIOPubPort ports
, "key" .= Text.decodeUtf8 (ephSignatureKey ports)
]
parsePort :: String -> Maybe Int
parsePort s = readMaybe num
where num = reverse (takeWhile isNumber (reverse s))
bindLocalEphemeralPort :: Socket a -> IO Int
bindLocalEphemeralPort socket = do
bind socket $ "tcp://127.0.0.1:*"
endpointString <- lastEndpoint socket
case parsePort endpointString of
Nothing ->
fail $ "internalError: IHaskell.IPython.ZeroMQ.bindLocalEphemeralPort encountered a port index that could not be interpreted as an int."
Just endpointIndex ->
return endpointIndex
-- | Run session for communicating with an IPython instance on ephemerally allocated
-- ZMQ4 sockets. The sockets will be closed when the callback returns.
withEphemeralPorts :: ByteString
-- ^ HMAC encryption key
-> Bool
-- ^ Print debug output
-> (ZeroMQEphemeralPorts -> ZeroMQInterface -> IO a)
-- ^ Callback that takes the interface to the sockets.
-> IO a
withEphemeralPorts key debug callback = do
channels <- newZeroMQInterface key
-- Create the ZMQ4 context
withContext $ \context -> do
-- Create the sockets to communicate with.
withSocket context Rep $ \heartbeatSocket -> do
withSocket context Router $ \controlportSocket -> do
withSocket context Router $ \shellportSocket -> do
withSocket context Pub $ \iopubSocket -> do
-- Bind each socket to a local port, getting the port chosen.
hbPort <- bindLocalEphemeralPort heartbeatSocket
controlPort <- bindLocalEphemeralPort controlportSocket
shellPort <- bindLocalEphemeralPort shellportSocket
iopubPort <- bindLocalEphemeralPort iopubSocket
-- Create object to store ephemeral ports
let ports = ZeroMQEphemeralPorts { ephHbPort = hbPort
, ephControlPort = controlPort
, ephShellPort = shellPort
, ephIOPubPort = iopubPort
, ephSignatureKey = key
}
-- Launch actions to listen to communicate between channels and cockets.
_ <- forkIO $ forever $ heartbeat channels heartbeatSocket
_ <- forkIO $ forever $ control debug channels controlportSocket
_ <- forkIO $ forever $ shell debug channels shellportSocket
_ <- forkIO $ checkedIOpub debug channels iopubSocket
-- Run callback function; provide it with both ports and channels.
callback ports channels
serveStdin :: Profile -> IO ZeroMQStdin
serveStdin profile = do
reqChannel <- newChan
@ -152,6 +245,31 @@ iopub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
iopub debug channels socket =
readChan (iopubChannel channels) >>= sendMessage debug (hmacKey channels) socket
-- | Attempt to send a message along the socket, returning true if successful.
trySendMessage :: Sender a => String -> Bool -> ByteString -> Socket a -> Message -> IO Bool
trySendMessage nm debug hmacKey socket message = do
let zmqErrorHandler :: ZMQError -> IO Bool
zmqErrorHandler e
-- Ignore errors if we cannot send.
-- We may want to forward this to the thread that tried put the message
-- in the Chan initially.
| errno e == 38 = return False
| otherwise = throwIO e
(sendMessage debug hmacKey socket message >> return True)
`catch` zmqErrorHandler
-- | Send messages via the iopub channel.
-- This reads messages from the ZeroMQ iopub interface
-- channel and then writes the messages to the socket.
-- This is a checked implementation which will stop if the socket is closed.
checkedIOpub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
checkedIOpub debug channels socket = do
msg <- readChan (iopubChannel channels)
cont <- trySendMessage "io" debug (hmacKey channels) socket msg
when cont $
checkedIOpub debug channels socket
-- | Receive and parse a message from a socket.
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message
receiveMessage debug socket = do