From 0eca0c477b15e0412497ca21847bd969e5e73fc2 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Sun, 29 Mar 2020 20:19:29 -0700 Subject: Add polling and streaming to Que --- Com/Simatime/Que.hs | 56 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 18 deletions(-) (limited to 'Com') diff --git a/Com/Simatime/Que.hs b/Com/Simatime/Que.hs index 58a41f9..2b56575 100644 --- a/Com/Simatime/Que.hs +++ b/Com/Simatime/Que.hs @@ -13,18 +13,23 @@ import Com.Simatime.Alpha hiding ( Text , get , gets , modify + , poll ) import qualified Com.Simatime.Go as Go import qualified Control.Concurrent.STM as STM import qualified Control.Exception as Exception -import GHC.Base ( String ) import Control.Monad.Reader ( MonadTrans ) +import qualified Data.ByteString.Builder.Extra as Builder +import qualified Data.ByteString.Lazy as BSL import Data.HashMap.Lazy ( HashMap ) import qualified Data.HashMap.Lazy as HashMap -import Data.Text.Lazy ( Text ) import qualified Data.Text.Lazy as Text -import qualified Data.Text.Lazy.Encoding as Encoding -import Network.Wai ( Application ) +import Data.Text.Lazy ( Text + , fromStrict + ) +import qualified Data.Text.Encoding as Encoding +import GHC.Base ( String ) +import qualified Network.Wai as Wai import qualified Network.Wai.Handler.Warp as Warp import Network.Wai.Middleware.RequestLogger ( logStdoutDev ) @@ -33,10 +38,10 @@ import qualified Web.Scotty.Trans as Scotty main :: IO () main = Exception.bracket startup shutdown run where - run :: Application -> IO () + run :: Wai.Application -> IO () run waiapp = Warp.run 8081 waiapp -- | TODO: startup/shutdown ekg server, katip scribes - startup :: IO Application + startup :: IO Wai.Application startup = do sync <- STM.newTVarIO <| AppState { ques = HashMap.empty } let runActionToIO m = runReaderT (runApp m) sync @@ -48,14 +53,20 @@ routes :: Scotty.ScottyT Text App () routes = do Scotty.middleware logStdoutDev - -- | Receive a value from a que. Blocks until a value is received. + -- | Receive a value from a que. Blocks until a value is received, + -- then returns. If 'poll=true', then stream data from the Que to the + -- client. Scotty.get (Scotty.regex quepath) <| do (ns, qp) <- extract -- ensure namespace exists app . modify <| upsertNamespace ns - q <- app <| que ns qp - r <- liftIO <| takeQue q - Scotty.text r + q <- app <| que ns qp + poll <- Scotty.param "poll" :: Scotty.ActionT Text App Text + case poll of + "true" -> Scotty.stream $ streamQue q + _ -> do + r <- liftIO <| takeQue q + Scotty.text <| fromStrict <| Encoding.decodeUtf8 r -- | Put a value on a que. Returns immediately. Scotty.post (Scotty.regex quepath) <| do @@ -64,9 +75,19 @@ routes = do -- ensure namespace exists app . modify <| upsertNamespace ns q <- app <| que ns qp - liftIO <| pushQue (Encoding.decodeUtf8 qdata) q + liftIO <| pushQue (BSL.toStrict qdata) q return () +-- | Forever write the data from 'Que' to 'Wai.StreamingBody'. +streamQue :: Que -> Wai.StreamingBody +streamQue q write _ = Go.mult q >>= loop + where + loop c = + Go.tap c + >>= (write . Builder.byteStringInsert) + >> (write <| Builder.byteStringInsert "\n") + >> loop c + -- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. grab :: (Eq k, Hashable k) => k -> HashMap k v -> v grab = flip (HashMap.!) @@ -77,6 +98,7 @@ upsertNamespace ns as = if HashMap.member ns (ques as) then as else as { ques = HashMap.insert ns mempty (ques as) } +-- | Inserts the que at the proper 'Namespace' and 'Quepath'. insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState insertQue ns qp q as = as { ques = newQues } where @@ -90,7 +112,8 @@ extract :: Scotty.ActionT Text App (Namespace, Quepath) extract = do ns <- Scotty.param "0" path <- Scotty.param "1" - return (ns, Text.split (== '/') path) + let p = Text.split (== '/') path |> filter (not . Text.null) + return (ns, p) newtype App a = App { runApp :: ReaderT (STM.TVar AppState) IO a @@ -115,16 +138,13 @@ gets f = ask >>= liftIO . STM.readTVarIO >>= return . f modify :: (AppState -> AppState) -> App () modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f --- * functionality - type Namespace = Text -- ^ housing for a set of que paths ---type Que = Go.Channel Quedata -- ^ a que is just a channel of json -type Que = Go.Channel Quedata -- ^ a que is just a channel of json +type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes type Quepath = [Text] -- ^ any path can serve as an identifier for a que -type Quedata = Text -- ^ any opaque data +type Quedata = ByteString -- ^ any opaque data type Quebase = HashMap Quepath Que -- ^ a collection of ques --- | lookup or create a que +-- | Lookup or create a que que :: Namespace -> Quepath -> App Que que ns qp = do _ques <- gets ques -- cgit v1.2.3