{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}

{- | Interprocess communication.

A small HTTP server that exposes named ques. A que is addressed by a
namespace plus a path (@\/namespace\/some\/path@); @POST@ puts the request
body on the que and @GET@ takes a value from it.
-}
module Com.Simatime.Que
  ( main
  )
where

import           Com.Simatime.Alpha      hiding ( Text
                                                , get
                                                , gets
                                                , modify
                                                , poll
                                                )
import qualified Com.Simatime.Go               as Go
import qualified Control.Concurrent.STM        as STM
import qualified Control.Exception             as Exception
import           Control.Monad.Reader           ( MonadTrans )
import qualified Data.ByteString.Builder.Extra as Builder
import qualified Data.ByteString.Lazy          as BSL
import           Data.HashMap.Lazy              ( HashMap )
import qualified Data.HashMap.Lazy             as HashMap
import qualified Data.Text.Lazy                as Text
import           Data.Text.Lazy                 ( Text
                                                , fromStrict
                                                )
import qualified Data.Text.Encoding            as Encoding
import           GHC.Base                       ( String )
import qualified Network.Wai                   as Wai
import qualified Network.Wai.Handler.Warp      as Warp
import           Network.Wai.Middleware.RequestLogger
                                                ( logStdoutDev )
import qualified Web.Scotty.Trans              as Scotty

-- | Build the WAI application and serve it with Warp on port 8081.
--
-- The application is acquired and released through 'Exception.bracket';
-- the release step is currently a no-op placeholder.
main :: IO ()
main = Exception.bracket startup shutdown run
 where
  run :: Wai.Application -> IO ()
  run waiapp = Warp.run 8081 waiapp

  -- TODO: startup/shutdown ekg server, katip scribes
  startup :: IO Wai.Application
  startup = do
    -- A single shared 'STM.TVar' holds every que for the process lifetime.
    sync <- STM.newTVarIO <| AppState { ques = HashMap.empty }
    let runActionToIO m = runReaderT (runApp m) sync
    Scotty.scottyAppT runActionToIO routes

  -- Nothing to release yet; hands its argument back unchanged.
  shutdown :: a -> IO a
  shutdown = pure . identity

