diff options
author | Ben Sima <ben@bsima.me> | 2020-03-17 18:34:26 -0700 |
---|---|---|
committer | Ben Sima <ben@bsima.me> | 2020-03-30 09:01:07 -0700 |
commit | 7ef86c61caa77a85abe24626848986fda2e0c666 (patch) | |
tree | 91b5ee484cb141c7705ee42c3619b4b48cfd8d59 /Com/Simatime/Que.hs | |
parent | 4fab0c52d2bae4076e79bf86ee4f3b8e08392a86 (diff) |
Add que
Diffstat (limited to 'Com/Simatime/Que.hs')
-rw-r--r-- | Com/Simatime/Que.hs | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/Com/Simatime/Que.hs b/Com/Simatime/Que.hs new file mode 100644 index 0000000..58a41f9 --- /dev/null +++ b/Com/Simatime/Que.hs @@ -0,0 +1,147 @@ +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} + +{- | Interprocess communication +-} +module Com.Simatime.Que + ( main + ) +where + +import Com.Simatime.Alpha hiding ( Text + , get + , gets + , modify + ) +import qualified Com.Simatime.Go as Go +import qualified Control.Concurrent.STM as STM +import qualified Control.Exception as Exception +import GHC.Base ( String ) +import Control.Monad.Reader ( MonadTrans ) +import Data.HashMap.Lazy ( HashMap ) +import qualified Data.HashMap.Lazy as HashMap +import Data.Text.Lazy ( Text ) +import qualified Data.Text.Lazy as Text +import qualified Data.Text.Lazy.Encoding as Encoding +import Network.Wai ( Application ) +import qualified Network.Wai.Handler.Warp as Warp +import Network.Wai.Middleware.RequestLogger + ( logStdoutDev ) +import qualified Web.Scotty.Trans as Scotty + +main :: IO () +main = Exception.bracket startup shutdown run + where + run :: Application -> IO () + run waiapp = Warp.run 8081 waiapp + -- | TODO: startup/shutdown ekg server, katip scribes + startup :: IO Application + startup = do + sync <- STM.newTVarIO <| AppState { ques = HashMap.empty } + let runActionToIO m = runReaderT (runApp m) sync + Scotty.scottyAppT runActionToIO routes + shutdown :: a -> IO a + shutdown = pure . identity + +routes :: Scotty.ScottyT Text App () +routes = do + Scotty.middleware logStdoutDev + + -- | Receive a value from a que. Blocks until a value is received. + Scotty.get (Scotty.regex quepath) <| do + (ns, qp) <- extract + -- ensure namespace exists + app . modify <| upsertNamespace ns + q <- app <| que ns qp + r <- liftIO <| takeQue q + Scotty.text r + + -- | Put a value on a que. Returns immediately. + Scotty.post (Scotty.regex quepath) <| do + (ns, qp) <- extract + qdata <- Scotty.body + -- ensure namespace exists + app . modify <| upsertNamespace ns + q <- app <| que ns qp + liftIO <| pushQue (Encoding.decodeUtf8 qdata) q + return () + +-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. +grab :: (Eq k, Hashable k) => k -> HashMap k v -> v +grab = flip (HashMap.!) + +-- | Inserts the namespace in 'AppState' if it doesn't exist. +upsertNamespace :: Namespace -> AppState -> AppState +upsertNamespace ns as = if HashMap.member ns (ques as) + then as + else as { ques = HashMap.insert ns mempty (ques as) } + +insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState +insertQue ns qp q as = as { ques = newQues } + where + newQues = HashMap.insert ns newQbase (ques as) + newQbase = HashMap.insert qp q <| grab ns <| ques as + +quepath :: GHC.Base.String +quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$" + +extract :: Scotty.ActionT Text App (Namespace, Quepath) +extract = do + ns <- Scotty.param "0" + path <- Scotty.param "1" + return (ns, Text.split (== '/') path) + +newtype App a = App + { runApp :: ReaderT (STM.TVar AppState) IO a + } + deriving (Applicative, Functor, Monad, MonadIO, MonadReader + (STM.TVar AppState)) + +data AppState = AppState + { ques :: HashMap Namespace Quebase + } + +-- | A synonym for 'lift' in order to be explicit about when we are +-- operating at the 'App' layer. +app :: MonadTrans t => App a -> t App a +app = lift + +-- | Get something from the app state +gets :: (AppState -> b) -> App b +gets f = ask >>= liftIO . STM.readTVarIO >>= return . f + +-- | Apply a function to the app state +modify :: (AppState -> AppState) -> App () +modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f + +-- * functionality + +type Namespace = Text -- ^ housing for a set of que paths +--type Que = Go.Channel Quedata -- ^ a que is just a channel of json +type Que = Go.Channel Quedata -- ^ a que is just a channel of json +type Quepath = [Text] -- ^ any path can serve as an identifier for a que +type Quedata = Text -- ^ any opaque data +type Quebase = HashMap Quepath Que -- ^ a collection of ques + +-- | lookup or create a que +que :: Namespace -> Quepath -> App Que +que ns qp = do + _ques <- gets ques + let qbase = grab ns _ques + queExists = HashMap.member qp qbase + if queExists + then return <| grab qp qbase + else do + c <- liftIO Go.chan + modify (insertQue ns qp c) + gets ques /> grab ns /> grab qp + +-- | Put data on the que. +pushQue :: Quedata -> Que -> IO () +pushQue = flip Go.write + +-- | Tap and read from the Que. Tap first because a Que is actually a +-- broadcast channel. This allows for multiconsumer Ques. +takeQue :: Que -> IO Quedata +takeQue ch = Go.mult ch >>= Go.tap |