summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2020-03-29 20:19:29 -0700
committerBen Sima <ben@bsima.me>2020-03-30 09:01:07 -0700
commit0eca0c477b15e0412497ca21847bd969e5e73fc2 (patch)
tree6340c210dc8cd00a996fcf79ce69072a57fcbfd0
parent7ef86c61caa77a85abe24626848986fda2e0c666 (diff)
Add polling and streaming to Que
-rw-r--r--Com/Simatime/Que.hs56
1 files changed, 38 insertions, 18 deletions
diff --git a/Com/Simatime/Que.hs b/Com/Simatime/Que.hs
index 58a41f9..2b56575 100644
--- a/Com/Simatime/Que.hs
+++ b/Com/Simatime/Que.hs
@@ -13,18 +13,23 @@ import Com.Simatime.Alpha hiding ( Text
, get
, gets
, modify
+ , poll
)
import qualified Com.Simatime.Go as Go
import qualified Control.Concurrent.STM as STM
import qualified Control.Exception as Exception
-import GHC.Base ( String )
import Control.Monad.Reader ( MonadTrans )
+import qualified Data.ByteString.Builder.Extra as Builder
+import qualified Data.ByteString.Lazy as BSL
import Data.HashMap.Lazy ( HashMap )
import qualified Data.HashMap.Lazy as HashMap
-import Data.Text.Lazy ( Text )
import qualified Data.Text.Lazy as Text
-import qualified Data.Text.Lazy.Encoding as Encoding
-import Network.Wai ( Application )
+import Data.Text.Lazy ( Text
+ , fromStrict
+ )
+import qualified Data.Text.Encoding as Encoding
+import GHC.Base ( String )
+import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import Network.Wai.Middleware.RequestLogger
( logStdoutDev )
@@ -33,10 +38,10 @@ import qualified Web.Scotty.Trans as Scotty
main :: IO ()
main = Exception.bracket startup shutdown run
where
- run :: Application -> IO ()
+ run :: Wai.Application -> IO ()
run waiapp = Warp.run 8081 waiapp
-- | TODO: startup/shutdown ekg server, katip scribes
- startup :: IO Application
+ startup :: IO Wai.Application
startup = do
sync <- STM.newTVarIO <| AppState { ques = HashMap.empty }
let runActionToIO m = runReaderT (runApp m) sync
@@ -48,14 +53,20 @@ routes :: Scotty.ScottyT Text App ()
routes = do
Scotty.middleware logStdoutDev
- -- | Receive a value from a que. Blocks until a value is received.
+ -- | Receive a value from a que. Blocks until a value is received,
+ -- then returns. If 'poll=true', then stream data from the Que to the
+ -- client.
Scotty.get (Scotty.regex quepath) <| do
(ns, qp) <- extract
-- ensure namespace exists
app . modify <| upsertNamespace ns
- q <- app <| que ns qp
- r <- liftIO <| takeQue q
- Scotty.text r
+ q <- app <| que ns qp
+ poll <- Scotty.param "poll" :: Scotty.ActionT Text App Text
+ case poll of
+ "true" -> Scotty.stream $ streamQue q
+ _ -> do
+ r <- liftIO <| takeQue q
+ Scotty.text <| fromStrict <| Encoding.decodeUtf8 r
-- | Put a value on a que. Returns immediately.
Scotty.post (Scotty.regex quepath) <| do
@@ -64,9 +75,19 @@ routes = do
-- ensure namespace exists
app . modify <| upsertNamespace ns
q <- app <| que ns qp
- liftIO <| pushQue (Encoding.decodeUtf8 qdata) q
+ liftIO <| pushQue (BSL.toStrict qdata) q
return ()
+-- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
+streamQue :: Que -> Wai.StreamingBody
+streamQue q write _ = Go.mult q >>= loop
+ where
+ loop c =
+ Go.tap c
+ >>= (write . Builder.byteStringInsert)
+ >> (write <| Builder.byteStringInsert "\n")
+ >> loop c
+
-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)
@@ -77,6 +98,7 @@ upsertNamespace ns as = if HashMap.member ns (ques as)
then as
else as { ques = HashMap.insert ns mempty (ques as) }
+-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
insertQue ns qp q as = as { ques = newQues }
where
@@ -90,7 +112,8 @@ extract :: Scotty.ActionT Text App (Namespace, Quepath)
extract = do
ns <- Scotty.param "0"
path <- Scotty.param "1"
- return (ns, Text.split (== '/') path)
+ let p = Text.split (== '/') path |> filter (not . Text.null)
+ return (ns, p)
newtype App a = App
{ runApp :: ReaderT (STM.TVar AppState) IO a
@@ -115,16 +138,13 @@ gets f = ask >>= liftIO . STM.readTVarIO >>= return . f
modify :: (AppState -> AppState) -> App ()
modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f
--- * functionality
-
type Namespace = Text -- ^ housing for a set of que paths
---type Que = Go.Channel Quedata -- ^ a que is just a channel of json
-type Que = Go.Channel Quedata -- ^ a que is just a channel of json
+type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes
type Quepath = [Text] -- ^ any path can serve as an identifier for a que
-type Quedata = Text -- ^ any opaque data
+type Quedata = ByteString -- ^ any opaque data
type Quebase = HashMap Quepath Que -- ^ a collection of ques
--- | lookup or create a que
+-- | Lookup or create a que
que :: Namespace -> Quepath -> App Que
que ns qp = do
_ques <- gets ques