stdin works

This commit is contained in:
Andrew Gibiansky 2014-01-05 23:01:38 -05:00
parent 70f414f960
commit c7b11432b4
10 changed files with 210 additions and 40 deletions

View File

@ -7,7 +7,7 @@ name: ihaskell
-- PVP summary: +-+------- breaking API changes
-- | | +----- non-breaking API additions
-- | | | +--- code changes with no API change
version: 0.2.0.5
version: 0.2.0.6
-- A short (one-line) description of the package.
synopsis: A Haskell backend kernel for the IPython project.
@ -76,10 +76,20 @@ library
cereal ==0.3.*,
text >=0.11,
mtl >= 2.1
exposed-modules: IHaskell.Display,
Paths_ihaskell,
IHaskell.Types,
exposed-modules: IHaskell.Display
IHaskell.Eval.Completion
IHaskell.Eval.Evaluate
IHaskell.Eval.Info
IHaskell.Eval.Lint
IHaskell.Eval.Parser
IHaskell.Eval.Stdin
IHaskell.IPython
IHaskell.Message.Parser
IHaskell.Message.UUID
IHaskell.Message.Writer
IHaskell.Types
IHaskell.ZeroMQ
Paths_ihaskell
executable IHaskell
-- .hs or .lhs file containing the Main module.
@ -95,6 +105,7 @@ executable IHaskell
IHaskell.Eval.Info
IHaskell.Eval.Evaluate
IHaskell.Eval.Parser
IHaskell.Eval.Stdin
IHaskell.IPython
IHaskell.Message.Parser
IHaskell.Message.UUID

Binary file not shown.

View File

