From 8a95a5838b992f51611d89971cf6ad8cabe68970 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Mon, 30 Mar 2020 07:28:42 -0700 Subject: Rename Com.Simatime.Que to Run.Que Now that I have the domain name que.run! Aw yeah. --- Com/Simatime/Que.hs | 167 ---------------------------------------------------- Run/Que.hs | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++++ default.nix | 31 +++++----- 3 files changed, 182 insertions(+), 183 deletions(-) delete mode 100644 Com/Simatime/Que.hs create mode 100644 Run/Que.hs diff --git a/Com/Simatime/Que.hs b/Com/Simatime/Que.hs deleted file mode 100644 index 2b56575..0000000 --- a/Com/Simatime/Que.hs +++ /dev/null @@ -1,167 +0,0 @@ -{-# LANGUAGE NoImplicitPrelude #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE OverloadedStrings #-} - -{- | Interprocess communication --} -module Com.Simatime.Que - ( main - ) -where - -import Com.Simatime.Alpha hiding ( Text - , get - , gets - , modify - , poll - ) -import qualified Com.Simatime.Go as Go -import qualified Control.Concurrent.STM as STM -import qualified Control.Exception as Exception -import Control.Monad.Reader ( MonadTrans ) -import qualified Data.ByteString.Builder.Extra as Builder -import qualified Data.ByteString.Lazy as BSL -import Data.HashMap.Lazy ( HashMap ) -import qualified Data.HashMap.Lazy as HashMap -import qualified Data.Text.Lazy as Text -import Data.Text.Lazy ( Text - , fromStrict - ) -import qualified Data.Text.Encoding as Encoding -import GHC.Base ( String ) -import qualified Network.Wai as Wai -import qualified Network.Wai.Handler.Warp as Warp -import Network.Wai.Middleware.RequestLogger - ( logStdoutDev ) -import qualified Web.Scotty.Trans as Scotty - -main :: IO () -main = Exception.bracket startup shutdown run - where - run :: Wai.Application -> IO () - run waiapp = Warp.run 8081 waiapp - -- | TODO: startup/shutdown ekg server, katip scribes - startup :: IO Wai.Application - startup = do - sync <- STM.newTVarIO <| AppState { ques = HashMap.empty } - let runActionToIO m = runReaderT (runApp m) sync - Scotty.scottyAppT runActionToIO routes - shutdown :: a -> IO a - shutdown = pure . identity - -routes :: Scotty.ScottyT Text App () -routes = do - Scotty.middleware logStdoutDev - - -- | Receive a value from a que. Blocks until a value is received, - -- then returns. If 'poll=true', then stream data from the Que to the - -- client. - Scotty.get (Scotty.regex quepath) <| do - (ns, qp) <- extract - -- ensure namespace exists - app . modify <| upsertNamespace ns - q <- app <| que ns qp - poll <- Scotty.param "poll" :: Scotty.ActionT Text App Text - case poll of - "true" -> Scotty.stream $ streamQue q - _ -> do - r <- liftIO <| takeQue q - Scotty.text <| fromStrict <| Encoding.decodeUtf8 r - - -- | Put a value on a que. Returns immediately. - Scotty.post (Scotty.regex quepath) <| do - (ns, qp) <- extract - qdata <- Scotty.body - -- ensure namespace exists - app . modify <| upsertNamespace ns - q <- app <| que ns qp - liftIO <| pushQue (BSL.toStrict qdata) q - return () - --- | Forever write the data from 'Que' to 'Wai.StreamingBody'. -streamQue :: Que -> Wai.StreamingBody -streamQue q write _ = Go.mult q >>= loop - where - loop c = - Go.tap c - >>= (write . Builder.byteStringInsert) - >> (write <| Builder.byteStringInsert "\n") - >> loop c - --- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. -grab :: (Eq k, Hashable k) => k -> HashMap k v -> v -grab = flip (HashMap.!) - --- | Inserts the namespace in 'AppState' if it doesn't exist. -upsertNamespace :: Namespace -> AppState -> AppState -upsertNamespace ns as = if HashMap.member ns (ques as) - then as - else as { ques = HashMap.insert ns mempty (ques as) } - --- | Inserts the que at the proper 'Namespace' and 'Quepath'. -insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState -insertQue ns qp q as = as { ques = newQues } - where - newQues = HashMap.insert ns newQbase (ques as) - newQbase = HashMap.insert qp q <| grab ns <| ques as - -quepath :: GHC.Base.String -quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$" - -extract :: Scotty.ActionT Text App (Namespace, Quepath) -extract = do - ns <- Scotty.param "0" - path <- Scotty.param "1" - let p = Text.split (== '/') path |> filter (not . Text.null) - return (ns, p) - -newtype App a = App - { runApp :: ReaderT (STM.TVar AppState) IO a - } - deriving (Applicative, Functor, Monad, MonadIO, MonadReader - (STM.TVar AppState)) - -data AppState = AppState - { ques :: HashMap Namespace Quebase - } - --- | A synonym for 'lift' in order to be explicit about when we are --- operating at the 'App' layer. -app :: MonadTrans t => App a -> t App a -app = lift - --- | Get something from the app state -gets :: (AppState -> b) -> App b -gets f = ask >>= liftIO . STM.readTVarIO >>= return . f - --- | Apply a function to the app state -modify :: (AppState -> AppState) -> App () -modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f - -type Namespace = Text -- ^ housing for a set of que paths -type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes -type Quepath = [Text] -- ^ any path can serve as an identifier for a que -type Quedata = ByteString -- ^ any opaque data -type Quebase = HashMap Quepath Que -- ^ a collection of ques - --- | Lookup or create a que -que :: Namespace -> Quepath -> App Que -que ns qp = do - _ques <- gets ques - let qbase = grab ns _ques - queExists = HashMap.member qp qbase - if queExists - then return <| grab qp qbase - else do - c <- liftIO Go.chan - modify (insertQue ns qp c) - gets ques /> grab ns /> grab qp - --- | Put data on the que. -pushQue :: Quedata -> Que -> IO () -pushQue = flip Go.write - --- | Tap and read from the Que. Tap first because a Que is actually a --- broadcast channel. This allows for multiconsumer Ques. -takeQue :: Que -> IO Quedata -takeQue ch = Go.mult ch >>= Go.tap diff --git a/Run/Que.hs b/Run/Que.hs new file mode 100644 index 0000000..f1d0e28 --- /dev/null +++ b/Run/Que.hs @@ -0,0 +1,167 @@ +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} + +{- | Interprocess communication +-} +module Run.Que + ( main + ) +where + +import Com.Simatime.Alpha hiding ( Text + , get + , gets + , modify + , poll + ) +import qualified Com.Simatime.Go as Go +import qualified Control.Concurrent.STM as STM +import qualified Control.Exception as Exception +import Control.Monad.Reader ( MonadTrans ) +import qualified Data.ByteString.Builder.Extra as Builder +import qualified Data.ByteString.Lazy as BSL +import Data.HashMap.Lazy ( HashMap ) +import qualified Data.HashMap.Lazy as HashMap +import qualified Data.Text.Lazy as Text +import Data.Text.Lazy ( Text + , fromStrict + ) +import qualified Data.Text.Encoding as Encoding +import GHC.Base ( String ) +import qualified Network.Wai as Wai +import qualified Network.Wai.Handler.Warp as Warp +import Network.Wai.Middleware.RequestLogger + ( logStdoutDev ) +import qualified Web.Scotty.Trans as Scotty + +main :: IO () +main = Exception.bracket startup shutdown run + where + run :: Wai.Application -> IO () + run waiapp = Warp.run 8081 waiapp + -- | TODO: startup/shutdown ekg server, katip scribes + startup :: IO Wai.Application + startup = do + sync <- STM.newTVarIO <| AppState { ques = HashMap.empty } + let runActionToIO m = runReaderT (runApp m) sync + Scotty.scottyAppT runActionToIO routes + shutdown :: a -> IO a + shutdown = pure . identity + +routes :: Scotty.ScottyT Text App () +routes = do + Scotty.middleware logStdoutDev + + -- | Receive a value from a que. Blocks until a value is received, + -- then returns. If 'poll=true', then stream data from the Que to the + -- client. + Scotty.get (Scotty.regex quepath) <| do + (ns, qp) <- extract + -- ensure namespace exists + app . modify <| upsertNamespace ns + q <- app <| que ns qp + poll <- Scotty.param "poll" :: Scotty.ActionT Text App Text + case poll of + "true" -> Scotty.stream $ streamQue q + _ -> do + r <- liftIO <| takeQue q + Scotty.text <| fromStrict <| Encoding.decodeUtf8 r + + -- | Put a value on a que. Returns immediately. + Scotty.post (Scotty.regex quepath) <| do + (ns, qp) <- extract + qdata <- Scotty.body + -- ensure namespace exists + app . modify <| upsertNamespace ns + q <- app <| que ns qp + liftIO <| pushQue (BSL.toStrict qdata) q + return () + +-- | Forever write the data from 'Que' to 'Wai.StreamingBody'. +streamQue :: Que -> Wai.StreamingBody +streamQue q write _ = Go.mult q >>= loop + where + loop c = + Go.tap c + >>= (write . Builder.byteStringInsert) + >> (write <| Builder.byteStringInsert "\n") + >> loop c + +-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. +grab :: (Eq k, Hashable k) => k -> HashMap k v -> v +grab = flip (HashMap.!) + +-- | Inserts the namespace in 'AppState' if it doesn't exist. +upsertNamespace :: Namespace -> AppState -> AppState +upsertNamespace ns as = if HashMap.member ns (ques as) + then as + else as { ques = HashMap.insert ns mempty (ques as) } + +-- | Inserts the que at the proper 'Namespace' and 'Quepath'. +insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState +insertQue ns qp q as = as { ques = newQues } + where + newQues = HashMap.insert ns newQbase (ques as) + newQbase = HashMap.insert qp q <| grab ns <| ques as + +quepath :: GHC.Base.String +quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$" + +extract :: Scotty.ActionT Text App (Namespace, Quepath) +extract = do + ns <- Scotty.param "0" + path <- Scotty.param "1" + let p = Text.split (== '/') path |> filter (not . Text.null) + return (ns, p) + +newtype App a = App + { runApp :: ReaderT (STM.TVar AppState) IO a + } + deriving (Applicative, Functor, Monad, MonadIO, MonadReader + (STM.TVar AppState)) + +data AppState = AppState + { ques :: HashMap Namespace Quebase + } + +-- | A synonym for 'lift' in order to be explicit about when we are +-- operating at the 'App' layer. +app :: MonadTrans t => App a -> t App a +app = lift + +-- | Get something from the app state +gets :: (AppState -> b) -> App b +gets f = ask >>= liftIO . STM.readTVarIO >>= return . f + +-- | Apply a function to the app state +modify :: (AppState -> AppState) -> App () +modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f + +type Namespace = Text -- ^ housing for a set of que paths +type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes +type Quepath = [Text] -- ^ any path can serve as an identifier for a que +type Quedata = ByteString -- ^ any opaque data +type Quebase = HashMap Quepath Que -- ^ a collection of ques + +-- | Lookup or create a que +que :: Namespace -> Quepath -> App Que +que ns qp = do + _ques <- gets ques + let qbase = grab ns _ques + queExists = HashMap.member qp qbase + if queExists + then return <| grab qp qbase + else do + c <- liftIO Go.chan + modify (insertQue ns qp c) + gets ques /> grab ns /> grab qp + +-- | Put data on the que. +pushQue :: Quedata -> Que -> IO () +pushQue = flip Go.write + +-- | Tap and read from the Que. Tap first because a Que is actually a +-- broadcast channel. This allows for multiconsumer Ques. +takeQue :: Que -> IO Quedata +takeQue ch = Go.mult ch >>= Go.tap diff --git a/default.nix b/default.nix index f249d53..f60a2a1 100644 --- a/default.nix +++ b/default.nix @@ -59,21 +59,6 @@ in { boot.enableContainers = true; }; }; - Que = buildGhc { - name = "Com.Simatime.Que"; - nick = "que"; - deps = [ - "aeson" - "async" - "protolude" - "scotty" - "servant" - "servant-server" - "stm" - "unagi-chan" - "unordered-containers" - ]; - }; }; Com.InfluencedByBooks = buildOS { configuration = { @@ -175,7 +160,21 @@ in { ]; }; }; - + Run.Que = buildGhc { + name = "Run.Que"; + nick = "que-server"; + deps = [ + "aeson" + "async" + "protolude" + "scotty" + "servant" + "servant-server" + "stm" + "unagi-chan" + "unordered-containers" + ]; + }; # fallthrough to nixpkgs nixpkgs = nixpkgs; } -- cgit v1.2.3