{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}

{- | Interprocess communication -}
module Run.Que
  ( main
  )
where

import           Com.Simatime.Alpha      hiding ( Text
                                                , get
                                                , gets
                                                , modify
                                                , poll
                                                )
import qualified Com.Simatime.Go               as Go
import qualified Control.Concurrent.STM        as STM
import qualified Control.Exception             as Exception
import           Control.Monad.Reader           ( MonadTrans )
import qualified Data.ByteString.Builder.Extra as Builder
import qualified Data.ByteString.Lazy          as BSL
import           Data.HashMap.Lazy              ( HashMap )
import qualified Data.HashMap.Lazy             as HashMap
import qualified Data.Text.Encoding            as Encoding
import           Data.Text.Lazy                 ( Text
                                                , fromStrict
                                                )
import qualified Data.Text.Lazy                as Text
import           GHC.Base                       ( String )
import qualified Network.Wai                   as Wai
import qualified Network.Wai.Handler.Warp      as Warp
import           Network.Wai.Middleware.RequestLogger
                                                ( logStdoutDev )
import qualified System.Console.GetOpt         as Opt
import qualified System.Environment            as Environment
import qualified Text.Read                     as Read
import qualified Web.Scotty.Trans              as Scotty

-- | Entry point: parse the command line, build the WAI application and
-- serve it with Warp on the configured port.
main :: IO ()
main = Exception.bracket startup shutdown run
 where
  -- Announce the port and hand the application over to Warp.
  run (p, waiapp) =
    putText ("que-server starting on port " <> show p) >> Warp.run p waiapp

  -- Build the shared state and the application.
  startup = do
    opts <- Environment.getArgs /> getOpts
    -- Option parsing is pure and lazy; force the port here so that a bad
    -- command line fails before the application is built.
    p      <- Exception.evaluate (port opts)
    sync   <- STM.newTVarIO opts
    let runActionToIO m = runReaderT (runApp m) sync
    waiapp <- Scotty.scottyAppT runActionToIO routes
    return (p, waiapp)

  -- Nothing to release; hand the acquired value back unchanged.
  shutdown :: a -> IO a
  shutdown = pure

  -- Turn the raw argument list into an 'AppState'. Throws
  -- 'ErrorParsingOptions' when 'Opt.getOpt' reports any error, and also
  -- when no arguments are given at all.
  getOpts args = case Opt.getOpt Opt.Permute options args of
    (_   , _ , _ : _) -> Exception.throw ErrorParsingOptions
    ([]  , [], _    ) -> Exception.throw ErrorParsingOptions
    (opts, _ , _    ) -> smoosh initialAppState opts

  options =
    [ Opt.Option ['p']
                 ["port"]
                 (Opt.ReqArg (\n m -> m { port = parsePort n }) "PORT")
                 "port to run on "
    ]

  -- Parse a port number, throwing 'ErrorParsingOptions' if the argument
  -- is not a number.
  parsePort n = case Read.readMaybe n of
    Just p  -> p :: Warp.Port
    Nothing -> Exception.throw ErrorParsingOptions

-- | Errors raised while starting the server.
data Error = ErrorParsingOptions
  deriving (Show)

instance Exception.Exception Error