@ -83,6 +83,7 @@ type Interpreter = Ghc
globalImports :: [String]
globalImports =
[ "import IHaskell.Display"
, "import qualified IHaskell.Eval.Stdin"
, "import Control.Applicative ((<$>))"
, "import GHC.IO.Handle (hDuplicateTo, hDuplicate, hClose)"
, "import System.Posix.IO"
@ -103,7 +104,7 @@ interpret action = runGhc (Just libdir) $ do
-- Close stdin so it can't be used.
-- Otherwise it'll block the kernel forever.
runStmt "System.IO.hClose System.IO.stdin" RunToCompletion
runStmt "IHaskell.Eval.Stdin.fixStdin" RunToCompletion
initializeItVariable

View File

@ -0,0 +1,97 @@
{-# LANGUAGE NoImplicitPrelude, OverloadedStrings, DoAndIfThenElse #-}
module IHaskell.Eval.Stdin (fixStdin, recordParentHeader, recordKernelProfile) where
import ClassyPrelude hiding (hPutStrLn, readFile, writeFile)
import Prelude (read)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan
import Control.Monad
import GHC.IO.Handle
import GHC.IO.Handle.Types
import System.IO
import System.Posix.IO
import System.IO.Unsafe
import qualified Data.Map as Map
import IHaskell.Types
import IHaskell.IPython
import IHaskell.ZeroMQ
import IHaskell.Message.UUID as UUID
stdinInterface :: MVar ZeroMQStdin
stdinInterface = unsafePerformIO newEmptyMVar
-- | Manipulate standard input so that it is sourced from the IPython
-- frontend. This function is build on layers of deep magical hackery, so
-- be careful modifying it.
fixStdin :: IO ()
fixStdin = do
-- Initialize the stdin interface.
dir <- getIHaskellDir
profile <- read <$> readFile (dir ++ "/.kernel-profile")
interface <- serveStdin profile
putMVar stdinInterface interface
void $ forkIO stdinOnce
stdinOnce :: IO ()
stdinOnce = do
-- Create a pipe using and turn it into handles.
(readEnd, writeEnd) <- createPipe
newStdin <- fdToHandle readEnd
stdinInput <- fdToHandle writeEnd
hSetBuffering newStdin NoBuffering
hSetBuffering stdinInput NoBuffering
-- Store old stdin and swap in new stdin.
oldStdin <- hDuplicate stdin
hDuplicateTo newStdin stdin
loop stdinInput oldStdin newStdin
where
loop stdinInput oldStdin newStdin = do
let FileHandle _ mvar = stdin
threadDelay $ 150 * 1000
empty <- isEmptyMVar mvar
if not empty
then loop stdinInput oldStdin newStdin
else do
line <- getInputLine
hPutStr stdinInput $ line ++ "\n"
loop stdinInput oldStdin newStdin
-- | Get a line of input from the IPython frontend.
getInputLine :: IO String
getInputLine = do
StdinChannel req rep <- readMVar stdinInterface
-- Send a request for input.
uuid <- UUID.random
dir <- getIHaskellDir
parentHeader <- read <$> readFile (dir ++ "/.last-req-header")
let header = MessageHeader {
username = username parentHeader,
identifiers = identifiers parentHeader,
parentHeader = Just parentHeader,
messageId = uuid,
sessionId = sessionId parentHeader,
metadata = Map.fromList [],
msgType = InputRequestMessage
}
let msg = RequestInput header ""
writeChan req msg
-- Get the reply.
InputReply _ value <- readChan rep
hPrint stderr value
return value
recordParentHeader :: MessageHeader -> IO ()
recordParentHeader header = do
dir <- getIHaskellDir
writeFile (dir ++ "/.last-req-header") $ show header
recordKernelProfile :: Profile -> IO ()
recordKernelProfile profile = do
dir <- getIHaskellDir
writeFile (dir ++ "/.kernel-profile") $ show profile

View File

@ -77,6 +77,7 @@ parser ExecuteRequestMessage = executeRequestParser
parser CompleteRequestMessage = completeRequestParser
parser ObjectInfoRequestMessage = objectInfoRequestParser
parser ShutdownRequestMessage = shutdownRequestParser
parser InputReplyMessage = inputReplyParser
parser other = error $ "Unknown message type " ++ show other
-- | Parse a kernel info request.
@ -141,3 +142,12 @@ shutdownRequestParser content = parsed
return $ ShutdownRequest noHeader code
Just decoded = decode content
inputReplyParser :: LByteString -> Message
inputReplyParser content = parsed
where
Success parsed = flip parse decoded $ \ obj -> do
value <- obj .: "value"
return $ InputReply noHeader value
Just decoded = decode content

View File

@ -12,6 +12,10 @@ import Control.Monad (mzero)
import Data.Aeson
import Data.UUID.V4 (nextRandom)
import Text.Read as Read hiding (pfail, String)
import Text.ParserCombinators.ReadP
-- We use an internal string representation because for the purposes of
-- IPython, it matters whether the letters are uppercase or lowercase and
-- whether the dashes are present in the correct locations. For the
@ -20,10 +24,7 @@ import Data.UUID.V4 (nextRandom)
-- them.
-- | A UUID (universally unique identifier).
data UUID = UUID String deriving Eq
instance Show UUID where
show (UUID s) = s
data UUID = UUID String deriving (Show, Read, Eq)
-- | Generate a list of random UUIDs.
randoms :: Int -- ^ Number of UUIDs to generate.

View File

@ -80,6 +80,10 @@ instance ToJSON Message where
"wait" .= wait
]
toJSON RequestInput{inputPrompt = prompt} = object [
"prompt" .= prompt
]
toJSON body = error $ "Do not know how to convert to JSON for message " ++ show body

View File

@ -7,7 +7,7 @@ module IHaskell.Types (
MessageHeader (..),
MessageType(..),
Username,
Metadata,
Metadata(..),
Port,
replyType,
ExecutionState (..),
@ -43,7 +43,7 @@ data Profile = Profile {
shellPort :: Port, -- ^ The shell command port.
iopubPort :: Port, -- ^ The Iopub port.
key :: ByteString -- ^ The HMAC encryption key.
} deriving Show
} deriving (Show, Read)
-- Convert the kernel profile to and from JSON.
instance FromJSON Profile where
@ -112,7 +112,7 @@ data MessageHeader = MessageHeader {
sessionId :: UUID, -- ^ A unique session UUID.
username :: Username, -- ^ The user who sent this message.
msgType :: MessageType -- ^ The message type.
} deriving Show
} deriving (Show, Read)
-- Convert a message header into the JSON field for the header.
-- This field does not actually have all the record fields.
@ -121,7 +121,7 @@ instance ToJSON MessageHeader where
"msg_id" .= messageId header,
"session" .= sessionId header,
"username" .= username header,
"msg_type" .= show (msgType header)
"msg_type" .= showMessageType (msgType header)
]
-- | A username for the source of a message.
@ -147,24 +147,29 @@ data MessageType = KernelInfoReplyMessage
| ShutdownRequestMessage
| ShutdownReplyMessage
| ClearOutputMessage
| InputRequestMessage
| InputReplyMessage
deriving (Show, Read)
instance Show MessageType where
show KernelInfoReplyMessage = "kernel_info_reply"
show KernelInfoRequestMessage = "kernel_info_request"
show ExecuteReplyMessage = "execute_reply"
show ExecuteRequestMessage = "execute_request"
show StatusMessage = "status"
show StreamMessage = "stream"
show DisplayDataMessage = "display_data"
show OutputMessage = "pyout"
show InputMessage = "pyin"
show CompleteRequestMessage = "complete_request"
show CompleteReplyMessage = "complete_reply"
show ObjectInfoRequestMessage = "object_info_request"
show ObjectInfoReplyMessage = "object_info_reply"
show ShutdownRequestMessage = "shutdown_request"
show ShutdownReplyMessage = "shutdown_reply"
show ClearOutputMessage = "clear_output"
showMessageType :: MessageType -> String
showMessageType KernelInfoReplyMessage = "kernel_info_reply"
showMessageType KernelInfoRequestMessage = "kernel_info_request"
showMessageType ExecuteReplyMessage = "execute_reply"
showMessageType ExecuteRequestMessage = "execute_request"
showMessageType StatusMessage = "status"
showMessageType StreamMessage = "stream"
showMessageType DisplayDataMessage = "display_data"
showMessageType OutputMessage = "pyout"
showMessageType InputMessage = "pyin"
showMessageType CompleteRequestMessage = "complete_request"
showMessageType CompleteReplyMessage = "complete_reply"
showMessageType ObjectInfoRequestMessage = "object_info_request"
showMessageType ObjectInfoReplyMessage = "object_info_reply"
showMessageType ShutdownRequestMessage = "shutdown_request"
showMessageType ShutdownReplyMessage = "shutdown_reply"
showMessageType ClearOutputMessage = "clear_output"
showMessageType InputRequestMessage = "input_request"
showMessageType InputReplyMessage = "input_reply"
instance FromJSON MessageType where
parseJSON (String s) = case s of
@ -184,6 +189,8 @@ instance FromJSON MessageType where
"shutdown_request" -> return ShutdownRequestMessage
"shutdown_reply" -> return ShutdownReplyMessage
"clear_output" -> return ClearOutputMessage
"input_request" -> return InputRequestMessage
"input_reply" -> return InputReplyMessage
_ -> fail ("Unknown message type: " ++ show s)
parseJSON _ = fail "Must be a string."
@ -294,6 +301,16 @@ data Message
wait :: Bool -- ^ Whether to wait to redraw until there is more output.
}
| RequestInput {
header :: MessageHeader,
inputPrompt :: String
}
| InputReply {
header :: MessageHeader,
inputValue :: String
}
deriving Show
-- | Possible statuses in the execution reply messages.

