Rename withDynamic to withEphemeralPorts and cleanup

This commit is contained in:
Joe Hendrix 2015-07-21 23:33:02 -07:00
parent 54f6e8a3ad
commit 5989188c6a
No known key found for this signature in database
GPG Key ID: 00F67DE32381DB9F

View File

@ -10,8 +10,8 @@ module IHaskell.IPython.ZeroMQ
, ZeroMQStdin(..)
, serveProfile
, serveStdin
, ZeroMQDynamicPorts(..)
, withDynamic
, ZeroMQEphemeralPorts(..)
, withEphemeralPorts
) where
import Control.Concurrent
@ -26,7 +26,8 @@ import Data.Digest.Pure.SHA as SHA
import Data.Monoid ((<>))
import qualified Data.Text.Encoding as Text
import System.IO.Unsafe
import System.ZMQ4 hiding (stdin)
import System.ZMQ4 as ZMQ4 hiding (stdin)
import Text.Read (readMaybe)
import IHaskell.IPython.Types
import IHaskell.IPython.Message.Parser
@ -91,44 +92,50 @@ serveProfile profile debug = do
return channels
data ZeroMQDynamicPorts
= ZeroMQDynamicPorts { dynHbPort :: !Port
, dynControlPort :: !Port
, dynShellPort :: !Port
, dynIOPubPort :: !Port
, dynSignatureKey :: !ByteString
}
data ZeroMQEphemeralPorts
= ZeroMQEphemeralPorts { ephHbPort :: !Port
, ephControlPort :: !Port
, ephShellPort :: !Port
, ephIOPubPort :: !Port
, ephSignatureKey :: !ByteString
}
instance ToJSON ZeroMQDynamicPorts where
instance ToJSON ZeroMQEphemeralPorts where
toJSON ports =
object [ "ip" .= ("127.0.0.1" :: String)
, "transport" .= TCP
, "control_port" .= dynControlPort ports
, "hb_port" .= dynHbPort ports
, "shell_port" .= dynShellPort ports
, "iopub_port" .= dynIOPubPort ports
, "key" .= Text.decodeUtf8 (dynSignatureKey ports)
, "control_port" .= ephControlPort ports
, "hb_port" .= ephHbPort ports
, "shell_port" .= ephShellPort ports
, "iopub_port" .= ephIOPubPort ports
, "key" .= Text.decodeUtf8 (ephSignatureKey ports)
]
parsePort :: String -> Int
parsePort s = read num
parsePort :: String -> Maybe Int
parsePort s = readMaybe num
where num = reverse (takeWhile isNumber (reverse s))
bindLocalDynamicPort :: Socket a -> IO Int
bindLocalDynamicPort socket = do
bindLocalEphemeralPort :: Socket a -> IO Int
bindLocalEphemeralPort socket = do
bind socket $ "tcp://127.0.0.1:*"
parsePort <$> lastEndpoint socket
endpointString <- lastEndpoint socket
case parsePort endpointString of
Nothing ->
fail $ "internalError: IHaskell.IPython.ZeroMQ.bindLocalEphemeralPort encountered a port index that could not be interpreted as an int."
Just endpointIndex ->
return endpointIndex
-- | Start responding on all ZeroMQ channels used to communicate with IPython
-- | with dynamic ports.
-- Profide the callback with the dynamic ports chosen and a ZeroMQInterface.
withDynamic :: ByteString
-- ^ HMAC encryption key
-> Bool -- ^ Print debug output
-> (ZeroMQDynamicPorts -> ZeroMQInterface -> IO a)
-- ^ Callback that takes the interface to the sockets.
-> IO a
withDynamic key debug callback = do
-- with ephemerally allocated ports.
-- Profide the callback with the ports chosen and a ZeroMQInterface.
withEphemeralPorts :: ByteString
-- ^ HMAC encryption key
-> Bool
-- ^ Print debug output
-> (ZeroMQEphemeralPorts -> ZeroMQInterface -> IO a)
-- ^ Callback that takes the interface to the sockets.
-> IO a
withEphemeralPorts key debug callback = do
-- Create all channels which will be used for higher level communication.
shellReqChan <- newChan
shellRepChan <- newChan
@ -137,31 +144,31 @@ withDynamic key debug callback = do
iopubChan <- newChan
let channels = Channels shellReqChan shellRepChan controlReqChan controlRepChan iopubChan key
-- Create the context in a separate thread that never finishes. If withContext or withSocket
-- complete, the context or socket become invalid.
-- Create the ZMQ4 context
withContext $ \context -> do
-- Serve on all sockets.
withSocket context Rep $ \heartbeat_socket -> do
withSocket context Router $ \controlport_socket -> do
withSocket context Router $ \shellport_socket -> do
withSocket context Pub $ \iopub_socket -> do
-- Create the sockets to communicate with.
withSocket context Rep $ \heartbeatSocket -> do
withSocket context Router $ \controlportSocket -> do
withSocket context Router $ \shellportSocket -> do
withSocket context Pub $ \iopubSocket -> do
dyn_hb_port <- bindLocalDynamicPort heartbeat_socket
dyn_control_port <- bindLocalDynamicPort controlport_socket
dyn_shell_port <- bindLocalDynamicPort shellport_socket
dyn_iopub_port <- bindLocalDynamicPort iopub_socket
-- Bind each socket to a local port, getting the port chosen.
hbPort <- bindLocalEphemeralPort heartbeatSocket
controlPort <- bindLocalEphemeralPort controlportSocket
shellPort <- bindLocalEphemeralPort shellportSocket
iopubPort <- bindLocalEphemeralPort iopubSocket
_ <- forkIO $ forever $ heartbeat channels heartbeat_socket
_ <- forkIO $ forever $ control debug channels controlport_socket
_ <- forkIO $ forever $ shell debug channels shellport_socket
_ <- forkIO $ forever $ checked_iopub debug channels iopub_socket
_ <- forkIO $ forever $ heartbeat channels heartbeatSocket
_ <- forkIO $ forever $ control debug channels controlportSocket
_ <- forkIO $ forever $ shell debug channels shellportSocket
_ <- forkIO $ forever $ checkedIOpub debug channels iopubSocket
let ports = ZeroMQDynamicPorts { dynHbPort = dyn_hb_port
, dynControlPort = dyn_control_port
, dynShellPort = dyn_shell_port
, dynIOPubPort = dyn_iopub_port
, dynSignatureKey = key
}
let ports = ZeroMQEphemeralPorts { ephHbPort = hbPort
, ephControlPort = controlPort
, ephShellPort = shellPort
, ephIOPubPort = iopubPort
, ephSignatureKey = key
}
callback ports channels
serveStdin :: Profile -> IO ZeroMQStdin
@ -237,17 +244,17 @@ iopub debug channels socket =
-- | Send messages via the iopub channel. | This reads messages from the ZeroMQ iopub interface
-- channel | and then writes the messages to the socket.
checked_iopub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
checked_iopub debug channels socket = do
checkedIOpub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
checkedIOpub debug channels socket = do
msg <- readChan (iopubChannel channels)
let error_handler :: ZMQError -> IO ()
error_handler e
let errorHandler :: ZMQError -> IO ()
errorHandler e
-- Ignore errors if we cannot send.
-- We may want to cascade this back to channel.
| errno e == 38 = return ()
| otherwise = throwIO e
catch (sendMessage debug (hmacKey channels) socket msg)
error_handler
errorHandler
-- | Receive and parse a message from a socket.
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message