{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}

{- | Interprocess communication
-}
module Com.Simatime.Que
  ( main
  )
where

import           Com.Simatime.Alpha      hiding ( Text
                                                , get
                                                , gets
                                                , modify
                                                )
import qualified Com.Simatime.Go               as Go
import qualified Control.Concurrent.STM        as STM
import qualified Control.Exception             as Exception
import           GHC.Base                       ( String )
import           Control.Monad.Reader           ( MonadTrans )
import           Data.HashMap.Lazy              ( HashMap )
import qualified Data.HashMap.Lazy             as HashMap
import           Data.Text.Lazy                 ( Text )
import qualified Data.Text.Lazy                as Text
import qualified Data.Text.Lazy.Encoding       as Encoding
import           Network.Wai                    ( Application )
import qualified Network.Wai.Handler.Warp      as Warp
import           Network.Wai.Middleware.RequestLogger
                                                ( logStdoutDev )
import qualified Web.Scotty.Trans              as Scotty

-- | Builds the WAI application and serves it with Warp on port 8081.
main :: IO ()
main = Exception.bracket startup shutdown run
 where
  run :: Application -> IO ()
  run = Warp.run 8081

  -- TODO: startup/shutdown ekg server, katip scribes
  startup :: IO Application
  startup = do
    -- All request handlers share this single TVar of application state.
    sync <- STM.newTVarIO <| AppState { ques = HashMap.empty }
    let runActionToIO m = runReaderT (runApp m) sync
    Scotty.scottyAppT runActionToIO routes

  -- Nothing to release yet; see the TODO above.
  shutdown :: a -> IO a
  shutdown = pure . identity

-- | HTTP routes. Both routes match 'quepath' and address the que named by
-- the namespace and path that 'extract' reads from the request.
routes :: Scotty.ScottyT Text App ()
routes = do
  Scotty.middleware logStdoutDev

  -- Receive a value from a que. Blocks until a value is received.
  Scotty.get (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    -- ensure namespace exists
    app . modify <| upsertNamespace ns
    q <- app <| que ns qp
    r <- liftIO <| takeQue q
    Scotty.text r

  -- Put a value on a que. Returns immediately.
  Scotty.post (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    qdata    <- Scotty.body
    -- Decode strictly and up front. The lazy 'Encoding.decodeUtf8' only
    -- throws when the text is forced, which would happen in whichever
    -- request later reads the value, after the bad data is already on the
    -- que. Rejecting here keeps the failure with the request that caused it.
    case Encoding.decodeUtf8' qdata of
      Left _ -> Scotty.raise "que data must be valid UTF-8"
      Right msg -> do
        -- ensure namespace exists
        app . modify <| upsertNamespace ns
        q <- app <| que ns qp
        liftIO <| pushQue msg q

-- | Gets the thing from the Hashmap. Calls 'error' if key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab key table = table HashMap.! key

-- | Inserts the namespace in 'AppState' if it doesn't exist. An existing
-- namespace is returned unchanged, so its ques are never reset.
upsertNamespace :: Namespace -> AppState -> AppState
upsertNamespace ns as = if HashMap.member ns known
  then as
  else as { ques = HashMap.insert ns mempty known }
  where known = ques as

-- | Stores a que under the given namespace and path, replacing any que
-- already stored at that path. A missing namespace is created rather than
-- treated as an error.
insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
insertQue ns qp q as = as { ques = newQues }
 where
  -- 'HashMap.insertWith' passes the new value first and 'HashMap.union' is
  -- left-biased, so the new que wins at @qp@ while the namespace's other
  -- ques are kept.
  newQues =
    HashMap.insertWith HashMap.union ns (HashMap.singleton qp q) (ques as)

-- | Route pattern with two capture groups: the namespace, then the
-- slash-separated que path.
quepath :: GHC.Base.String
quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$"

-- | Reads the namespace and que path from the regex captures of the current
-- request. The path is split on @/@ into its segments.
extract :: Scotty.ActionT Text App (Namespace, Quepath)
extract = do
  -- Scotty binds capture "0" to the whole matched path; the groups declared
  -- in 'quepath' start at "1".
  ns   <- Scotty.param "1"
  path <- Scotty.param "2"
  return (ns, Text.split (== '/') path)

-- | Application monad: 'IO' with read access to the shared 'AppState' TVar.
newtype App a = App
  { runApp :: ReaderT (STM.TVar AppState) IO a
  }
  deriving (Applicative, Functor, Monad, MonadIO, MonadReader (STM.TVar AppState))

-- | Everything the server keeps in memory: the ques of each namespace.
data AppState = AppState
  { ques :: HashMap Namespace Quebase
  }

-- | A synonym for 'lift' in order to be explicit about when we are
-- operating at the 'App' layer.
app :: MonadTrans t => App a -> t App a
app = lift

-- | Get something from the app state
gets :: (AppState -> b) -> App b
gets f = do
  tvar <- ask
  st   <- liftIO (STM.readTVarIO tvar)
  return (f st)

-- | Apply a function to the app state. The update is a single STM
-- transaction using the strict 'STM.modifyTVar''.
modify :: (AppState -> AppState) -> App ()
modify f = do
  tvar <- ask
  liftIO (STM.atomically (STM.modifyTVar' tvar f))

-- * functionality

-- | housing for a set of que paths
type Namespace = Text

-- | a que is just a channel of 'Quedata'
type Que = Go.Channel Quedata

-- | any path can serve as an identifier for a que
type Quepath = [Text]

-- | any opaque data
type Quedata = Text

-- | a collection of ques
type Quebase = HashMap Quepath Que

-- | lookup or create a que
--
-- The final check and the insert happen in one STM transaction, so
-- concurrent callers asking for the same new que all receive the same
-- channel. A missing namespace is treated like a missing que instead of
-- raising an error.
que :: Namespace -> Quepath -> App Que
que ns qp = do
  tvar  <- ask
  -- Cheap read first, so the common case allocates no channel.
  found <- liftIO (STM.readTVarIO tvar)
  case lookupQue found of
    Just q  -> return q
    Nothing -> liftIO <| do
      fresh <- Go.chan
      STM.atomically <| do
        current <- STM.readTVar tvar
        -- Re-check inside the transaction: another caller may have
        -- registered the que since the read above. If so, use theirs and
        -- drop the fresh channel.
        case lookupQue current of
          Just winner -> return winner
          Nothing     -> do
            STM.modifyTVar' tvar (insertQue ns qp fresh)
            return fresh
 where
  lookupQue st = HashMap.lookup ns (ques st) >>= HashMap.lookup qp

-- | Put data on the que.
pushQue :: Quedata -> Que -> IO ()
pushQue = flip Go.write

-- | Tap and read from the Que. Tap first because a Que is actually a
-- broadcast channel. This allows for multiconsumer Ques.
takeQue :: Que -> IO Quedata
takeQue ch = Go.mult ch >>= Go.tap