-- | The HTTP routes: @GET@ reads from a que, @POST@ writes to one.
routes :: Scotty.ScottyT Text App ()
routes = do
  Scotty.middleware logStdoutDev

  -- Receive a value from a que. Blocks until a value is received,
  -- then returns. If 'poll=true', then stream data from the Que to the
  -- client.
  Scotty.get (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    -- ensure namespace exists
    app . modify <| upsertNamespace ns
    q    <- app <| que ns qp
    -- a missing or unparsable 'poll' parameter means "do not poll"
    poll <- Scotty.param "poll" !: (pure . const False)
    case poll of
      True -> Scotty.stream $ streamQue q
      _    -> do
        r <- liftIO <| takeQue q
        -- Que data is opaque bytes, so send it as-is instead of decoding
        -- it as UTF-8 (which throws on invalid input). The header is the
        -- one 'Scotty.text' would have set.
        Scotty.setHeader "Content-Type" "text/plain; charset=utf-8"
        Scotty.raw <| BSL.fromStrict r

  -- Put a value on a que. Returns immediately.
  Scotty.post (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    qdata    <- Scotty.body
    -- ensure namespace exists
    app . modify <| upsertNamespace ns
    q <- app <| que ns qp
    liftIO <| pushQue (BSL.toStrict qdata) q

-- | recover from a scotty-thrown exception.
(!:)
  :: Scotty.ActionT Text App a -- ^ action that might throw
  -> (Text -> Scotty.ActionT Text App a) -- ^ a function providing a default response instead
  -> Scotty.ActionT Text App a
(!:) = Scotty.rescue

-- | Forever write the data from 'Que' to 'Wai.StreamingBody'. Each
-- message is followed by a newline and a flush.
streamQue :: Que -> Wai.StreamingBody
streamQue q write flush = Go.mult q >>= loop
 where
  loop c = do
    msg <- Go.tap c
    write (Builder.byteStringInsert msg)
    write (Builder.byteStringInsert "\n")
    -- ask the server to push the message out now
    flush
    loop c

-- | Gets the thing from the Hashmap. Calls 'error' if the key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)

-- | Inserts the namespace in 'AppState' if it doesn't exist.
upsertNamespace :: Namespace -> AppState -> AppState
upsertNamespace ns as = if HashMap.member ns (ques as)
  then as
  else as { ques = HashMap.insert ns mempty (ques as) }

-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
--
-- If the namespace is missing it is created; an existing que at the same
-- path is replaced.
insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
insertQue ns qp q st = st { ques = newQues }
 where
  -- 'HashMap.union' is left-biased, so the new entry wins over an old one
  -- at the same path while the namespace's other ques are kept.
  newQues =
    HashMap.insertWith HashMap.union ns (HashMap.singleton qp q) (ques st)

-- | Route pattern: the first capture group is the namespace, the second
-- is the (possibly empty, slash-separated) que path.
quepath :: GHC.Base.String
quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$"

-- | Pull the namespace and que path out of a request matched by 'quepath'.
extract :: Scotty.ActionT Text App (Namespace, Quepath)
extract = do
  -- Scotty numbers regex captures from 1; capture "0" is the whole match.
  ns   <- Scotty.param "1"
  path <- Scotty.param "2"
  -- drop the empty segments produced by leading, trailing or doubled slashes
  let p = Text.split (== '/') path |> filter (not . Text.null)
  return (ns, p)

-- | The application monad: 'IO' with read access to the shared 'AppState'.
newtype App a = App
  { runApp :: ReaderT (STM.TVar AppState) IO a
  }
  deriving (Applicative, Functor, Monad, MonadIO, MonadReader (STM.TVar AppState))

-- | Everything the server keeps in memory.
data AppState = AppState
  { ques :: HashMap Namespace Quebase
  , port :: Warp.Port
  }

-- | State used before any command line options are applied.
initialAppState :: AppState
initialAppState = AppState { port = 80, ques = mempty }

-- | Resolve a list of 'AppState' transitions into one.
smoosh
  :: AppState -- ^ Initial app state to start with
  -> [AppState -> AppState] -- ^ List of functions to apply in order
  -> AppState
-- Compose left to right so the first function in the list runs first and
-- the last one has the final say.
smoosh initial fs = foldr (\f rest -> rest . f) identity fs initial

-- | A synonym for 'lift' in order to be explicit about when we are
-- operating at the 'App' layer.
app :: MonadTrans t => App a -> t App a
app = lift

-- | Get something from the app state
gets :: (AppState -> b) -> App b
gets f = do
  tvar <- ask
  st   <- liftIO (STM.readTVarIO tvar)
  return (f st)

-- | Apply a function to the app state
modify :: (AppState -> AppState) -> App ()
modify f = do
  tvar <- ask
  liftIO (STM.atomically (STM.modifyTVar' tvar f))

-- | housing for a set of que paths
type Namespace = Text

-- | a que is just a channel of bytes
type Que = Go.Channel Quedata

-- | any path can serve as an identifier for a que
type Quepath = [Text]

-- | any opaque data
type Quedata = ByteString

-- | a collection of ques
type Quebase = HashMap Quepath Que

-- | Find the que at the given namespace and path, if there is one.
lookupQue :: Namespace -> Quepath -> AppState -> Maybe Que
lookupQue ns qp st = HashMap.lookup ns (ques st) >>= HashMap.lookup qp

-- | Lookup or create a que
que :: Namespace -> Quepath -> App Que
que ns qp = do
  found <- gets (lookupQue ns qp)
  case found of
    Just q  -> return q
    Nothing -> do
      fresh <- liftIO Go.chan
      tvar  <- ask
      -- Re-check and insert in one transaction: if another request
      -- registered a que for this path in the meantime, use that one so
      -- that every caller ends up sharing the same channel.
      liftIO . STM.atomically <| do
        st <- STM.readTVar tvar
        case lookupQue ns qp st of
          Just q  -> return q
          Nothing -> do
            STM.modifyTVar' tvar (insertQue ns qp fresh)
            return fresh

-- | Put data on the que.
pushQue :: Quedata -> Que -> IO ()
pushQue = flip Go.write

-- | Tap and read from the Que. Tap first because a Que is actually a
-- broadcast channel. This allows for multiconsumer Ques.
takeQue :: Que -> IO Quedata
takeQue ch = Go.mult ch >>= Go.tap