summaryrefslogtreecommitdiff
path: root/Com/Simatime/Que.hs
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2020-03-17 18:34:26 -0700
committerBen Sima <ben@bsima.me>2020-03-30 09:01:07 -0700
commit7ef86c61caa77a85abe24626848986fda2e0c666 (patch)
tree91b5ee484cb141c7705ee42c3619b4b48cfd8d59 /Com/Simatime/Que.hs
parent4fab0c52d2bae4076e79bf86ee4f3b8e08392a86 (diff)
Add que
Diffstat (limited to 'Com/Simatime/Que.hs')
-rw-r--r--Com/Simatime/Que.hs147
1 files changed, 147 insertions, 0 deletions
diff --git a/Com/Simatime/Que.hs b/Com/Simatime/Que.hs
new file mode 100644
index 0000000..58a41f9
--- /dev/null
+++ b/Com/Simatime/Que.hs
@@ -0,0 +1,147 @@
+{-# LANGUAGE NoImplicitPrelude #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+{- | Interprocess communication
+-}
+module Com.Simatime.Que
+ ( main
+ )
+where
+
+import Com.Simatime.Alpha hiding ( Text
+ , get
+ , gets
+ , modify
+ )
+import qualified Com.Simatime.Go as Go
+import qualified Control.Concurrent.STM as STM
+import qualified Control.Exception as Exception
+import GHC.Base ( String )
+import Control.Monad.Reader ( MonadTrans )
+import Data.HashMap.Lazy ( HashMap )
+import qualified Data.HashMap.Lazy as HashMap
+import Data.Text.Lazy ( Text )
+import qualified Data.Text.Lazy as Text
+import qualified Data.Text.Lazy.Encoding as Encoding
+import Network.Wai ( Application )
+import qualified Network.Wai.Handler.Warp as Warp
+import Network.Wai.Middleware.RequestLogger
+ ( logStdoutDev )
+import qualified Web.Scotty.Trans as Scotty
+
+main :: IO ()
+main = Exception.bracket startup shutdown run
+ where
+ run :: Application -> IO ()
+ run waiapp = Warp.run 8081 waiapp
+ -- | TODO: startup/shutdown ekg server, katip scribes
+ startup :: IO Application
+ startup = do
+ sync <- STM.newTVarIO <| AppState { ques = HashMap.empty }
+ let runActionToIO m = runReaderT (runApp m) sync
+ Scotty.scottyAppT runActionToIO routes
+ shutdown :: a -> IO a
+ shutdown = pure . identity
+
+routes :: Scotty.ScottyT Text App ()
+routes = do
+ Scotty.middleware logStdoutDev
+
+ -- | Receive a value from a que. Blocks until a value is received.
+ Scotty.get (Scotty.regex quepath) <| do
+ (ns, qp) <- extract
+ -- ensure namespace exists
+ app . modify <| upsertNamespace ns
+ q <- app <| que ns qp
+ r <- liftIO <| takeQue q
+ Scotty.text r
+
+ -- | Put a value on a que. Returns immediately.
+ Scotty.post (Scotty.regex quepath) <| do
+ (ns, qp) <- extract
+ qdata <- Scotty.body
+ -- ensure namespace exists
+ app . modify <| upsertNamespace ns
+ q <- app <| que ns qp
+ liftIO <| pushQue (Encoding.decodeUtf8 qdata) q
+ return ()
+
+-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
+grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
+grab = flip (HashMap.!)
+
+-- | Inserts the namespace in 'AppState' if it doesn't exist.
+upsertNamespace :: Namespace -> AppState -> AppState
+upsertNamespace ns as = if HashMap.member ns (ques as)
+ then as
+ else as { ques = HashMap.insert ns mempty (ques as) }
+
+insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
+insertQue ns qp q as = as { ques = newQues }
+ where
+ newQues = HashMap.insert ns newQbase (ques as)
+ newQbase = HashMap.insert qp q <| grab ns <| ques as
+
+quepath :: GHC.Base.String
+quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$"
+
+extract :: Scotty.ActionT Text App (Namespace, Quepath)
+extract = do
+ ns <- Scotty.param "0"
+ path <- Scotty.param "1"
+ return (ns, Text.split (== '/') path)
+
+newtype App a = App
+ { runApp :: ReaderT (STM.TVar AppState) IO a
+ }
+ deriving (Applicative, Functor, Monad, MonadIO, MonadReader
+ (STM.TVar AppState))
+
+data AppState = AppState
+ { ques :: HashMap Namespace Quebase
+ }
+
+-- | A synonym for 'lift' in order to be explicit about when we are
+-- operating at the 'App' layer.
+app :: MonadTrans t => App a -> t App a
+app = lift
+
+-- | Get something from the app state
+gets :: (AppState -> b) -> App b
+gets f = ask >>= liftIO . STM.readTVarIO >>= return . f
+
+-- | Apply a function to the app state
+modify :: (AppState -> AppState) -> App ()
+modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f
+
+-- * functionality
+
+type Namespace = Text -- ^ housing for a set of que paths
+--type Que = Go.Channel Quedata -- ^ a que is just a channel of json
+type Que = Go.Channel Quedata -- ^ a que is just a channel of json
+type Quepath = [Text] -- ^ any path can serve as an identifier for a que
+type Quedata = Text -- ^ any opaque data
+type Quebase = HashMap Quepath Que -- ^ a collection of ques
+
+-- | lookup or create a que
+que :: Namespace -> Quepath -> App Que
+que ns qp = do
+ _ques <- gets ques
+ let qbase = grab ns _ques
+ queExists = HashMap.member qp qbase
+ if queExists
+ then return <| grab qp qbase
+ else do
+ c <- liftIO Go.chan
+ modify (insertQue ns qp c)
+ gets ques /> grab ns /> grab qp
+
+-- | Put data on the que.
+pushQue :: Quedata -> Que -> IO ()
+pushQue = flip Go.write
+
+-- | Tap and read from the Que. Tap first because a Que is actually a
+-- broadcast channel. This allows for multiconsumer Ques.
+takeQue :: Que -> IO Quedata
+takeQue ch = Go.mult ch >>= Go.tap