mirror of
https://github.com/IHaskell/IHaskell.git
synced 2025-04-16 11:26:08 +00:00
Fix formatting
This commit is contained in:
parent
f77a7c3e74
commit
916e7ab7ea
@ -5,14 +5,14 @@
|
||||
-- The "ZeroMQ" module abstracts away the low-level 0MQ based interface with IPython, replacing it
|
||||
-- instead with a Haskell Channel based interface. The `serveProfile` function takes a IPython
|
||||
-- profile specification and returns the channel interface to use.
|
||||
module IHaskell.IPython.ZeroMQ
|
||||
( ZeroMQInterface(..)
|
||||
, ZeroMQStdin(..)
|
||||
, serveProfile
|
||||
, serveStdin
|
||||
, ZeroMQEphemeralPorts
|
||||
, withEphemeralPorts
|
||||
) where
|
||||
module IHaskell.IPython.ZeroMQ (
|
||||
ZeroMQInterface(..),
|
||||
ZeroMQStdin(..),
|
||||
serveProfile,
|
||||
serveStdin,
|
||||
ZeroMQEphemeralPorts,
|
||||
withEphemeralPorts,
|
||||
) where
|
||||
|
||||
import Control.Concurrent
|
||||
import Control.Exception
|
||||
@ -37,7 +37,7 @@ import IHaskell.IPython.Message.Writer
|
||||
-- should functionally serve as high-level sockets which speak Messages instead of ByteStrings.
|
||||
data ZeroMQInterface =
|
||||
Channels
|
||||
{
|
||||
{
|
||||
-- | A channel populated with requests from the frontend.
|
||||
shellRequestChannel :: Chan Message
|
||||
-- | Writing to this channel causes a reply to be sent to the frontend.
|
||||
@ -68,13 +68,14 @@ newZeroMQInterface key = do
|
||||
controlReqChan <- dupChan shellReqChan
|
||||
controlRepChan <- dupChan shellRepChan
|
||||
iopubChan <- newChan
|
||||
return $! Channels { shellRequestChannel = shellReqChan
|
||||
, shellReplyChannel = shellRepChan
|
||||
, controlRequestChannel = controlReqChan
|
||||
, controlReplyChannel = controlRepChan
|
||||
, iopubChannel = iopubChan
|
||||
, hmacKey = key
|
||||
}
|
||||
return $! Channels
|
||||
{ shellRequestChannel = shellReqChan
|
||||
, shellReplyChannel = shellRepChan
|
||||
, controlRequestChannel = controlReqChan
|
||||
, controlReplyChannel = controlRepChan
|
||||
, iopubChannel = iopubChan
|
||||
, hmacKey = key
|
||||
}
|
||||
|
||||
-- | Start responding on all ZeroMQ channels used to communicate with IPython | via the provided
|
||||
-- profile. Return a set of channels which can be used to | communicate with IPython in a more
|
||||
@ -100,34 +101,37 @@ serveProfile profile debug = do
|
||||
|
||||
return channels
|
||||
|
||||
-- | Describes ports used when creating an ephemeral ZeroMQ session.
|
||||
-- Used to generate the ipython JSON config file.
|
||||
data ZeroMQEphemeralPorts
|
||||
= ZeroMQEphemeralPorts { ephHbPort :: !Port
|
||||
, ephControlPort :: !Port
|
||||
, ephShellPort :: !Port
|
||||
, ephIOPubPort :: !Port
|
||||
, ephSignatureKey :: !ByteString
|
||||
}
|
||||
-- | Describes ports used when creating an ephemeral ZeroMQ session. Used to generate the ipython
|
||||
-- JSON config file.
|
||||
data ZeroMQEphemeralPorts =
|
||||
ZeroMQEphemeralPorts
|
||||
{ ephHbPort :: !Port
|
||||
, ephControlPort :: !Port
|
||||
, ephShellPort :: !Port
|
||||
, ephIOPubPort :: !Port
|
||||
, ephSignatureKey :: !ByteString
|
||||
}
|
||||
|
||||
instance ToJSON ZeroMQEphemeralPorts where
|
||||
toJSON ports =
|
||||
object [ "ip" .= ("127.0.0.1" :: String)
|
||||
, "transport" .= TCP
|
||||
, "control_port" .= ephControlPort ports
|
||||
, "hb_port" .= ephHbPort ports
|
||||
, "shell_port" .= ephShellPort ports
|
||||
, "iopub_port" .= ephIOPubPort ports
|
||||
, "key" .= Text.decodeUtf8 (ephSignatureKey ports)
|
||||
]
|
||||
object
|
||||
[ "ip" .= ("127.0.0.1" :: String)
|
||||
, "transport" .= TCP
|
||||
, "control_port" .= ephControlPort ports
|
||||
, "hb_port" .= ephHbPort ports
|
||||
, "shell_port" .= ephShellPort ports
|
||||
, "iopub_port" .= ephIOPubPort ports
|
||||
, "key" .= Text.decodeUtf8 (ephSignatureKey ports)
|
||||
]
|
||||
|
||||
parsePort :: String -> Maybe Int
|
||||
parsePort s = readMaybe num
|
||||
where num = reverse (takeWhile isNumber (reverse s))
|
||||
where
|
||||
num = reverse (takeWhile isNumber (reverse s))
|
||||
|
||||
bindLocalEphemeralPort :: Socket a -> IO Int
|
||||
bindLocalEphemeralPort socket = do
|
||||
bind socket $ "tcp://127.0.0.1:*"
|
||||
bind socket $ "tcp://127.0.0.1:*"
|
||||
endpointString <- lastEndpoint socket
|
||||
case parsePort endpointString of
|
||||
Nothing ->
|
||||
@ -135,44 +139,42 @@ bindLocalEphemeralPort socket = do
|
||||
Just endpointIndex ->
|
||||
return endpointIndex
|
||||
|
||||
-- | Run session for communicating with an IPython instance on ephemerally allocated
|
||||
-- ZMQ4 sockets. The sockets will be closed when the callback returns.
|
||||
-- | Run session for communicating with an IPython instance on ephemerally allocated ZMQ4 sockets.
|
||||
-- The sockets will be closed when the callback returns.
|
||||
withEphemeralPorts :: ByteString
|
||||
->
|
||||
-- ^ HMAC encryption key
|
||||
-> Bool
|
||||
-- ^ Print debug output
|
||||
-> (ZeroMQEphemeralPorts -> ZeroMQInterface -> IO a)
|
||||
-- ^ Callback that takes the interface to the sockets.
|
||||
-> IO a
|
||||
Bool
|
||||
->
|
||||
-- ^ Print debug output
|
||||
(ZeroMQEphemeralPorts -> ZeroMQInterface -> IO a)
|
||||
->
|
||||
-- ^ Callback that takes the
|
||||
-- interface to the sockets.
|
||||
IO a
|
||||
withEphemeralPorts key debug callback = do
|
||||
channels <- newZeroMQInterface key
|
||||
-- Create the ZMQ4 context
|
||||
withContext $ \context -> do
|
||||
-- Create the sockets to communicate with.
|
||||
withSocket context Rep $ \heartbeatSocket -> do
|
||||
withSocket context Router $ \controlportSocket -> do
|
||||
withSocket context Router $ \shellportSocket -> do
|
||||
withSocket context Pub $ \iopubSocket -> do
|
||||
|
||||
-- Bind each socket to a local port, getting the port chosen.
|
||||
hbPort <- bindLocalEphemeralPort heartbeatSocket
|
||||
controlPort <- bindLocalEphemeralPort controlportSocket
|
||||
shellPort <- bindLocalEphemeralPort shellportSocket
|
||||
iopubPort <- bindLocalEphemeralPort iopubSocket
|
||||
-- Create object to store ephemeral ports
|
||||
let ports = ZeroMQEphemeralPorts { ephHbPort = hbPort
|
||||
, ephControlPort = controlPort
|
||||
, ephShellPort = shellPort
|
||||
, ephIOPubPort = iopubPort
|
||||
, ephSignatureKey = key
|
||||
}
|
||||
-- Launch actions to listen to communicate between channels and cockets.
|
||||
_ <- forkIO $ forever $ heartbeat channels heartbeatSocket
|
||||
_ <- forkIO $ forever $ control debug channels controlportSocket
|
||||
_ <- forkIO $ forever $ shell debug channels shellportSocket
|
||||
_ <- forkIO $ checkedIOpub debug channels iopubSocket
|
||||
-- Run callback function; provide it with both ports and channels.
|
||||
callback ports channels
|
||||
-- Create the sockets to communicate with.
|
||||
withSocket context Rep $ \heartbeatSocket -> do
|
||||
withSocket context Router $ \controlportSocket -> do
|
||||
withSocket context Router $ \shellportSocket -> do
|
||||
withSocket context Pub $ \iopubSocket -> do
|
||||
-- Bind each socket to a local port, getting the port chosen.
|
||||
hbPort <- bindLocalEphemeralPort heartbeatSocket
|
||||
controlPort <- bindLocalEphemeralPort controlportSocket
|
||||
shellPort <- bindLocalEphemeralPort shellportSocket
|
||||
iopubPort <- bindLocalEphemeralPort iopubSocket
|
||||
-- Create object to store ephemeral ports
|
||||
let ports = ZeroMQEphemeralPorts { ephHbPort = hbPort, ephControlPort = controlPort, ephShellPort = shellPort, ephIOPubPort = iopubPort, ephSignatureKey = key }
|
||||
-- Launch actions to listen to communicate between channels and cockets.
|
||||
_ <- forkIO $ forever $ heartbeat channels heartbeatSocket
|
||||
_ <- forkIO $ forever $ control debug channels controlportSocket
|
||||
_ <- forkIO $ forever $ shell debug channels shellportSocket
|
||||
_ <- forkIO $ checkedIOpub debug channels iopubSocket
|
||||
-- Run callback function; provide it with both ports and channels.
|
||||
callback ports channels
|
||||
|
||||
serveStdin :: Profile -> IO ZeroMQStdin
|
||||
serveStdin profile = do
|
||||
@ -250,22 +252,18 @@ trySendMessage :: Sender a => String -> Bool -> ByteString -> Socket a -> Messag
|
||||
trySendMessage nm debug hmacKey socket message = do
|
||||
let zmqErrorHandler :: ZMQError -> IO Bool
|
||||
zmqErrorHandler e
|
||||
-- Ignore errors if we cannot send.
|
||||
-- We may want to forward this to the thread that tried put the message
|
||||
-- in the Chan initially.
|
||||
-- Ignore errors if we cannot send. We may want to forward this to the thread that tried put the
|
||||
-- message in the Chan initially.
|
||||
| errno e == 38 = return False
|
||||
| otherwise = throwIO e
|
||||
(sendMessage debug hmacKey socket message >> return True)
|
||||
`catch` zmqErrorHandler
|
||||
| otherwise = throwIO e
|
||||
(sendMessage debug hmacKey socket message >> return True) `catch` zmqErrorHandler
|
||||
|
||||
|
||||
-- | Send messages via the iopub channel.
|
||||
-- This reads messages from the ZeroMQ iopub interface
|
||||
-- channel and then writes the messages to the socket.
|
||||
-- This is a checked implementation which will stop if the socket is closed.
|
||||
-- | Send messages via the iopub channel. This reads messages from the ZeroMQ iopub interface
|
||||
-- channel and then writes the messages to the socket. This is a checked implementation which will
|
||||
-- stop if the socket is closed.
|
||||
checkedIOpub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
|
||||
checkedIOpub debug channels socket = do
|
||||
msg <- readChan (iopubChannel channels)
|
||||
msg <- readChan (iopubChannel channels)
|
||||
cont <- trySendMessage "io" debug (hmacKey channels) socket msg
|
||||
when cont $
|
||||
checkedIOpub debug channels socket
|
||||
|
Loading…
x
Reference in New Issue
Block a user