diff options
Diffstat (limited to 'Run/Que/Server.hs')
-rw-r--r-- | Run/Que/Server.hs | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/Run/Que/Server.hs b/Run/Que/Server.hs new file mode 100644 index 0000000..1acbe60 --- /dev/null +++ b/Run/Que/Server.hs @@ -0,0 +1,238 @@ +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE LambdaCase #-} + +{- | Interprocess communication +-} +module Run.Que.Server + ( main + ) +where + +import Com.Simatime.Alpha hiding ( Text + , get + , gets + , modify + , poll + ) +import qualified Com.Simatime.Go as Go +import qualified Control.Concurrent.STM as STM +import qualified Control.Exception as Exception +import Control.Monad.Reader ( MonadTrans ) +import qualified Data.ByteString as BS +import qualified Data.ByteString.Builder.Extra as Builder +import qualified Data.ByteString.Lazy as BSL +import Data.HashMap.Lazy ( HashMap ) +import qualified Data.HashMap.Lazy as HashMap +import qualified Data.Text.Encoding as Encoding +import Data.Text.Lazy ( Text + , fromStrict + ) +import qualified Data.Text.Lazy as Text +import qualified Network.HTTP.Types.Status as Http +import qualified Network.Socket as Socket +import qualified Network.Wai as Wai +import qualified Network.Wai.Handler.Warp as Warp +import Network.Wai.Middleware.RequestLogger + ( logStdoutDev ) +import qualified System.Console.GetOpt as Opt +import qualified System.Environment as Environment +import qualified Web.Scotty.Trans as Scotty + +main :: IO () +main = Exception.bracket startup shutdown run + where + run (p, waiapp) = + putText ("que-server starting on port " <> show p) >> Warp.run p waiapp + startup = do + opts <- Environment.getArgs /> getOpts + sync <- STM.newTVarIO opts + let runActionToIO m = runReaderT (runApp m) sync + waiapp <- Scotty.scottyAppT runActionToIO routes + return (port opts, waiapp) + shutdown :: a -> IO a + shutdown = pure . identity + getOpts args = case Opt.getOpt Opt.Permute options args of + ([] , [], _) -> Exception.throw ErrorParsingOptions + (opts, _ , _) -> smoosh initialAppState opts + options = + [ Opt.Option + ['p'] + ["port"] + (Opt.ReqArg (\n m -> m { port = read n :: Warp.Port }) "PORT") + "port to run on " + ] + +data Error = ErrorParsingOptions + deriving (Show) + +instance Exception.Exception Error + +-- | Only allow my IP or local to access some route. +guardIP :: Wai.Request -> Scotty.ActionT Text App () +guardIP r = case Wai.remoteHost r of + Socket.SockAddrInet _ ip | ip `elem` allowed -> Scotty.status Http.ok200 + _ -> Scotty.status Http.methodNotAllowed405 + where + allowed = Socket.tupleToHostAddress </ [(72, 222, 221, 62), (127, 0, 0, 1)] + +routes :: Scotty.ScottyT Text App () +routes = do + Scotty.middleware logStdoutDev + + let quepath = "^/([[:alnum:]_]*)/([[:alnum:]._/]*)$" + let index = "^(/|/index.html)$" + + Scotty.get (Scotty.regex index) <| do + let (ns, qp) = ("_", ["index"]) + app . modify <| upsertNamespace ns + q <- app <| que ns qp + r <- liftIO <| takeQue q + Scotty.html <| fromStrict <| Encoding.decodeUtf8 r + + Scotty.post (Scotty.regex index) <| do + r <- Scotty.request + guardIP r + let (ns, qp) = ("_", ["index"]) + app . modify <| upsertNamespace ns + q <- app <| que ns qp + qdata <- Scotty.body + liftIO <| pushQue (BSL.toStrict qdata) q + return () + + Scotty.matchAny (Scotty.regex "^/([[:alnum:]_]*)/?$") <| do + -- matches '/ns' and '/ns/' but not '/ns/path' + Scotty.status Http.notImplemented501 + Scotty.text "namespace management coming soon" + + -- | Receive a value from a que. Blocks until a value is received, + -- then returns. If 'poll=true', then stream data from the Que to the + -- client. + Scotty.get (Scotty.regex quepath) <| do + (ns, qp) <- extract + -- ensure namespace exists + app . modify <| upsertNamespace ns + q <- app <| que ns qp + poll <- Scotty.param "poll" !: (pure . const False) + case poll of + True -> Scotty.stream $ streamQue q + _ -> do + r <- liftIO <| takeQue q + Scotty.html <| fromStrict <| Encoding.decodeUtf8 r + + -- | Put a value on a que. Returns immediately. + Scotty.post (Scotty.regex quepath) <| do + r <- Scotty.request + when (BS.isPrefixOf "/_" <| Wai.rawPathInfo r) $ guardIP r + (ns, qp) <- extract + -- ensure namespace exists + app . modify <| upsertNamespace ns + q <- app <| que ns qp + qdata <- Scotty.body + liftIO <| pushQue (BSL.toStrict qdata) q + return () + +-- | recover from a scotty-thrown exception. +(!:) + :: Scotty.ActionT Text App a -- ^ action that might throw + -> (Text -> Scotty.ActionT Text App a) -- ^ a function providing a default response instead + -> Scotty.ActionT Text App a +(!:) = Scotty.rescue + +-- | Forever write the data from 'Que' to 'Wai.StreamingBody'. +streamQue :: Que -> Wai.StreamingBody +streamQue q write _ = Go.mult q >>= loop + where + loop c = + Go.tap c + >>= (write . Builder.byteStringInsert) + >> (write <| Builder.byteStringInsert "\n") + >> loop c + +-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. +grab :: (Eq k, Hashable k) => k -> HashMap k v -> v +grab = flip (HashMap.!) + +-- | Inserts the namespace in 'AppState' if it doesn't exist. +upsertNamespace :: Namespace -> AppState -> AppState +upsertNamespace ns as = if HashMap.member ns (ques as) + then as + else as { ques = HashMap.insert ns mempty (ques as) } + +-- | Inserts the que at the proper 'Namespace' and 'Quepath'. +insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState +insertQue ns qp q as = as { ques = newQues } + where + newQues = HashMap.insert ns newQbase (ques as) + newQbase = HashMap.insert qp q <| grab ns <| ques as + +extract :: Scotty.ActionT Text App (Namespace, Quepath) +extract = do + ns <- Scotty.param "0" + path <- Scotty.param "1" + let p = Text.split (== '/') path |> filter (not . Text.null) + return (ns, p) + +newtype App a = App + { runApp :: ReaderT (STM.TVar AppState) IO a + } + deriving (Applicative, Functor, Monad, MonadIO, MonadReader + (STM.TVar AppState)) + +data AppState = AppState + { ques :: HashMap Namespace Quebase + , port :: Warp.Port + } + +initialAppState :: AppState +initialAppState = AppState { port = 80, ques = mempty } + +-- | Resolve a list of 'AppState' transitions into one. +smoosh + :: AppState -- ^ Initial app state to start with + -> [AppState -> AppState] -- ^ List of functions to apply in order + -> AppState +smoosh = foldr identity +-- there's gotta be a standard name for this + +-- | A synonym for 'lift' in order to be explicit about when we are +-- operating at the 'App' layer. +app :: MonadTrans t => App a -> t App a +app = lift + +-- | Get something from the app state +gets :: (AppState -> b) -> App b +gets f = ask >>= liftIO . STM.readTVarIO >>= return . f + +-- | Apply a function to the app state +modify :: (AppState -> AppState) -> App () +modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f + +type Namespace = Text -- ^ housing for a set of que paths +type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes +type Quepath = [Text] -- ^ any path can serve as an identifier for a que +type Quedata = ByteString -- ^ any opaque data +type Quebase = HashMap Quepath Que -- ^ a collection of ques + +-- | Lookup or create a que +que :: Namespace -> Quepath -> App Que +que ns qp = do + _ques <- gets ques + let qbase = grab ns _ques + queExists = HashMap.member qp qbase + if queExists + then return <| grab qp qbase + else do + c <- liftIO Go.chan + modify (insertQue ns qp c) + gets ques /> grab ns /> grab qp + +-- | Put data on the que. +pushQue :: Quedata -> Que -> IO () +pushQue = flip Go.write + +-- | Tap and read from the Que. Tap first because a Que is actually a +-- broadcast channel. This allows for multiconsumer Ques. +takeQue :: Que -> IO Quedata +takeQue ch = Go.mult ch >>= Go.tap |