{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}

{- | Interprocess communication
-}
module Run.Que.Server
  ( main
  )
where

import           Com.Simatime.Alpha      hiding ( Text
                                                , get
                                                , gets
                                                , modify
                                                , poll
                                                )
import qualified Com.Simatime.Go               as Go
import qualified Control.Concurrent.STM        as STM
import qualified Control.Exception             as Exception
import           Control.Monad.Reader           ( MonadTrans )
import qualified Data.ByteString.Builder.Extra as Builder
import qualified Data.ByteString.Lazy          as BSL
import           Data.HashMap.Lazy              ( HashMap )
import qualified Data.HashMap.Lazy             as HashMap
import qualified Data.Text.Encoding            as Encoding
import qualified Data.Text.Encoding.Error      as EncodingError
import           Data.Text.Lazy                 ( Text
                                                , fromStrict
                                                )
import qualified Network.HTTP.Types.Status     as Http
import qualified Network.Wai                   as Wai
import qualified Network.Wai.Handler.Warp      as Warp
import           Network.Wai.Middleware.RequestLogger
                                                ( logStdout )
import qualified System.Console.GetOpt         as Opt
import qualified System.Environment            as Environment
import qualified Text.Read                     as Read
import qualified Web.Scotty.Trans              as Scotty

-- | Parse the command line, build the shared 'AppState', and serve 'routes'
-- with Warp on the configured port.
--
-- Throws 'ErrorParsingOptions' during startup when the command line is
-- empty, when @getOpt@ reports any error, or when the port is not a number.
main :: IO ()
main = Exception.bracket startup shutdown run
 where
  run (p, waiapp) =
    putText ("que-server starting on port " <> show p) >> Warp.run p waiapp

  startup = do
    opts <- Environment.getArgs >>= getOpts
    sync <- STM.newTVarIO opts
    let runActionToIO m = runReaderT (runApp m) sync
    waiapp <- Scotty.scottyAppT runActionToIO routes
    return (port opts, waiapp)

  -- Nothing to release yet; kept so 'Exception.bracket' has a cleanup slot.
  shutdown :: a -> IO a
  shutdown = pure . identity

  -- Turn the raw arguments into an 'AppState'. The third component of
  -- getOpt's result is its list of error messages; it must be empty for the
  -- command line to be accepted. The state is forced here so that a bad
  -- port value fails during startup rather than at first use.
  getOpts args = case Opt.getOpt Opt.Permute options args of
    ([]  , [], _    ) -> Exception.throwIO ErrorParsingOptions
    (_   , _ , _ : _) -> Exception.throwIO ErrorParsingOptions
    (opts, _ , []   ) -> Exception.evaluate (smoosh initialAppState opts)

  options =
    [ Opt.Option ['p'] ["port"] (Opt.ReqArg setPort "PORT") "port to run on "
    ]

  -- Total replacement for 'read': a non-numeric port becomes a typed error.
  setPort n m = case Read.readMaybe n of
    Just p  -> m { port = p }
    Nothing -> Exception.throw ErrorParsingOptions

-- | Errors raised while starting the server.
data Error = ErrorParsingOptions
  deriving (Show)

instance Exception.Exception Error

-- | All HTTP routes served by the que server.
routes :: Scotty.ScottyT Text App ()
routes = do
  Scotty.middleware logStdout

  let quepath = "^\\/([[:alnum:]_]+)\\/([[:alnum:]._/]*)$"
  let namespace = "^\\/([[:alnum:]_]+)\\/?$" -- matches '/ns' and '/ns/' but not '/ns/path'

  -- GET /index.html
  Scotty.get (Scotty.literal "/index.html") <| Scotty.redirect "/_/index"
  Scotty.get (Scotty.literal "/") <| Scotty.redirect "/_/index"

  -- Namespace management
  Scotty.matchAny (Scotty.regex namespace) <| do
    Scotty.status Http.notImplemented501
    Scotty.text "namespace management coming soon"

  -- GET que
  --
  -- Receive a value from a que. Blocks until a value is received,
  -- then returns. If 'poll=true', then stream data from the Que to the
  -- client.
  Scotty.get (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    -- Reject before touching the app state so that refused requests do not
    -- allocate namespaces or ques.
    guardNs ns ["pub", "_"]
    app . modify <| upsertNamespace ns
    q    <- app <| que ns qp
    poll <- Scotty.param "poll" !: (pure . const False)
    case poll of
      True -> Scotty.stream $ streamQue q
      _    -> do
        r <- liftIO <| takeQue q
        -- Que data is arbitrary bytes; decode leniently so that a payload
        -- that is not valid UTF-8 cannot throw while rendering the response.
        Scotty.html
          <| fromStrict
          <| Encoding.decodeUtf8With EncodingError.lenientDecode r

  -- POST que
  --
  -- Put a value on a que. Returns immediately.
  Scotty.post (Scotty.regex quepath) <| do
    xFwdHost <- Scotty.header "X-Forwarded-Host"
    xRealIP  <- Scotty.header "X-Real-IP"
    host     <- Scotty.header "Host"
    (ns, qp) <- extract

    -- Only allow my IP or localhost to publish to '_' namespace
    -- NOTE(review): these are request headers; presumably a fronting proxy
    -- overwrites them, otherwise a client can set them itself -- confirm.
    when ("_" == ns) <| case (xFwdHost, xRealIP, host) of
      (Just "73.222.221.62", _                   , _) -> Scotty.status Http.ok200
      (_                   , Just "73.222.221.62", _) -> Scotty.status Http.ok200
      (_, _, Just ("localhost:3000")) -> Scotty.status Http.ok200
      _ -> do
        Scotty.status Http.methodNotAllowed405
        Scotty.text "not allowed: _ is a reserved namespace"
        -- Setting status/body does not end the action; without this the
        -- publish below would still go through.
        Scotty.finish

    guardNs ns ["pub", "_"]

    -- passed all auth checks
    app . modify <| upsertNamespace ns
    q     <- app <| que ns qp
    qdata <- Scotty.body
    liftIO <| pushQue (BSL.toStrict qdata) q

-- | Given `guardNs ns whitelist`, if `ns` is not in the `whitelist`
-- list, respond with a 405 error and stop processing the request.
guardNs :: Text -> [Text] -> Scotty.ActionT Text App ()
guardNs ns whitelist = when (not <| ns `elem` whitelist) <| do
  Scotty.status Http.methodNotAllowed405
  Scotty.text
    <| "not allowed: use 'pub' namespace or signup to protect '"
    <> ns
    <> "' at https://que.run"
  -- End the action here so the caller's remaining steps are skipped.
  Scotty.finish

-- | recover from a scotty-thrown exception.
(!:)
  :: Scotty.ActionT Text App a -- ^ action that might throw
  -> (Text -> Scotty.ActionT Text App a) -- ^ a function providing a default response instead
  -> Scotty.ActionT Text App a
(!:) = Scotty.rescue

-- | Forever write the data from 'Que' to 'Wai.StreamingBody'. Each value
-- is followed by a newline. The flush callback is not used.
streamQue :: Que -> Wai.StreamingBody
streamQue q write _ = Go.mult q >>= loop
 where
  loop c =
    Go.tap c
      >>= (write . Builder.byteStringInsert)
      >>  (write <| Builder.byteStringInsert "\n")
      >>  loop c

-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)

