summaryrefslogtreecommitdiff
path: root/Biz/Que/Host.hs
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2021-08-04 11:09:35 -0400
committerBen Sima <ben@bsima.me>2021-11-26 13:47:37 -0500
commit9b1df01fd2cf3ecf41fc68b94051db665821c774 (patch)
treec1a3e68f625679576ff7e47bd1ebcb07bb94e15e /Biz/Que/Host.hs
parent0264f4a5dc37b16f872e6fa92bd8f1fc1e2b1826 (diff)
Reimplement Que with Servant
Still todo: add authentication. But that can wait. In re-implementing this, I was able to figure out how to get the Go.mult working properly as well. The problem is that a tap from a mult channel does not remove the message from the original channel. I'm not sure if that should be a core feature or not; for now I'm just draining the channel when it's received in the Que HTTP handler. (Also, this would be a good place to put persistence: have a background job read from the original channel, and write the msg to disk via acid-state; this would obviate the need for a flush to nowhere.) Also, streaming is working now. The problem was that Scotty closes the connection after it sees a newline in the body, or something, so streaming over Scotty doesn't actually work. It's fine, Servant is better anyway.
Diffstat (limited to 'Biz/Que/Host.hs')
-rw-r--r--Biz/Que/Host.hs311
1 files changed, 151 insertions, 160 deletions
diff --git a/Biz/Que/Host.hs b/Biz/Que/Host.hs
index 40ee1a5..702827e 100644
--- a/Biz/Que/Host.hs
+++ b/Biz/Que/Host.hs
@@ -1,8 +1,15 @@
+{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE NoImplicitPrelude #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
-- | Interprocess communication
--
@@ -20,29 +27,22 @@ where
import Alpha hiding (gets, modify, poll)
import qualified Biz.Cli as Cli
+import qualified Biz.Log as Log
import Biz.Test ((@=?))
import qualified Biz.Test as Test
import qualified Control.Concurrent.Go as Go
import qualified Control.Concurrent.STM as STM
import qualified Control.Exception as Exception
-import Control.Monad.Reader (MonadTrans)
-import qualified Data.ByteString.Builder.Extra as Builder
-import qualified Data.ByteString.Lazy as BSL
import Data.HashMap.Lazy (HashMap)
import qualified Data.HashMap.Lazy as HashMap
-import qualified Data.Text.Encoding as Encoding
-import qualified Data.Text.Lazy as Text.Lazy
-import qualified Network.HTTP.Types.Status as Http
-import qualified Network.Wai as Wai
+import Network.HTTP.Media ((//), (/:))
import qualified Network.Wai.Handler.Warp as Warp
-import Network.Wai.Middleware.RequestLogger
- ( logStdout,
- )
+import Servant
+import Servant.API.Generic ((:-))
+-- import qualified Servant.Auth.Server as Auth
+import Servant.Server.Generic (AsServerT, genericServeT)
+import qualified Servant.Types.SourceT as Source
import qualified System.Envy as Envy
-import qualified Web.Scotty.Trans as Scotty
-import qualified Prelude
-
-{-# ANN module ("HLint: ignore Reduce duplication" :: Prelude.String) #-}
main :: IO ()
main = Cli.main <| Cli.Plan help move test pure
@@ -51,15 +51,17 @@ move :: Cli.Arguments -> IO ()
move _ = Exception.bracket startup shutdown <| uncurry Warp.run
where
startup =
- Envy.decodeWithDefaults Envy.defConfig +> \c -> do
- sync <- STM.newTVarIO initialAppState
- let runActionToIO m = runReaderT (runApp m) sync
- waiapp <- Scotty.scottyAppT runActionToIO <| routes c
- putText "*"
- putText "que"
- putText <| "port: " <> (show <| quePort c)
- putText <| "skey: " <> (Text.Lazy.toStrict <| queSkey c)
- return (quePort c, waiapp)
+ Envy.decodeWithDefaults Envy.defConfig +> \cfg@Config {..} -> do
+ initialState <- atomically <| STM.newTVar mempty
+ -- natural transformation
+ let nt :: AppState -> App a -> Servant.Handler a
+ nt s x = runReaderT x s
+ let app :: AppState -> Application
+ app s = genericServeT (nt s) (paths cfg)
+ Log.info ["boot", "que"] >> Log.br
+ Log.info ["boot", "port", show <| quePort] >> Log.br
+ Log.info ["boot", "skey", queSkey] >> Log.br
+ pure (quePort, app initialState)
shutdown :: a -> IO a
shutdown = pure <. identity
@@ -76,167 +78,149 @@ Usage:
test :: Test.Tree
test = Test.group "Biz.Que.Host" [Test.unit "id" <| 1 @=? (1 :: Integer)]
-newtype App a = App
- { runApp :: ReaderT (STM.TVar AppState) IO a
- }
- deriving
- ( Applicative,
- Functor,
- Monad,
- MonadIO,
- MonadReader
- (STM.TVar AppState)
- )
-
-newtype AppState = AppState
- { ques :: HashMap Namespace Quebase
- }
+type App = ReaderT AppState Servant.Handler
-initialAppState :: AppState
-initialAppState = AppState {ques = mempty}
+type Ques = HashMap Namespace Quebase
+
+type AppState = STM.TVar Ques
data Config = Config
{ -- | QUE_PORT
quePort :: Warp.Port,
-- | QUE_SKEY
- queSkey :: Text.Lazy.Text
+ queSkey :: Text
}
deriving (Generic, Show)
instance Envy.DefConfig Config where
- defConfig = Config 3000 "admin-key"
+ defConfig = Config 3001 "admin-key"
instance Envy.FromEnv Config
-routes :: Config -> Scotty.ScottyT Text.Lazy.Text App ()
-routes cfg = do
- Scotty.middleware logStdout
- let quepath = "^\\/([[:alnum:]_-]+)\\/([[:alnum:]._/-]*)$"
- let namespace = "^\\/([[:alnum:]_-]+)\\/?$" -- matches '/ns' and '/ns/' but not '/ns/path'
-
- -- GET /index.html
- Scotty.get (Scotty.literal "/index.html") <| Scotty.redirect "/_/index"
- Scotty.get (Scotty.literal "/") <| Scotty.redirect "/_/index"
- -- GET /_/dash
- Scotty.get (Scotty.literal "/_/dash") <| do
- authkey <- fromMaybe "" </ Scotty.header "Authorization"
- if authkey == (Text.Lazy.strip <| queSkey cfg)
- then do
- d <- app <| gets ques
- Scotty.json d
- else do
- Scotty.status Http.methodNotAllowed405
- Scotty.text "not allowed"
- -- Namespace management
- Scotty.matchAny (Scotty.regex namespace) <| do
- Scotty.status Http.notImplemented501
- Scotty.text "namespace management coming soon"
- -- GET que
- --
- -- Receive a value from a que. Blocks until a value is received,
- -- then returns. If 'poll=true', then stream data from the Que to the
- -- client.
- Scotty.get (Scotty.regex quepath) <| do
- (ns, qp) <- extract
- guardNs ns ["pub", "_"]
- app <. modify <| upsertNamespace ns
- q <- app <| que ns qp
- poll <- Scotty.param "poll" !: (pure <. const False)
- if poll
- then Scotty.stream <| streamQue q
- else do
- r <- liftIO <| Go.read q
- Scotty.html <| fromStrict <| Encoding.decodeUtf8 r
- -- POST que
- --
- -- Put a value on a que. Returns immediately.
- Scotty.post (Scotty.regex quepath) <| do
- authkey <- fromMaybe "" </ Scotty.header "Authorization"
- (ns, qp) <- extract
- -- Only allow my IP or localhost to publish to '_' namespace
- when
- ("_" == ns && authkey /= (Text.Lazy.strip <| queSkey cfg))
- ( Scotty.status Http.methodNotAllowed405
- >> Scotty.text "not allowed: _ is a reserved namespace"
- >> Scotty.finish
- )
- guardNs ns ["pub", "_"]
- -- passed all auth checks
- app <. modify <| upsertNamespace ns
- q <- app <| que ns qp
- qdata <- Scotty.body
- _ <- liftIO <| Go.write q <| BSL.toStrict qdata
- return ()
+-- | A simple HTML type. This recognizes "content-type: text/html" but doesn't
+-- do any conversion, rendering, or sanitization like the
+-- 'Servant.HTML.Lucid.HTML' type would do.
+data HTML deriving (Typeable)
+
+instance Accept HTML where
+ contentTypes _ = "text" // "html" /: ("charset", "utf-8") :| ["text" // "html"]
+
+instance MimeRender HTML ByteString where
+ mimeRender _ x = str x
+
+instance MimeUnrender HTML Text where
+ mimeUnrender _ bs = Right <| str bs
+
+instance MimeUnrender OctetStream Text where
+ mimeUnrender _ bs = Right <| str bs
+
+instance MimeRender PlainText ByteString where
+ mimeRender _ bs = str bs
+
+instance MimeUnrender PlainText ByteString where
+ mimeUnrender _ bs = Right <| str bs
+
+data Paths path = Paths
+ { home ::
+ path
+ :- Get '[JSON] NoContent,
+ dash ::
+ path
+ :- "_"
+ :> "dash"
+ :> Get '[JSON] Ques,
+ getQue ::
+ path
+ :- Capture "ns" Text
+ :> Capture "quename" Text
+ :> Get '[PlainText, HTML, OctetStream] Message,
+ getStream ::
+ path
+ :- Capture "ns" Text
+ :> Capture "quename" Text
+ :> "stream"
+ :> StreamGet NoFraming OctetStream (SourceIO Message),
+ putQue ::
+ path
+ :- Capture "ns" Text
+ :> Capture "quepath" Text
+ :> ReqBody '[PlainText, HTML, OctetStream] Text
+ :> Post '[PlainText, HTML, OctetStream] NoContent
+ }
+ deriving (Generic)
+
+paths :: Config -> Paths (AsServerT App)
+paths _ =
+ -- TODO revive authkey stuff
+ -- - read Authorization header, compare with queSkey
+ -- - Only allow my IP or localhost to publish to '_' namespace
+ Paths
+ { home =
+ throwError <| err301 {errHeaders = [("Location", "/_/index")]},
+ dash = gets,
+ getQue = \ns qn -> do
+ guardNs ns ["pub", "_"]
+ modify <| upsertNamespace ns
+ q <- que ns qn
+ Go.mult q
+ |> liftIO
+ +> Go.tap
+ |> liftIO,
+ getStream = \ns qn -> do
+ guardNs ns ["pub", "_"]
+ modify <| upsertNamespace ns
+ q <- que ns qn
+ Go.mult q
+ |> liftIO
+ +> Go.tap
+ |> Source.fromAction (const False) -- peek chan instead of False?
+ |> pure,
+ putQue = \ns qp body -> do
+ guardNs ns ["pub", "_"]
+ modify <| upsertNamespace ns
+ q <- que ns qp
+ body
+ |> str
+ |> Go.write q
+ >> Go.read q -- flush the que, otherwise msgs never clear
+ |> liftIO
+ -- TODO: detect number of readers, respond with "sent to N readers" or
+ -- "no readers, msg lost"
+ >> pure NoContent
+ }
-- | Given `guardNs ns whitelist`, if `ns` is not in the `whitelist`
-- list, return a 405 error.
-guardNs :: Text.Lazy.Text -> [Text.Lazy.Text] -> Scotty.ActionT Text.Lazy.Text App ()
+guardNs :: (Applicative a, MonadError ServerError a) => Text -> [Text] -> a ()
guardNs ns whitelist =
when (not <| ns `elem` whitelist) <| do
- Scotty.status Http.methodNotAllowed405
- Scotty.text
- <| "not allowed: use 'pub' namespace or signup to protect '"
- <> ns
- <> "' at https://que.run"
- Scotty.finish
-
--- | recover from a scotty-thrown exception.
-(!:) ::
- -- | action that might throw
- Scotty.ActionT Text.Lazy.Text App a ->
- -- | a function providing a default response instead
- (Text.Lazy.Text -> Scotty.ActionT Text.Lazy.Text App a) ->
- Scotty.ActionT Text.Lazy.Text App a
-(!:) = Scotty.rescue
-
--- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
-streamQue :: Que -> Wai.StreamingBody
-streamQue q write _ = loop q
+ throwError <| err405 {errBody = str msg}
where
- loop c =
- Go.read c
- +> (write <. Builder.byteStringInsert)
- >> loop c
+ msg =
+ "not allowed: use 'pub' namespace or signup to protect '"
+ <> ns
+ <> "' at https://que.run"
-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)
--- | Inserts the namespace in 'AppState' if it doesn't exist.
-upsertNamespace :: Namespace -> AppState -> AppState
+-- | Inserts the namespace in 'Ques' if it doesn't exist.
+upsertNamespace :: Namespace -> HashMap Namespace Quebase -> HashMap Namespace Quebase
upsertNamespace ns as =
- if HashMap.member ns (ques as)
+ if HashMap.member ns as
then as
- else as {ques = HashMap.insert ns mempty (ques as)}
+ else HashMap.insert ns mempty as
-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
-insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
-insertQue ns qp q as = as {ques = newQues}
+insertQue :: Namespace -> Quepath -> Que -> HashMap Namespace Quebase -> HashMap Namespace Quebase
+insertQue ns qp q hm = newQues
where
- newQues = HashMap.insert ns newQbase (ques as)
- newQbase = HashMap.insert qp q <| grab ns <| ques as
-
-extract :: Scotty.ActionT Text.Lazy.Text App (Namespace, Quepath)
-extract = do
- ns <- Scotty.param "1"
- path <- Scotty.param "2"
- return (ns, path)
-
--- | A synonym for 'lift' in order to be explicit about when we are
--- operating at the 'App' layer.
-app :: MonadTrans t => App a -> t App a
-app = lift
-
--- | Get something from the app state
-gets :: (AppState -> b) -> App b
-gets f = ask +> liftIO <. STM.readTVarIO +> return </ f
-
--- | Apply a function to the app state
-modify :: (AppState -> AppState) -> App ()
-modify f = ask +> liftIO <. atomically <. flip STM.modifyTVar' f
+ newQues = HashMap.insert ns newQbase hm
+ newQbase = HashMap.insert qp q <| grab ns hm
-- | housing for a set of que paths
-type Namespace = Text.Lazy.Text
+type Namespace = Text
-- | a que is just a channel of bytes
type Que = Go.Channel Message
@@ -250,15 +234,22 @@ type Message = ByteString
-- | a collection of ques
type Quebase = HashMap Quepath Que
+-- | Get app state
+gets :: App Ques
+gets = ask +> STM.readTVarIO .> liftIO +> pure
+
+-- | Apply a function to the app state
+modify :: (Ques -> Ques) -> App ()
+modify f = ask +> flip STM.modifyTVar' f .> atomically .> liftIO
+
-- | Lookup or create a que
que :: Namespace -> Quepath -> App Que
que ns qp = do
- _ques <- gets ques
- let qbase = grab ns _ques
- queExists = HashMap.member qp qbase
- if queExists
- then return <| grab qp qbase
+ ques <- gets
+ let qbase = grab ns ques
+ if HashMap.member qp qbase
+ then pure <| grab qp qbase
else do
- c <- liftIO <| Go.chan 1
+ c <- liftIO <| Go.chan 5
modify (insertQue ns qp c)
- gets ques /> grab ns /> grab qp
+ gets /> grab ns /> grab qp