-- | The HTTP routes. Both routes match 'quepath', so the first capture is
-- the namespace and the second is the que path.
routes :: Scotty.ScottyT Text App ()
routes = do
  Scotty.middleware logStdoutDev

  -- Receive a value from a que. Blocks until a value is received, then
  -- returns. If 'poll=true', then stream data from the Que to the client.
  Scotty.get (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    -- ensure namespace exists
    app . modify <| upsertNamespace ns
    q    <- app <| que ns qp
    -- 'Scotty.param' raises when the parameter is absent, which would fail
    -- every plain GET. Treat a missing 'poll' as "not polling" instead.
    poll <- Scotty.rescue
      (Scotty.param "poll" :: Scotty.ActionT Text App Text)
      (\_ -> return "")
    case poll of
      "true" -> Scotty.stream $ streamQue q
      _      -> do
        r <- liftIO <| takeQue q
        -- NOTE(review): 'Encoding.decodeUtf8' throws on input that is not
        -- valid UTF-8, while 'Quedata' is described as opaque bytes;
        -- consider a lenient decode or a raw response here.
        Scotty.text <| fromStrict <| Encoding.decodeUtf8 r

  -- Put a value on a que. Returns immediately.
  Scotty.post (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    qdata    <- Scotty.body
    -- ensure namespace exists
    app . modify <| upsertNamespace ns
    q <- app <| que ns qp
    liftIO <| pushQue (BSL.toStrict qdata) q

-- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
--
-- Each value taken from the que is written followed by a single newline.
-- The flush action supplied by WAI is not used.
streamQue :: Que -> Wai.StreamingBody
streamQue q write _ = Go.mult q >>= loop
 where
  loop c =
    Go.tap c
      >>= (write . Builder.byteStringInsert)
      >>  (write <| Builder.byteStringInsert "\n")
      >>  loop c

-- | Gets the thing from the 'HashMap'. Calls 'error' if the key doesn't
-- exist, so only use it on keys that are known to be present.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)

-- | Inserts the namespace in 'AppState' if it doesn't exist. An existing
-- namespace is left untouched.
upsertNamespace :: Namespace -> AppState -> AppState
upsertNamespace ns as = if HashMap.member ns (ques as)
  then as
  else as { ques = HashMap.insert ns mempty (ques as) }

-- | Inserts the que at the proper 'Namespace' and 'Quepath', replacing any
-- que already stored there.
--
-- The namespace must already exist in the state (see 'upsertNamespace');
-- otherwise the lookup via 'grab' calls 'error'.
insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
insertQue ns qp q as = as { ques = newQues }
 where
  newQues  = HashMap.insert ns newQbase (ques as)
  newQbase = HashMap.insert qp q <| grab ns <| ques as

-- | Route pattern shared by all routes: capture 1 is the namespace
-- (alphanumerics and underscores), capture 2 is the que path
-- (alphanumerics, underscores and slashes).
quepath :: GHC.Base.String
quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$"

-- | Read the namespace and que path out of the regex captures of the
-- current request. The path is split on @\/@ and empty segments are
-- dropped.
extract :: Scotty.ActionT Text App (Namespace, Quepath)
extract = do
  ns   <- Scotty.param "0"
  path <- Scotty.param "1"
  let p = Text.split (== '/') path |> filter (not . Text.null)
  return (ns, p)

-- | The application monad: a reader over the shared, mutable 'AppState'.
newtype App a = App
  { runApp :: ReaderT (STM.TVar AppState) IO a
  }
  deriving (Applicative, Functor, Monad, MonadIO,
            MonadReader (STM.TVar AppState))

-- | Everything the server keeps in memory: one 'Quebase' per 'Namespace'.
data AppState = AppState
  { ques :: HashMap Namespace Quebase
  }

-- | A synonym for 'lift' in order to be explicit about when we are
-- operating at the 'App' layer.
app :: MonadTrans t => App a -> t App a
app = lift

-- | Get something from the app state by applying the given projection to
-- the current contents of the shared 'STM.TVar'.
gets :: (AppState -> b) -> App b
gets f = do
  tvar <- ask
  st   <- liftIO (STM.readTVarIO tvar)
  return (f st)

-- | Apply a function to the app state, atomically and strictly.
modify :: (AppState -> AppState) -> App ()
modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f

-- | Housing for a set of que paths.
type Namespace = Text

-- | A que is just a channel of bytes.
type Que = Go.Channel Quedata

-- | Any path can serve as an identifier for a que.
type Quepath = [Text]

-- | Any opaque data.
type Quedata = ByteString

-- | A collection of ques.
type Quebase = HashMap Quepath Que

-- | Lookup or create a que.
--
-- The namespace is created on demand, so this never hits the partial
-- 'grab' on a missing namespace. Creation is decided inside a single STM
-- transaction: if two requests race to create the same que, both end up
-- with the one channel that was stored, rather than one of them holding a
-- channel that was overwritten and that nobody will ever write to.
que :: Namespace -> Quepath -> App Que
que ns qp = do
  tvar    <- ask
  current <- liftIO <| STM.readTVarIO tvar
  let qbase = grab ns <| ques <| upsertNamespace ns current
  if HashMap.member qp qbase
    -- Fast path: the que already exists, no allocation or transaction.
    then return <| grab qp qbase
    else do
      -- Allocate outside the transaction ('Go.chan' is an IO action); the
      -- fresh channel is simply dropped if another request won the race.
      fresh <- liftIO Go.chan
      liftIO <| STM.atomically <| do
        st <- STM.readTVar tvar
        let qbase' = grab ns <| ques <| upsertNamespace ns st
        if HashMap.member qp qbase'
          then return <| grab qp qbase'
          else do
            STM.modifyTVar' tvar (insertQue ns qp fresh . upsertNamespace ns)
            return fresh

-- | Put data on the que.
pushQue :: Quedata -> Que -> IO ()
pushQue = flip Go.write

-- | Tap and read from the Que. Tap first because a Que is actually a
-- broadcast channel. This allows for multiconsumer Ques.
takeQue :: Que -> IO Quedata
takeQue ch = Go.mult ch >>= Go.tap