mirror of
https://github.com/IHaskell/IHaskell.git
synced 2025-04-16 11:26:08 +00:00
Fix thread exception that occurs in withEphemeralPorts.
This commit is contained in:
parent
4292912153
commit
19c664fa97
@ -25,7 +25,6 @@ import Data.Char
|
||||
import Data.Digest.Pure.SHA as SHA
|
||||
import Data.Monoid ((<>))
|
||||
import qualified Data.Text.Encoding as Text
|
||||
import System.IO.Unsafe
|
||||
import System.ZMQ4 as ZMQ4 hiding (stdin)
|
||||
import Text.Read (readMaybe)
|
||||
|
||||
@ -136,12 +135,6 @@ bindLocalEphemeralPort socket = do
|
||||
Just endpointIndex ->
|
||||
return endpointIndex
|
||||
|
||||
-- | @withFork thread main@ runs @thread@ in a separate
|
||||
-- thread, and kills it when @main@ finishes.
|
||||
withFork :: IO () -> (ThreadId -> IO a) -> IO a
|
||||
withFork thread = bracket (forkIO thread) killThread
|
||||
|
||||
|
||||
-- | Run session for communicating with an IPython instance on ephemerally allocated
|
||||
-- ZMQ4 sockets. The sockets will be closed when the callback returns.
|
||||
withEphemeralPorts :: ByteString
|
||||
@ -174,12 +167,12 @@ withEphemeralPorts key debug callback = do
|
||||
, ephSignatureKey = key
|
||||
}
|
||||
-- Launch actions to listen to communicate between channels and cockets.
|
||||
withFork (forever $ heartbeat channels heartbeatSocket) $ \_ -> do
|
||||
withFork (forever $ control debug channels controlportSocket) $ \_ -> do
|
||||
withFork (forever $ shell debug channels shellportSocket) $ \_ -> do
|
||||
withFork (forever $ checkedIOpub debug channels iopubSocket) $ \_ -> do
|
||||
-- Run callback function; provide it with both ports and channels.
|
||||
callback ports channels
|
||||
_ <- forkIO $ forever $ heartbeat channels heartbeatSocket
|
||||
_ <- forkIO $ forever $ control debug channels controlportSocket
|
||||
_ <- forkIO $ forever $ shell debug channels shellportSocket
|
||||
_ <- forkIO $ checkedIOpub debug channels iopubSocket
|
||||
-- Run callback function; provide it with both ports and channels.
|
||||
callback ports channels
|
||||
|
||||
serveStdin :: Profile -> IO ZeroMQStdin
|
||||
serveStdin profile = do
|
||||
@ -252,19 +245,30 @@ iopub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
|
||||
iopub debug channels socket =
|
||||
readChan (iopubChannel channels) >>= sendMessage debug (hmacKey channels) socket
|
||||
|
||||
-- | Send messages via the iopub channel. | This reads messages from the ZeroMQ iopub interface
|
||||
-- channel | and then writes the messages to the socket.
|
||||
-- | Attempt to send a message along the socket, returning true if successful.
|
||||
trySendMessage :: Sender a => String -> Bool -> ByteString -> Socket a -> Message -> IO Bool
|
||||
trySendMessage nm debug hmacKey socket message = do
|
||||
let zmqErrorHandler :: ZMQError -> IO Bool
|
||||
zmqErrorHandler e
|
||||
-- Ignore errors if we cannot send.
|
||||
-- We may want to forward this to the thread that tried put the message
|
||||
-- in the Chan initially.
|
||||
| errno e == 38 = return False
|
||||
| otherwise = throwIO e
|
||||
(sendMessage debug hmacKey socket message >> return True)
|
||||
`catch` zmqErrorHandler
|
||||
|
||||
|
||||
-- | Send messages via the iopub channel.
|
||||
-- This reads messages from the ZeroMQ iopub interface
|
||||
-- channel and then writes the messages to the socket.
|
||||
-- This is a checked implementation which will stop if the socket is closed.
|
||||
checkedIOpub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
|
||||
checkedIOpub debug channels socket = do
|
||||
msg <- readChan (iopubChannel channels)
|
||||
let errorHandler :: ZMQError -> IO ()
|
||||
errorHandler e
|
||||
-- Ignore errors if we cannot send.
|
||||
-- We may want to cascade this back to channel.
|
||||
| errno e == 38 = return ()
|
||||
| otherwise = throwIO e
|
||||
catch (sendMessage debug (hmacKey channels) socket msg)
|
||||
errorHandler
|
||||
msg <- readChan (iopubChannel channels)
|
||||
cont <- trySendMessage "io" debug (hmacKey channels) socket msg
|
||||
when cont $
|
||||
checkedIOpub debug channels socket
|
||||
|
||||
-- | Receive and parse a message from a socket.
|
||||
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message
|
||||
|
Loading…
x
Reference in New Issue
Block a user