summaryrefslogtreecommitdiff
path: root/Com/Simatime/Que.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Com/Simatime/Que.hs')
-rw-r--r--Com/Simatime/Que.hs167
1 files changed, 0 insertions, 167 deletions
diff --git a/Com/Simatime/Que.hs b/Com/Simatime/Que.hs
deleted file mode 100644
index 2b56575..0000000
--- a/Com/Simatime/Que.hs
+++ /dev/null
@@ -1,167 +0,0 @@
-{-# LANGUAGE NoImplicitPrelude #-}
-{-# LANGUAGE GeneralizedNewtypeDeriving #-}
-{-# LANGUAGE OverloadedStrings #-}
-
-{- | Interprocess communication
--}
-module Com.Simatime.Que
- ( main
- )
-where
-
-import Com.Simatime.Alpha hiding ( Text
- , get
- , gets
- , modify
- , poll
- )
-import qualified Com.Simatime.Go as Go
-import qualified Control.Concurrent.STM as STM
-import qualified Control.Exception as Exception
-import Control.Monad.Reader ( MonadTrans )
-import qualified Data.ByteString.Builder.Extra as Builder
-import qualified Data.ByteString.Lazy as BSL
-import Data.HashMap.Lazy ( HashMap )
-import qualified Data.HashMap.Lazy as HashMap
-import qualified Data.Text.Lazy as Text
-import Data.Text.Lazy ( Text
- , fromStrict
- )
-import qualified Data.Text.Encoding as Encoding
-import GHC.Base ( String )
-import qualified Network.Wai as Wai
-import qualified Network.Wai.Handler.Warp as Warp
-import Network.Wai.Middleware.RequestLogger
- ( logStdoutDev )
-import qualified Web.Scotty.Trans as Scotty
-
-main :: IO ()
-main = Exception.bracket startup shutdown run
- where
- run :: Wai.Application -> IO ()
- run waiapp = Warp.run 8081 waiapp
- -- | TODO: startup/shutdown ekg server, katip scribes
- startup :: IO Wai.Application
- startup = do
- sync <- STM.newTVarIO <| AppState { ques = HashMap.empty }
- let runActionToIO m = runReaderT (runApp m) sync
- Scotty.scottyAppT runActionToIO routes
- shutdown :: a -> IO a
- shutdown = pure . identity
-
-routes :: Scotty.ScottyT Text App ()
-routes = do
- Scotty.middleware logStdoutDev
-
- -- | Receive a value from a que. Blocks until a value is received,
- -- then returns. If 'poll=true', then stream data from the Que to the
- -- client.
- Scotty.get (Scotty.regex quepath) <| do
- (ns, qp) <- extract
- -- ensure namespace exists
- app . modify <| upsertNamespace ns
- q <- app <| que ns qp
- poll <- Scotty.param "poll" :: Scotty.ActionT Text App Text
- case poll of
- "true" -> Scotty.stream $ streamQue q
- _ -> do
- r <- liftIO <| takeQue q
- Scotty.text <| fromStrict <| Encoding.decodeUtf8 r
-
- -- | Put a value on a que. Returns immediately.
- Scotty.post (Scotty.regex quepath) <| do
- (ns, qp) <- extract
- qdata <- Scotty.body
- -- ensure namespace exists
- app . modify <| upsertNamespace ns
- q <- app <| que ns qp
- liftIO <| pushQue (BSL.toStrict qdata) q
- return ()
-
--- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
-streamQue :: Que -> Wai.StreamingBody
-streamQue q write _ = Go.mult q >>= loop
- where
- loop c =
- Go.tap c
- >>= (write . Builder.byteStringInsert)
- >> (write <| Builder.byteStringInsert "\n")
- >> loop c
-
--- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
-grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
-grab = flip (HashMap.!)
-
--- | Inserts the namespace in 'AppState' if it doesn't exist.
-upsertNamespace :: Namespace -> AppState -> AppState
-upsertNamespace ns as = if HashMap.member ns (ques as)
- then as
- else as { ques = HashMap.insert ns mempty (ques as) }
-
--- | Inserts the que at the proper 'Namespace' and 'Quepath'.
-insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
-insertQue ns qp q as = as { ques = newQues }
- where
- newQues = HashMap.insert ns newQbase (ques as)
- newQbase = HashMap.insert qp q <| grab ns <| ques as
-
-quepath :: GHC.Base.String
-quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$"
-
-extract :: Scotty.ActionT Text App (Namespace, Quepath)
-extract = do
- ns <- Scotty.param "0"
- path <- Scotty.param "1"
- let p = Text.split (== '/') path |> filter (not . Text.null)
- return (ns, p)
-
-newtype App a = App
- { runApp :: ReaderT (STM.TVar AppState) IO a
- }
- deriving (Applicative, Functor, Monad, MonadIO, MonadReader
- (STM.TVar AppState))
-
-data AppState = AppState
- { ques :: HashMap Namespace Quebase
- }
-
--- | A synonym for 'lift' in order to be explicit about when we are
--- operating at the 'App' layer.
-app :: MonadTrans t => App a -> t App a
-app = lift
-
--- | Get something from the app state
-gets :: (AppState -> b) -> App b
-gets f = ask >>= liftIO . STM.readTVarIO >>= return . f
-
--- | Apply a function to the app state
-modify :: (AppState -> AppState) -> App ()
-modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f
-
-type Namespace = Text -- ^ housing for a set of que paths
-type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes
-type Quepath = [Text] -- ^ any path can serve as an identifier for a que
-type Quedata = ByteString -- ^ any opaque data
-type Quebase = HashMap Quepath Que -- ^ a collection of ques
-
--- | Lookup or create a que
-que :: Namespace -> Quepath -> App Que
-que ns qp = do
- _ques <- gets ques
- let qbase = grab ns _ques
- queExists = HashMap.member qp qbase
- if queExists
- then return <| grab qp qbase
- else do
- c <- liftIO Go.chan
- modify (insertQue ns qp c)
- gets ques /> grab ns /> grab qp
-
--- | Put data on the que.
-pushQue :: Quedata -> Que -> IO ()
-pushQue = flip Go.write
-
--- | Tap and read from the Que. Tap first because a Que is actually a
--- broadcast channel. This allows for multiconsumer Ques.
-takeQue :: Que -> IO Quedata
-takeQue ch = Go.mult ch >>= Go.tap