mirror of
https://github.com/IHaskell/IHaskell.git
synced 2025-04-16 03:16:20 +00:00
Add buffers
field of the Wire Protocol
The Wire Protocol allows for "extra raw data buffer(s)" at the end of a ZeroMQ message. This commit enables buffers in the ipython-kernel. https://jupyter-client.readthedocs.io/en/stable/messaging.html#the-wire-protocol This field has been in "the Wire Protocol" since before the Jupyter Message specification version 5.0. https://github.com/jupyter/jupyter_client/issues/520 Resolves issue https://github.com/gibiansky/IHaskell/issues/1144 I've tested this feature with a proprietary JupyterLab extension and I've verified that it works. It's difficult to provide a public reproducible test. The best test may be to get ipywidgets Image working-- that uses buffers.
This commit is contained in:
parent
9dd237ea04
commit
8c43d47f09
@ -52,7 +52,8 @@ library
|
||||
transformers ,
|
||||
unordered-containers,
|
||||
uuid ,
|
||||
zeromq4-haskell
|
||||
zeromq4-haskell ,
|
||||
parsec
|
||||
|
||||
-- Example program
|
||||
executable simple-calc-example
|
||||
|
@ -125,7 +125,7 @@ createReplyHeader parent = do
|
||||
err = error $ "No reply for message " ++ show (mhMsgType parent)
|
||||
|
||||
return $ MessageHeader (mhIdentifiers parent) (Just parent) (Metadata (HashMap.fromList []))
|
||||
newMessageId (mhSessionId parent) (mhUsername parent) repType
|
||||
newMessageId (mhSessionId parent) (mhUsername parent) repType []
|
||||
|
||||
|
||||
-- | Execute an IPython kernel for a config. Your 'main' action should call this as the last thing
|
||||
|
@ -22,14 +22,16 @@ import IHaskell.IPython.Types
|
||||
type LByteString = Lazy.ByteString
|
||||
|
||||
-- --- External interface ----- | Parse a message from its ByteString components into a Message.
|
||||
-- See https://jupyter-client.readthedocs.io/en/stable/messaging.html#the-wire-protocol
|
||||
parseMessage :: [ByteString] -- ^ The list of identifiers sent with the message.
|
||||
-> ByteString -- ^ The header data.
|
||||
-> ByteString -- ^ The parent header, which is just "{}" if there is no header.
|
||||
-> ByteString -- ^ The metadata map, also "{}" for an empty map.
|
||||
-> ByteString -- ^ The message content.
|
||||
-> [ByteString] -- ^ Extra raw data buffer(s)
|
||||
-> Message -- ^ A parsed message.
|
||||
parseMessage idents headerData parentHeader metadata content =
|
||||
let header = parseHeader idents headerData parentHeader metadata
|
||||
parseMessage idents headerData parentHeader metadata content buffers =
|
||||
let header = parseHeader idents headerData parentHeader metadata buffers
|
||||
messageType = mhMsgType header
|
||||
messageWithoutHeader = parser messageType $ Lazy.fromStrict content
|
||||
in messageWithoutHeader { header = header }
|
||||
@ -39,16 +41,17 @@ parseHeader :: [ByteString] -- ^ The list of identifiers.
|
||||
-> ByteString -- ^ The header data.
|
||||
-> ByteString -- ^ The parent header, or "{}" for Nothing.
|
||||
-> ByteString -- ^ The metadata, or "{}" for an empty map.
|
||||
-> [ByteString] -- ^ Extra raw data buffer(s)
|
||||
-> MessageHeader -- The resulting message header.
|
||||
parseHeader idents headerData parentHeader metadata =
|
||||
MessageHeader idents parentResult metadataMap messageUUID sessionUUID username messageType
|
||||
parseHeader idents headerData parentHeader metadata buffers =
|
||||
MessageHeader idents parentResult metadataMap messageUUID sessionUUID username messageType buffers
|
||||
where
|
||||
-- Decode the header data and the parent header data into JSON objects. If the parent header data is
|
||||
-- absent, just have Nothing instead.
|
||||
Just result = decode $ Lazy.fromStrict headerData :: Maybe Object
|
||||
parentResult = if parentHeader == "{}"
|
||||
then Nothing
|
||||
else Just $ parseHeader idents parentHeader "{}" metadata
|
||||
else Just $ parseHeader idents parentHeader "{}" metadata []
|
||||
|
||||
Success (messageType, username, messageUUID, sessionUUID) = flip parse result $ \obj -> do
|
||||
messType <- obj .: "msg_type"
|
||||
|
@ -153,6 +153,7 @@ data MessageHeader =
|
||||
, mhSessionId :: UUID -- ^ A unique session UUID.
|
||||
, mhUsername :: Username -- ^ The user who sent this message.
|
||||
, mhMsgType :: MessageType -- ^ The message type.
|
||||
, mhBuffers :: [ByteString] -- ^ Extra raw data buffer(s)
|
||||
}
|
||||
deriving (Show, Read)
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
{-# LANGUAGE OverloadedStrings, DoAndIfThenElse #-}
|
||||
{-# LANGUAGE OverloadedStrings, DoAndIfThenElse, FlexibleContexts #-}
|
||||
|
||||
-- | Description : Low-level ZeroMQ communication wrapper.
|
||||
--
|
||||
@ -30,6 +30,7 @@ import Data.Monoid ((<>))
|
||||
import qualified Data.Text.Encoding as Text
|
||||
import System.ZMQ4 as ZMQ4
|
||||
import Text.Read (readMaybe)
|
||||
import Text.Parsec (runParserT, manyTill, anyToken, (<|>), eof, tokenPrim, incSourceColumn)
|
||||
|
||||
import IHaskell.IPython.Message.Parser
|
||||
import IHaskell.IPython.Types
|
||||
@ -268,38 +269,28 @@ checkedIOpub debug channels sock = do
|
||||
-- | Receive and parse a message from a socket.
|
||||
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message
|
||||
receiveMessage debug sock = do
|
||||
-- Read all identifiers until the identifier/message delimiter.
|
||||
idents <- readUntil "<IDS|MSG>"
|
||||
|
||||
-- Ignore the signature for now.
|
||||
void next
|
||||
|
||||
headerData <- next
|
||||
parentHeader <- next
|
||||
metadata <- next
|
||||
content <- next
|
||||
|
||||
when debug $ do
|
||||
putStr "Header: "
|
||||
Char.putStrLn headerData
|
||||
putStr "Content: "
|
||||
Char.putStrLn content
|
||||
|
||||
return $ parseMessage idents headerData parentHeader metadata content
|
||||
|
||||
blobs <- receiveMulti sock
|
||||
runParserT parseBlobs () "" blobs >>= \r -> case r of
|
||||
Left parseerr -> fail $ "Malformed Wire Protocol message: " <> show parseerr
|
||||
Right (idents, headerData, parentHeader, metaData, content, buffers) -> do
|
||||
when debug $ do
|
||||
putStr "Header: "
|
||||
Char.putStrLn headerData
|
||||
putStr "Content: "
|
||||
Char.putStrLn content
|
||||
return $ parseMessage idents headerData parentHeader metaData content buffers
|
||||
where
|
||||
-- Receive the next piece of data from the socket.
|
||||
next = receive sock
|
||||
|
||||
-- Read data from the socket until we hit an ending string. Return all data as a list, which does
|
||||
-- not include the ending string.
|
||||
readUntil str = do
|
||||
line <- next
|
||||
if line /= str
|
||||
then do
|
||||
remaining <- readUntil str
|
||||
return $ line : remaining
|
||||
else return []
|
||||
parseBlobs = do
|
||||
idents <- manyTill anyToken (satisfy (=="<IDS|MSG>"))
|
||||
_ <- anyToken <|> fail "No signature"
|
||||
headerData <- anyToken <|> fail "No headerData"
|
||||
parentHeader <- anyToken <|> fail "No parentHeader"
|
||||
metaData <- anyToken <|> fail "No metaData"
|
||||
content <- anyToken <|> fail "No contents"
|
||||
buffers <- manyTill anyToken eof
|
||||
pure (idents, headerData, parentHeader, metaData, content, buffers)
|
||||
satisfy f = tokenPrim Char.unpack (\pos _ _ -> incSourceColumn pos 1)
|
||||
(\c -> if f c then Just c else Nothing)
|
||||
|
||||
-- | Encode a message in the IPython ZeroMQ communication protocol and send it through the provided
|
||||
-- socket. Sign it using HMAC with SHA-256 using the provided key.
|
||||
@ -320,10 +311,18 @@ sendMessage debug hmackey sock msg = do
|
||||
sendPiece parentHeaderStr
|
||||
sendPiece metadata
|
||||
|
||||
-- Conclude transmission with content.
|
||||
sendLast content
|
||||
-- If there are no mhBuffers, then conclude transmission with content.
|
||||
case mhBuffers hdr of
|
||||
[] -> sendLast content
|
||||
_ -> sendPiece content
|
||||
|
||||
sendBuffers $ mhBuffers hdr
|
||||
|
||||
where
|
||||
sendBuffers [] = pure ()
|
||||
sendBuffers [b] = sendLast b
|
||||
sendBuffers (b:bs) = sendPiece b >> sendBuffers bs
|
||||
|
||||
sendPiece = send sock [SendMore]
|
||||
sendLast = send sock []
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user