-- | Inserts the namespace in 'AppState' if it doesn't exist.
-- A namespace that is already present is returned unchanged.
upsertNamespace :: Namespace -> AppState -> AppState
upsertNamespace ns st = if HashMap.member ns (ques st)
  then st
  else st { ques = HashMap.insert ns mempty (ques st) }

-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
--
-- The namespace must already exist: it is looked up with 'grab', which
-- calls 'error' on a missing key. Apply 'upsertNamespace' first.
insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
insertQue ns qp q st = st { ques = newQues }
 where
  newQues  = HashMap.insert ns newQbase (ques st)
  newQbase = HashMap.insert qp q <| grab ns <| ques st

-- | Read the namespace and que path from the route's captures "1" and "2".
extract :: Scotty.ActionT Text App (Namespace, Quepath)
extract = do
  ns   <- Scotty.param "1"
  path <- Scotty.param "2"
  return (ns, path)

-- | The application monad: 'IO' with read access to the shared, mutable
-- 'AppState'.
newtype App a = App
  { runApp :: ReaderT (STM.TVar AppState) IO a
  }
  deriving (Applicative, Functor, Monad, MonadIO, MonadReader
    (STM.TVar AppState))

-- | Everything the server keeps in memory.
data AppState = AppState
  { ques :: HashMap Namespace Quebase
  , port :: Warp.Port
  }

-- | State used before any command line options are applied.
initialAppState :: AppState
initialAppState = AppState { port = 80, ques = mempty }

-- | Resolve a list of 'AppState' transitions into one.
--
-- Transitions are applied left to right, so a later transition overrides
-- an earlier one (the same result as @foldl (flip id)@).
smoosh
  :: AppState -- ^ Initial app state to start with
  -> [AppState -> AppState] -- ^ List of functions to apply in order
  -> AppState
smoosh initial transitions =
  -- Compose so that the head of the list ends up innermost, i.e. runs first.
  foldr (\f rest -> rest . f) identity transitions initial

-- | A synonym for 'lift' in order to be explicit about when we are
-- operating at the 'App' layer.
app :: MonadTrans t => App a -> t App a
app = lift

-- | Get something from the app state
gets :: (AppState -> b) -> App b
gets f = do
  tvar <- ask
  st   <- liftIO (STM.readTVarIO tvar)
  return (f st)

-- | Apply a function to the app state
modify :: (AppState -> AppState) -> App ()
modify f = do
  tvar <- ask
  liftIO (atomically (STM.modifyTVar' tvar f))

type Namespace = Text -- ^ housing for a set of que paths
type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes
type Quepath = [Text] -- ^ any path can serve as an identifier for a que
type Quedata = ByteString -- ^ any opaque data
type Quebase = HashMap Quepath Que -- ^ a collection of ques

-- | Lookup or create a que
--
-- The final check and the insertion happen in one STM transaction, so
-- concurrent callers asking for the same new que all receive the same
-- channel. A missing namespace is created rather than raising an error.
que :: Namespace -> Quepath -> App Que
que ns qp = do
  found <- gets lookupQue
  case found of
    Just existing -> return existing
    Nothing       -> do
      -- Channel creation is 'IO', so it has to happen outside the
      -- transaction; the channel is simply dropped if another request
      -- registered one first.
      fresh <- liftIO Go.chan
      tvar  <- ask
      liftIO (atomically (claim tvar fresh))
 where
  lookupQue st = HashMap.lookup ns (ques st) >>= HashMap.lookup qp

  claim tvar fresh = do
    st <- STM.readTVar tvar
    case lookupQue st of
      Just winner -> return winner
      Nothing     -> do
        STM.modifyTVar' tvar (insertQue ns qp fresh . upsertNamespace ns)
        return fresh

-- | Put data on the que.
pushQue :: Quedata -> Que -> IO ()
pushQue = flip Go.write

-- | Tap and read from the Que. Tap first because a Que is actually a
-- broadcast channel. This allows for multiconsumer Ques.
takeQue :: Que -> IO Quedata
takeQue ch = Go.mult ch >>= Go.tap