diff options
author | Ben Sima <ben@bsima.me> | 2021-08-04 11:09:35 -0400 |
---|---|---|
committer | Ben Sima <ben@bsima.me> | 2021-11-26 13:47:37 -0500 |
commit | 9b1df01fd2cf3ecf41fc68b94051db665821c774 (patch) | |
tree | c1a3e68f625679576ff7e47bd1ebcb07bb94e15e /Biz/Que/Host.hs | |
parent | 0264f4a5dc37b16f872e6fa92bd8f1fc1e2b1826 (diff) |
Reimplement Que with Servant
Still todo: add authentication. But that can wait.
In re-implementing this, I was able to figure out how to get the Go.mult working
properly as well. The problem is that a tap from a mult channel does not remove
the message from the original channel. I'm not sure if that should be a core
feature or not; for now I'm just draining the channel when it's received in the
Que HTTP handler. (Also, this would be a good place to put persistence: have a
background job read from the original channel, and write the msg to disk via
acid-state; this would obviate the need for a flush to nowhere.)
Also, streaming is working now. The problem was that Scotty closes the
connection after it sees a newline in the body, or something, so streaming over
Scotty doesn't actually work. It's fine, Servant is better anyway.
Diffstat (limited to 'Biz/Que/Host.hs')
-rw-r--r-- | Biz/Que/Host.hs | 311 |
1 files changed, 151 insertions, 160 deletions
diff --git a/Biz/Que/Host.hs b/Biz/Que/Host.hs index 40ee1a5..702827e 100644 --- a/Biz/Que/Host.hs +++ b/Biz/Que/Host.hs @@ -1,8 +1,15 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE TypeOperators #-} {-# LANGUAGE NoImplicitPrelude #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} -- | Interprocess communication -- @@ -20,29 +27,22 @@ where import Alpha hiding (gets, modify, poll) import qualified Biz.Cli as Cli +import qualified Biz.Log as Log import Biz.Test ((@=?)) import qualified Biz.Test as Test import qualified Control.Concurrent.Go as Go import qualified Control.Concurrent.STM as STM import qualified Control.Exception as Exception -import Control.Monad.Reader (MonadTrans) -import qualified Data.ByteString.Builder.Extra as Builder -import qualified Data.ByteString.Lazy as BSL import Data.HashMap.Lazy (HashMap) import qualified Data.HashMap.Lazy as HashMap -import qualified Data.Text.Encoding as Encoding -import qualified Data.Text.Lazy as Text.Lazy -import qualified Network.HTTP.Types.Status as Http -import qualified Network.Wai as Wai +import Network.HTTP.Media ((//), (/:)) import qualified Network.Wai.Handler.Warp as Warp -import Network.Wai.Middleware.RequestLogger - ( logStdout, - ) +import Servant +import Servant.API.Generic ((:-)) +-- import qualified Servant.Auth.Server as Auth +import Servant.Server.Generic (AsServerT, genericServeT) +import qualified Servant.Types.SourceT as Source import qualified System.Envy as Envy -import qualified Web.Scotty.Trans as Scotty -import qualified Prelude - -{-# ANN module ("HLint: ignore Reduce duplication" :: Prelude.String) #-} main :: IO () main = Cli.main <| Cli.Plan help move test pure @@ -51,15 +51,17 @@ move :: Cli.Arguments -> IO () move _ = Exception.bracket startup shutdown <| uncurry Warp.run where startup = - Envy.decodeWithDefaults Envy.defConfig +> \c -> do - sync <- STM.newTVarIO initialAppState - let runActionToIO m = runReaderT (runApp m) sync - waiapp <- Scotty.scottyAppT runActionToIO <| routes c - putText "*" - putText "que" - putText <| "port: " <> (show <| quePort c) - putText <| "skey: " <> (Text.Lazy.toStrict <| queSkey c) - return (quePort c, waiapp) + Envy.decodeWithDefaults Envy.defConfig +> \cfg@Config {..} -> do + initialState <- atomically <| STM.newTVar mempty + -- natural transformation + let nt :: AppState -> App a -> Servant.Handler a + nt s x = runReaderT x s + let app :: AppState -> Application + app s = genericServeT (nt s) (paths cfg) + Log.info ["boot", "que"] >> Log.br + Log.info ["boot", "port", show <| quePort] >> Log.br + Log.info ["boot", "skey", queSkey] >> Log.br + pure (quePort, app initialState) shutdown :: a -> IO a shutdown = pure <. identity @@ -76,167 +78,149 @@ Usage: test :: Test.Tree test = Test.group "Biz.Que.Host" [Test.unit "id" <| 1 @=? (1 :: Integer)] -newtype App a = App - { runApp :: ReaderT (STM.TVar AppState) IO a - } - deriving - ( Applicative, - Functor, - Monad, - MonadIO, - MonadReader - (STM.TVar AppState) - ) - -newtype AppState = AppState - { ques :: HashMap Namespace Quebase - } +type App = ReaderT AppState Servant.Handler -initialAppState :: AppState -initialAppState = AppState {ques = mempty} +type Ques = HashMap Namespace Quebase + +type AppState = STM.TVar Ques data Config = Config { -- | QUE_PORT quePort :: Warp.Port, -- | QUE_SKEY - queSkey :: Text.Lazy.Text + queSkey :: Text } deriving (Generic, Show) instance Envy.DefConfig Config where - defConfig = Config 3000 "admin-key" + defConfig = Config 3001 "admin-key" instance Envy.FromEnv Config -routes :: Config -> Scotty.ScottyT Text.Lazy.Text App () -routes cfg = do - Scotty.middleware logStdout - let quepath = "^\\/([[:alnum:]_-]+)\\/([[:alnum:]._/-]*)$" - let namespace = "^\\/([[:alnum:]_-]+)\\/?$" -- matches '/ns' and '/ns/' but not '/ns/path' - - -- GET /index.html - Scotty.get (Scotty.literal "/index.html") <| Scotty.redirect "/_/index" - Scotty.get (Scotty.literal "/") <| Scotty.redirect "/_/index" - -- GET /_/dash - Scotty.get (Scotty.literal "/_/dash") <| do - authkey <- fromMaybe "" </ Scotty.header "Authorization" - if authkey == (Text.Lazy.strip <| queSkey cfg) - then do - d <- app <| gets ques - Scotty.json d - else do - Scotty.status Http.methodNotAllowed405 - Scotty.text "not allowed" - -- Namespace management - Scotty.matchAny (Scotty.regex namespace) <| do - Scotty.status Http.notImplemented501 - Scotty.text "namespace management coming soon" - -- GET que - -- - -- Receive a value from a que. Blocks until a value is received, - -- then returns. If 'poll=true', then stream data from the Que to the - -- client. - Scotty.get (Scotty.regex quepath) <| do - (ns, qp) <- extract - guardNs ns ["pub", "_"] - app <. modify <| upsertNamespace ns - q <- app <| que ns qp - poll <- Scotty.param "poll" !: (pure <. const False) - if poll - then Scotty.stream <| streamQue q - else do - r <- liftIO <| Go.read q - Scotty.html <| fromStrict <| Encoding.decodeUtf8 r - -- POST que - -- - -- Put a value on a que. Returns immediately. - Scotty.post (Scotty.regex quepath) <| do - authkey <- fromMaybe "" </ Scotty.header "Authorization" - (ns, qp) <- extract - -- Only allow my IP or localhost to publish to '_' namespace - when - ("_" == ns && authkey /= (Text.Lazy.strip <| queSkey cfg)) - ( Scotty.status Http.methodNotAllowed405 - >> Scotty.text "not allowed: _ is a reserved namespace" - >> Scotty.finish - ) - guardNs ns ["pub", "_"] - -- passed all auth checks - app <. modify <| upsertNamespace ns - q <- app <| que ns qp - qdata <- Scotty.body - _ <- liftIO <| Go.write q <| BSL.toStrict qdata - return () +-- | A simple HTML type. This recognizes "content-type: text/html" but doesn't +-- do any conversion, rendering, or sanitization like the +-- 'Servant.HTML.Lucid.HTML' type would do. +data HTML deriving (Typeable) + +instance Accept HTML where + contentTypes _ = "text" // "html" /: ("charset", "utf-8") :| ["text" // "html"] + +instance MimeRender HTML ByteString where + mimeRender _ x = str x + +instance MimeUnrender HTML Text where + mimeUnrender _ bs = Right <| str bs + +instance MimeUnrender OctetStream Text where + mimeUnrender _ bs = Right <| str bs + +instance MimeRender PlainText ByteString where + mimeRender _ bs = str bs + +instance MimeUnrender PlainText ByteString where + mimeUnrender _ bs = Right <| str bs + +data Paths path = Paths + { home :: + path + :- Get '[JSON] NoContent, + dash :: + path + :- "_" + :> "dash" + :> Get '[JSON] Ques, + getQue :: + path + :- Capture "ns" Text + :> Capture "quename" Text + :> Get '[PlainText, HTML, OctetStream] Message, + getStream :: + path + :- Capture "ns" Text + :> Capture "quename" Text + :> "stream" + :> StreamGet NoFraming OctetStream (SourceIO Message), + putQue :: + path + :- Capture "ns" Text + :> Capture "quepath" Text + :> ReqBody '[PlainText, HTML, OctetStream] Text + :> Post '[PlainText, HTML, OctetStream] NoContent + } + deriving (Generic) + +paths :: Config -> Paths (AsServerT App) +paths _ = + -- TODO revive authkey stuff + -- - read Authorization header, compare with queSkey + -- - Only allow my IP or localhost to publish to '_' namespace + Paths + { home = + throwError <| err301 {errHeaders = [("Location", "/_/index")]}, + dash = gets, + getQue = \ns qn -> do + guardNs ns ["pub", "_"] + modify <| upsertNamespace ns + q <- que ns qn + Go.mult q + |> liftIO + +> Go.tap + |> liftIO, + getStream = \ns qn -> do + guardNs ns ["pub", "_"] + modify <| upsertNamespace ns + q <- que ns qn + Go.mult q + |> liftIO + +> Go.tap + |> Source.fromAction (const False) -- peek chan instead of False? + |> pure, + putQue = \ns qp body -> do + guardNs ns ["pub", "_"] + modify <| upsertNamespace ns + q <- que ns qp + body + |> str + |> Go.write q + >> Go.read q -- flush the que, otherwise msgs never clear + |> liftIO + -- TODO: detect number of readers, respond with "sent to N readers" or + -- "no readers, msg lost" + >> pure NoContent + } -- | Given `guardNs ns whitelist`, if `ns` is not in the `whitelist` -- list, return a 405 error. -guardNs :: Text.Lazy.Text -> [Text.Lazy.Text] -> Scotty.ActionT Text.Lazy.Text App () +guardNs :: (Applicative a, MonadError ServerError a) => Text -> [Text] -> a () guardNs ns whitelist = when (not <| ns `elem` whitelist) <| do - Scotty.status Http.methodNotAllowed405 - Scotty.text - <| "not allowed: use 'pub' namespace or signup to protect '" - <> ns - <> "' at https://que.run" - Scotty.finish - --- | recover from a scotty-thrown exception. -(!:) :: - -- | action that might throw - Scotty.ActionT Text.Lazy.Text App a -> - -- | a function providing a default response instead - (Text.Lazy.Text -> Scotty.ActionT Text.Lazy.Text App a) -> - Scotty.ActionT Text.Lazy.Text App a -(!:) = Scotty.rescue - --- | Forever write the data from 'Que' to 'Wai.StreamingBody'. -streamQue :: Que -> Wai.StreamingBody -streamQue q write _ = loop q + throwError <| err405 {errBody = str msg} where - loop c = - Go.read c - +> (write <. Builder.byteStringInsert) - >> loop c + msg = + "not allowed: use 'pub' namespace or signup to protect '" + <> ns + <> "' at https://que.run" -- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. grab :: (Eq k, Hashable k) => k -> HashMap k v -> v grab = flip (HashMap.!) --- | Inserts the namespace in 'AppState' if it doesn't exist. -upsertNamespace :: Namespace -> AppState -> AppState +-- | Inserts the namespace in 'Ques' if it doesn't exist. +upsertNamespace :: Namespace -> HashMap Namespace Quebase -> HashMap Namespace Quebase upsertNamespace ns as = - if HashMap.member ns (ques as) + if HashMap.member ns as then as - else as {ques = HashMap.insert ns mempty (ques as)} + else HashMap.insert ns mempty as -- | Inserts the que at the proper 'Namespace' and 'Quepath'. -insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState -insertQue ns qp q as = as {ques = newQues} +insertQue :: Namespace -> Quepath -> Que -> HashMap Namespace Quebase -> HashMap Namespace Quebase +insertQue ns qp q hm = newQues where - newQues = HashMap.insert ns newQbase (ques as) - newQbase = HashMap.insert qp q <| grab ns <| ques as - -extract :: Scotty.ActionT Text.Lazy.Text App (Namespace, Quepath) -extract = do - ns <- Scotty.param "1" - path <- Scotty.param "2" - return (ns, path) - --- | A synonym for 'lift' in order to be explicit about when we are --- operating at the 'App' layer. -app :: MonadTrans t => App a -> t App a -app = lift - --- | Get something from the app state -gets :: (AppState -> b) -> App b -gets f = ask +> liftIO <. STM.readTVarIO +> return </ f - --- | Apply a function to the app state -modify :: (AppState -> AppState) -> App () -modify f = ask +> liftIO <. atomically <. flip STM.modifyTVar' f + newQues = HashMap.insert ns newQbase hm + newQbase = HashMap.insert qp q <| grab ns hm -- | housing for a set of que paths -type Namespace = Text.Lazy.Text +type Namespace = Text -- | a que is just a channel of bytes type Que = Go.Channel Message @@ -250,15 +234,22 @@ type Message = ByteString -- | a collection of ques type Quebase = HashMap Quepath Que +-- | Get app state +gets :: App Ques +gets = ask +> STM.readTVarIO .> liftIO +> pure + +-- | Apply a function to the app state +modify :: (Ques -> Ques) -> App () +modify f = ask +> flip STM.modifyTVar' f .> atomically .> liftIO + -- | Lookup or create a que que :: Namespace -> Quepath -> App Que que ns qp = do - _ques <- gets ques - let qbase = grab ns _ques - queExists = HashMap.member qp qbase - if queExists - then return <| grab qp qbase + ques <- gets + let qbase = grab ns ques + if HashMap.member qp qbase + then pure <| grab qp qbase else do - c <- liftIO <| Go.chan 1 + c <- liftIO <| Go.chan 5 modify (insertQue ns qp c) - gets ques /> grab ns /> grab qp + gets /> grab ns /> grab qp |