View File

@ -1,4 +1,4 @@
{-# LANGUAGE NoImplicitPrelude, OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude, OverloadedStrings, DoAndIfThenElse #-}
-- | Description : Low-level ZeroMQ communication wrapper.
--
-- The "ZeroMQ" module abstracts away the low-level 0MQ based interface with IPython,
@ -6,7 +6,9 @@
-- takes a IPython profile specification and returns the channel interface to use.
module IHaskell.ZeroMQ (
ZeroMQInterface (..),
serveProfile
ZeroMQStdin(..),
serveProfile,
serveStdin,
) where
import ClassyPrelude hiding (stdin)
@ -20,6 +22,8 @@ import IHaskell.Types
import IHaskell.Message.Parser
import IHaskell.Message.Writer
import System.IO.Unsafe
-- | The channel interface to the ZeroMQ sockets. All communication is done via
-- Messages, which are encoded and decoded into a lower level form before being
-- transmitted to IPython. These channels should functionally serve as
@ -34,6 +38,11 @@ data ZeroMQInterface = Channels {
iopubChannel :: Chan Message -- ^ Writing to this channel sends an iopub message to the frontend.
}
data ZeroMQStdin = StdinChannel {
stdinRequestChannel :: Chan Message,
stdinReplyChannel :: Chan Message
}
-- | Start responding on all ZeroMQ channels used to communicate with IPython
-- | via the provided profile. Return a set of channels which can be used to
-- | communicate with IPython in a more structured manner.
@ -55,7 +64,6 @@ serveProfile profile = do
forkIO $ serveSocket context Rep (hbPort profile) $ heartbeat channels
forkIO $ serveSocket context Router (controlPort profile) $ control channels
forkIO $ serveSocket context Router (shellPort profile) $ shell channels
forkIO $ serveSocket context Router (stdinPort profile) $ stdin channels
-- The context is reference counted in this thread only. Thus, the last
-- serveSocket cannot be asynchronous, because otherwise context would
@ -65,6 +73,24 @@ serveProfile profile = do
return channels
serveStdin :: Profile -> IO ZeroMQStdin
serveStdin profile = do
reqChannel <- newChan
repChannel <- newChan
-- Create the context in a separate thread that never finishes. If
-- withContext or withSocket complete, the context or socket become invalid.
forkIO $ withContext $ \context ->
-- Serve on all sockets.
serveSocket context Router (stdinPort profile) $ \socket -> do
-- Read the request from the interface channel and send it.
readChan reqChannel >>= sendMessage socket
-- Receive a response and write it to the interface channel.
receiveMessage socket >>= writeChan repChannel
return $ StdinChannel reqChannel repChannel
-- | Serve on a given socket in a separate thread. Bind the socket in the
-- | given context and then loop the provided action, which should listen
-- | on the socket and respond to any events.
@ -120,11 +146,6 @@ iopub :: ZeroMQInterface -> Socket Pub -> IO ()
iopub channels socket =
readChan (iopubChannel channels) >>= sendMessage socket
stdin :: ZeroMQInterface -> Socket Router -> IO ()
stdin _ socket = do
void $ receive socket
return ()
-- | Receive and parse a message from a socket.
receiveMessage :: Receiver a => Socket a -> IO Message
receiveMessage socket = do

View File

@ -24,6 +24,7 @@ import IHaskell.Eval.Completion (complete)
import IHaskell.Eval.Info
import qualified Data.ByteString.Char8 as Chars
import IHaskell.IPython
import qualified IHaskell.Eval.Stdin as Stdin
import GHC hiding (extensions)
import Outputable (showSDoc, ppr)
@ -218,6 +219,9 @@ runKernel profileSrc initInfo = do
-- Parse the profile file.
Just profile <- liftM decode . readFile . fpFromText $ pack profileSrc
-- Necessary for `getLine` and their ilk to work.
Stdin.recordKernelProfile profile
-- Serve on all sockets and ports defined in the profile.
interface <- serveProfile profile
@ -289,7 +293,8 @@ replyTo :: ZeroMQInterface -> Message -> MessageHeader -> KernelState -> Interpr
-- Reply to kernel info requests with a kernel info reply. No computation
-- needs to be done, as a kernel info reply is a static object (all info is
-- hard coded into the representation of that message type).
replyTo _ KernelInfoRequest{} replyHeader state = return (state, KernelInfoReply { header = replyHeader })
replyTo _ KernelInfoRequest{} replyHeader state =
return (state, KernelInfoReply { header = replyHeader })
-- Reply to a shutdown request by exiting the main thread.
-- Before shutdown, reply to the request to let the frontend know shutdown
@ -301,10 +306,13 @@ replyTo interface ShutdownRequest{restartPending = restartPending} replyHeader _
-- Reply to an execution request. The reply itself does not require
-- computation, but this causes messages to be sent to the IOPub socket
-- with the output of the code in the execution request.
replyTo interface ExecuteRequest{ getCode = code } replyHeader state = do
replyTo interface req@ExecuteRequest{ getCode = code } replyHeader state = do
-- Convenience function to send a message to the IOPub socket.
let send msg = liftIO $ writeChan (iopubChannel interface) msg
-- Log things so that we can use stdin.
liftIO $ Stdin.recordParentHeader $ header req
-- Notify the frontend that the kernel is busy computing.
-- All the headers are copies of the reply header with a different
-- message type, because this preserves the session ID, parent header,