summaryrefslogtreecommitdiff
path: root/Biz/Que
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2021-08-04 11:09:35 -0400
committerBen Sima <ben@bsima.me>2021-11-26 13:47:37 -0500
commit9b1df01fd2cf3ecf41fc68b94051db665821c774 (patch)
treec1a3e68f625679576ff7e47bd1ebcb07bb94e15e /Biz/Que
parent0264f4a5dc37b16f872e6fa92bd8f1fc1e2b1826 (diff)
Reimplement Que with Servant
Still todo: add authentication. But that can wait. In re-implementing this, I was able to figure out how to get the Go.mult working properly as well. The problem is that a tap from a mult channel does not remove the message from the original channel. I'm not sure if that should be a core feature or not; for now I'm just draining the channel when it's received in the Que HTTP handler. (Also, this would be a good place to put persistence: have a background job read from the original channel, and write the msg to disk via acid-state; this would obviate the need for a flush to nowhere.) Also, streaming is working now. The problem was that Scotty closes the connection after it sees a newline in the body, or something, so streaming over Scotty doesn't actually work. It's fine, Servant is better anyway.
Diffstat (limited to 'Biz/Que')
-rwxr-xr-xBiz/Que/Client.py93
-rw-r--r--Biz/Que/Host.hs311
-rw-r--r--Biz/Que/Site.hs6
3 files changed, 210 insertions, 200 deletions
diff --git a/Biz/Que/Client.py b/Biz/Que/Client.py
index 1063eb8..58877bf 100755
--- a/Biz/Que/Client.py
+++ b/Biz/Que/Client.py
@@ -11,11 +11,15 @@ import logging
import os
import subprocess
import sys
+import textwrap
import time
import urllib.parse
import urllib.request as request
-MAX_TIMEOUT = 99999999 # basically never timeout
+MAX_TIMEOUT = 9999999
+RETRIES = 10
+DELAY = 3
+BACKOFF = 1
def auth(args):
@@ -33,8 +37,8 @@ def auth(args):
def autodecode(bytestring):
- """Attempt to decode bytes `bs` into common codecs, preferably utf-8. If
- no decoding is available, just return the raw bytes.
+ """Attempt to decode bytes into common codecs, preferably utf-8. If no
+ decoding is available, just return the raw bytes.
For all available codecs, see:
<https://docs.python.org/3/library/codecs.html#standard-encodings>
@@ -50,7 +54,7 @@ def autodecode(bytestring):
return bytestring
-def retry(exception, tries=4, delay=3, backoff=2):
+def retry(exception, tries=RETRIES, delay=DELAY, backoff=BACKOFF):
"Decorator for retrying an action."
def decorator(func):
@@ -73,20 +77,23 @@ def retry(exception, tries=4, delay=3, backoff=2):
return decorator
+@retry(urllib.error.URLError)
+@retry(http.client.IncompleteRead)
+@retry(http.client.RemoteDisconnected)
def send(args):
"Send a message to the que."
logging.debug("send")
key = auth(args)
data = args.infile
req = request.Request(f"{args.host}/{args.target}")
- req.add_header("User-AgenT", "Que/Client")
+ req.add_header("User-Agent", "Que/Client")
+ req.add_header("Content-Type", "text/plain;charset=utf-8")
if key:
req.add_header("Authorization", key)
if args.serve:
logging.debug("serve")
while not time.sleep(1):
request.urlopen(req, data=data, timeout=MAX_TIMEOUT)
-
else:
request.urlopen(req, data=data, timeout=MAX_TIMEOUT)
@@ -96,75 +103,89 @@ def then(args, msg):
if args.then:
logging.debug("then")
subprocess.run(
- args.then.format(msg=msg, que=args.target), check=False, shell=True,
+ args.then.format(msg=msg, que=args.target),
+ check=False,
+ shell=True,
)
-@retry(http.client.IncompleteRead, tries=10, delay=5, backoff=1)
-@retry(http.client.RemoteDisconnected, tries=10, delay=2, backoff=2)
+@retry(urllib.error.URLError)
+@retry(http.client.IncompleteRead)
+@retry(http.client.RemoteDisconnected)
def recv(args):
"Receive a message from the que."
logging.debug("recv on: %s", args.target)
- params = urllib.parse.urlencode({"poll": args.poll})
- req = request.Request(f"{args.host}/{args.target}?{params}")
+ if args.poll:
+ req = request.Request(f"{args.host}/{args.target}/stream")
+ else:
+ req = request.Request(f"{args.host}/{args.target}")
req.add_header("User-Agent", "Que/Client")
key = auth(args)
if key:
req.add_header("Authorization", key)
with request.urlopen(req) as _req:
if args.poll:
- logging.debug("poll")
+ logging.debug("polling")
while not time.sleep(1):
- logging.debug("reading")
- msg = autodecode(_req.readline())
- logging.debug("read")
- print(msg, end="")
- then(args, msg)
+ reply =_req.readline()
+ if reply:
+ msg = autodecode(reply)
+ logging.debug("read")
+ print(msg, end="")
+ then(args, msg)
+ else:
+ continue
else:
- msg = autodecode(_req.read())
+ msg = autodecode(_req.readline())
print(msg)
then(args, msg)
def get_args():
"Command line parser"
- cli = argparse.ArgumentParser(description=__doc__)
+ cli = argparse.ArgumentParser(
+ description=__doc__,
+ epilog=textwrap.dedent(
+ f"""Requests will retry up to {RETRIES} times, with {DELAY} seconds
+ between attempts."""
+ ),
+ )
cli.add_argument("--debug", action="store_true", help="log to stderr")
cli.add_argument(
"--host", default="http://que.run", help="where que-server is running"
)
cli.add_argument(
- "--poll", default=False, action="store_true", help="stream data from the que"
+ "--poll",
+ default=False,
+ action="store_true",
+ help=textwrap.dedent(
+ """keep the connection open to stream data from the que. without
+ this flag, the program will exit after receiving a message"""
+ ),
)
cli.add_argument(
"--then",
- help=" ".join(
- [
- "when polling, run this shell command after each response,",
- "presumably for side effects,"
- r"replacing '{que}' with the target and '{msg}' with the body of the response",
- ]
+ help=textwrap.dedent(
+ """when polling, run this shell command after each response,
+ presumably for side effects, replacing '{que}' with the target and
+ '{msg}' with the body of the response"""
),
)
cli.add_argument(
"--serve",
default=False,
action="store_true",
- help=" ".join(
- [
- "when posting to the que, do so continuously in a loop.",
- "this can be used for serving a webpage or other file continuously",
- ]
+ help=textwrap.dedent(
+ """when posting to the que, do so continuously in a loop. this can
+ be used for serving a webpage or other file continuously"""
),
)
- cli.add_argument(
- "target", help="namespace and path of the que, like 'ns/path/subpath'"
- )
+ cli.add_argument("target", help="namespace and path of the que, like 'ns/path'")
cli.add_argument(
"infile",
nargs="?",
type=argparse.FileType("rb"),
- help="data to put on the que. Use '-' for stdin, otherwise should be a readable file",
+ help="data to put on the que. use '-' for stdin, otherwise should be a readable file",
)
return cli.parse_args()
@@ -173,7 +194,7 @@ if __name__ == "__main__":
ARGV = get_args()
if ARGV.debug:
logging.basicConfig(
- format="%(asctime)s %(message)s",
+ format="%(asctime)s: %(levelname)s: %(message)s",
level=logging.DEBUG,
datefmt="%Y.%m.%d..%H.%M.%S",
)
diff --git a/Biz/Que/Host.hs b/Biz/Que/Host.hs
index 40ee1a5..702827e 100644
--- a/Biz/Que/Host.hs
+++ b/Biz/Que/Host.hs
@@ -1,8 +1,15 @@
+{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE NoImplicitPrelude #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
-- | Interprocess communication
--
@@ -20,29 +27,22 @@ where
import Alpha hiding (gets, modify, poll)
import qualified Biz.Cli as Cli
+import qualified Biz.Log as Log
import Biz.Test ((@=?))
import qualified Biz.Test as Test
import qualified Control.Concurrent.Go as Go
import qualified Control.Concurrent.STM as STM
import qualified Control.Exception as Exception
-import Control.Monad.Reader (MonadTrans)
-import qualified Data.ByteString.Builder.Extra as Builder
-import qualified Data.ByteString.Lazy as BSL
import Data.HashMap.Lazy (HashMap)
import qualified Data.HashMap.Lazy as HashMap
-import qualified Data.Text.Encoding as Encoding
-import qualified Data.Text.Lazy as Text.Lazy
-import qualified Network.HTTP.Types.Status as Http
-import qualified Network.Wai as Wai
+import Network.HTTP.Media ((//), (/:))
import qualified Network.Wai.Handler.Warp as Warp
-import Network.Wai.Middleware.RequestLogger
- ( logStdout,
- )
+import Servant
+import Servant.API.Generic ((:-))
+-- import qualified Servant.Auth.Server as Auth
+import Servant.Server.Generic (AsServerT, genericServeT)
+import qualified Servant.Types.SourceT as Source
import qualified System.Envy as Envy
-import qualified Web.Scotty.Trans as Scotty
-import qualified Prelude
-
-{-# ANN module ("HLint: ignore Reduce duplication" :: Prelude.String) #-}
main :: IO ()
main = Cli.main <| Cli.Plan help move test pure
@@ -51,15 +51,17 @@ move :: Cli.Arguments -> IO ()
move _ = Exception.bracket startup shutdown <| uncurry Warp.run
where
startup =
- Envy.decodeWithDefaults Envy.defConfig +> \c -> do
- sync <- STM.newTVarIO initialAppState
- let runActionToIO m = runReaderT (runApp m) sync
- waiapp <- Scotty.scottyAppT runActionToIO <| routes c
- putText "*"
- putText "que"
- putText <| "port: " <> (show <| quePort c)
- putText <| "skey: " <> (Text.Lazy.toStrict <| queSkey c)
- return (quePort c, waiapp)
+ Envy.decodeWithDefaults Envy.defConfig +> \cfg@Config {..} -> do
+ initialState <- atomically <| STM.newTVar mempty
+ -- natural transformation
+ let nt :: AppState -> App a -> Servant.Handler a
+ nt s x = runReaderT x s
+ let app :: AppState -> Application
+ app s = genericServeT (nt s) (paths cfg)
+ Log.info ["boot", "que"] >> Log.br
+ Log.info ["boot", "port", show <| quePort] >> Log.br
+ Log.info ["boot", "skey", queSkey] >> Log.br
+ pure (quePort, app initialState)
shutdown :: a -> IO a
shutdown = pure <. identity
@@ -76,167 +78,149 @@ Usage:
test :: Test.Tree
test = Test.group "Biz.Que.Host" [Test.unit "id" <| 1 @=? (1 :: Integer)]
-newtype App a = App
- { runApp :: ReaderT (STM.TVar AppState) IO a
- }
- deriving
- ( Applicative,
- Functor,
- Monad,
- MonadIO,
- MonadReader
- (STM.TVar AppState)
- )
-
-newtype AppState = AppState
- { ques :: HashMap Namespace Quebase
- }
+type App = ReaderT AppState Servant.Handler
-initialAppState :: AppState
-initialAppState = AppState {ques = mempty}
+type Ques = HashMap Namespace Quebase
+
+type AppState = STM.TVar Ques
data Config = Config
{ -- | QUE_PORT
quePort :: Warp.Port,
-- | QUE_SKEY
- queSkey :: Text.Lazy.Text
+ queSkey :: Text
}
deriving (Generic, Show)
instance Envy.DefConfig Config where
- defConfig = Config 3000 "admin-key"
+ defConfig = Config 3001 "admin-key"
instance Envy.FromEnv Config
-routes :: Config -> Scotty.ScottyT Text.Lazy.Text App ()
-routes cfg = do
- Scotty.middleware logStdout
- let quepath = "^\\/([[:alnum:]_-]+)\\/([[:alnum:]._/-]*)$"
- let namespace = "^\\/([[:alnum:]_-]+)\\/?$" -- matches '/ns' and '/ns/' but not '/ns/path'
-
- -- GET /index.html
- Scotty.get (Scotty.literal "/index.html") <| Scotty.redirect "/_/index"
- Scotty.get (Scotty.literal "/") <| Scotty.redirect "/_/index"
- -- GET /_/dash
- Scotty.get (Scotty.literal "/_/dash") <| do
- authkey <- fromMaybe "" </ Scotty.header "Authorization"
- if authkey == (Text.Lazy.strip <| queSkey cfg)
- then do
- d <- app <| gets ques
- Scotty.json d
- else do
- Scotty.status Http.methodNotAllowed405
- Scotty.text "not allowed"
- -- Namespace management
- Scotty.matchAny (Scotty.regex namespace) <| do
- Scotty.status Http.notImplemented501
- Scotty.text "namespace management coming soon"
- -- GET que
- --
- -- Receive a value from a que. Blocks until a value is received,
- -- then returns. If 'poll=true', then stream data from the Que to the
- -- client.
- Scotty.get (Scotty.regex quepath) <| do
- (ns, qp) <- extract
- guardNs ns ["pub", "_"]
- app <. modify <| upsertNamespace ns
- q <- app <| que ns qp
- poll <- Scotty.param "poll" !: (pure <. const False)
- if poll
- then Scotty.stream <| streamQue q
- else do
- r <- liftIO <| Go.read q
- Scotty.html <| fromStrict <| Encoding.decodeUtf8 r
- -- POST que
- --
- -- Put a value on a que. Returns immediately.
- Scotty.post (Scotty.regex quepath) <| do
- authkey <- fromMaybe "" </ Scotty.header "Authorization"
- (ns, qp) <- extract
- -- Only allow my IP or localhost to publish to '_' namespace
- when
- ("_" == ns && authkey /= (Text.Lazy.strip <| queSkey cfg))
- ( Scotty.status Http.methodNotAllowed405
- >> Scotty.text "not allowed: _ is a reserved namespace"
- >> Scotty.finish
- )
- guardNs ns ["pub", "_"]
- -- passed all auth checks
- app <. modify <| upsertNamespace ns
- q <- app <| que ns qp
- qdata <- Scotty.body
- _ <- liftIO <| Go.write q <| BSL.toStrict qdata
- return ()
+-- | A simple HTML type. This recognizes "content-type: text/html" but doesn't
+-- do any conversion, rendering, or sanitization like the
+-- 'Servant.HTML.Lucid.HTML' type would do.
+data HTML deriving (Typeable)
+
+instance Accept HTML where
+ contentTypes _ = "text" // "html" /: ("charset", "utf-8") :| ["text" // "html"]
+
+instance MimeRender HTML ByteString where
+ mimeRender _ x = str x
+
+instance MimeUnrender HTML Text where
+ mimeUnrender _ bs = Right <| str bs
+
+instance MimeUnrender OctetStream Text where
+ mimeUnrender _ bs = Right <| str bs
+
+instance MimeRender PlainText ByteString where
+ mimeRender _ bs = str bs
+
+instance MimeUnrender PlainText ByteString where
+ mimeUnrender _ bs = Right <| str bs
+
+data Paths path = Paths
+ { home ::
+ path
+ :- Get '[JSON] NoContent,
+ dash ::
+ path
+ :- "_"
+ :> "dash"
+ :> Get '[JSON] Ques,
+ getQue ::
+ path
+ :- Capture "ns" Text
+ :> Capture "quename" Text
+ :> Get '[PlainText, HTML, OctetStream] Message,
+ getStream ::
+ path
+ :- Capture "ns" Text
+ :> Capture "quename" Text
+ :> "stream"
+ :> StreamGet NoFraming OctetStream (SourceIO Message),
+ putQue ::
+ path
+ :- Capture "ns" Text
+ :> Capture "quepath" Text
+ :> ReqBody '[PlainText, HTML, OctetStream] Text
+ :> Post '[PlainText, HTML, OctetStream] NoContent
+ }
+ deriving (Generic)
+
+paths :: Config -> Paths (AsServerT App)
+paths _ =
+ -- TODO revive authkey stuff
+ -- - read Authorization header, compare with queSkey
+ -- - Only allow my IP or localhost to publish to '_' namespace
+ Paths
+ { home =
+ throwError <| err301 {errHeaders = [("Location", "/_/index")]},
+ dash = gets,
+ getQue = \ns qn -> do
+ guardNs ns ["pub", "_"]
+ modify <| upsertNamespace ns
+ q <- que ns qn
+ Go.mult q
+ |> liftIO
+ +> Go.tap
+ |> liftIO,
+ getStream = \ns qn -> do
+ guardNs ns ["pub", "_"]
+ modify <| upsertNamespace ns
+ q <- que ns qn
+ Go.mult q
+ |> liftIO
+ +> Go.tap
+ |> Source.fromAction (const False) -- peek chan instead of False?
+ |> pure,
+ putQue = \ns qp body -> do
+ guardNs ns ["pub", "_"]
+ modify <| upsertNamespace ns
+ q <- que ns qp
+ body
+ |> str
+ |> Go.write q
+ >> Go.read q -- flush the que, otherwise msgs never clear
+ |> liftIO
+ -- TODO: detect number of readers, respond with "sent to N readers" or
+ -- "no readers, msg lost"
+ >> pure NoContent
+ }
-- | Given `guardNs ns whitelist`, if `ns` is not in the `whitelist`
-- list, return a 405 error.
-guardNs :: Text.Lazy.Text -> [Text.Lazy.Text] -> Scotty.ActionT Text.Lazy.Text App ()
+guardNs :: (Applicative a, MonadError ServerError a) => Text -> [Text] -> a ()
guardNs ns whitelist =
when (not <| ns `elem` whitelist) <| do
- Scotty.status Http.methodNotAllowed405
- Scotty.text
- <| "not allowed: use 'pub' namespace or signup to protect '"
- <> ns
- <> "' at https://que.run"
- Scotty.finish
-
--- | recover from a scotty-thrown exception.
-(!:) ::
- -- | action that might throw
- Scotty.ActionT Text.Lazy.Text App a ->
- -- | a function providing a default response instead
- (Text.Lazy.Text -> Scotty.ActionT Text.Lazy.Text App a) ->
- Scotty.ActionT Text.Lazy.Text App a
-(!:) = Scotty.rescue
-
--- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
-streamQue :: Que -> Wai.StreamingBody
-streamQue q write _ = loop q
+ throwError <| err405 {errBody = str msg}
where
- loop c =
- Go.read c
- +> (write <. Builder.byteStringInsert)
- >> loop c
+ msg =
+ "not allowed: use 'pub' namespace or signup to protect '"
+ <> ns
+ <> "' at https://que.run"
-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)
--- | Inserts the namespace in 'AppState' if it doesn't exist.
-upsertNamespace :: Namespace -> AppState -> AppState
+-- | Inserts the namespace in 'Ques' if it doesn't exist.
+upsertNamespace :: Namespace -> HashMap Namespace Quebase -> HashMap Namespace Quebase
upsertNamespace ns as =
- if HashMap.member ns (ques as)
+ if HashMap.member ns as
then as
- else as {ques = HashMap.insert ns mempty (ques as)}
+ else HashMap.insert ns mempty as
-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
-insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
-insertQue ns qp q as = as {ques = newQues}
+insertQue :: Namespace -> Quepath -> Que -> HashMap Namespace Quebase -> HashMap Namespace Quebase
+insertQue ns qp q hm = newQues
where
- newQues = HashMap.insert ns newQbase (ques as)
- newQbase = HashMap.insert qp q <| grab ns <| ques as
-
-extract :: Scotty.ActionT Text.Lazy.Text App (Namespace, Quepath)
-extract = do
- ns <- Scotty.param "1"
- path <- Scotty.param "2"
- return (ns, path)
-
--- | A synonym for 'lift' in order to be explicit about when we are
--- operating at the 'App' layer.
-app :: MonadTrans t => App a -> t App a
-app = lift
-
--- | Get something from the app state
-gets :: (AppState -> b) -> App b
-gets f = ask +> liftIO <. STM.readTVarIO +> return </ f
-
--- | Apply a function to the app state
-modify :: (AppState -> AppState) -> App ()
-modify f = ask +> liftIO <. atomically <. flip STM.modifyTVar' f
+ newQues = HashMap.insert ns newQbase hm
+ newQbase = HashMap.insert qp q <| grab ns hm
-- | housing for a set of que paths
-type Namespace = Text.Lazy.Text
+type Namespace = Text
-- | a que is just a channel of bytes
type Que = Go.Channel Message
@@ -250,15 +234,22 @@ type Message = ByteString
-- | a collection of ques
type Quebase = HashMap Quepath Que
+-- | Get app state
+gets :: App Ques
+gets = ask +> STM.readTVarIO .> liftIO +> pure
+
+-- | Apply a function to the app state
+modify :: (Ques -> Ques) -> App ()
+modify f = ask +> flip STM.modifyTVar' f .> atomically .> liftIO
+
-- | Lookup or create a que
que :: Namespace -> Quepath -> App Que
que ns qp = do
- _ques <- gets ques
- let qbase = grab ns _ques
- queExists = HashMap.member qp qbase
- if queExists
- then return <| grab qp qbase
+ ques <- gets
+ let qbase = grab ns ques
+ if HashMap.member qp qbase
+ then pure <| grab qp qbase
else do
- c <- liftIO <| Go.chan 1
+ c <- liftIO <| Go.chan 5
modify (insertQue ns qp c)
- gets ques /> grab ns /> grab qp
+ gets /> grab ns /> grab qp
diff --git a/Biz/Que/Site.hs b/Biz/Que/Site.hs
index 06b86c8..e027717 100644
--- a/Biz/Que/Site.hs
+++ b/Biz/Que/Site.hs
@@ -20,12 +20,9 @@ import qualified Control.Concurrent.Async as Async
import qualified Data.ByteString.Char8 as BS
import qualified Data.Ini.Config as Config
import qualified Data.Text as Text
-import Data.Text.Encoding (encodeUtf8)
import qualified Data.Text.IO as Text
import Network.HTTP.Req
import qualified System.Directory as Directory
-import System.Environment as Environment
-import qualified System.Exit as Exit
import System.FilePath ((</>))
import qualified System.Process as Process
@@ -138,7 +135,8 @@ serve Nothing p _ _ = panic <| "no auth key provided for ns: " <> p
serve (Just key) ns path content =
runReq defaultHttpConfig <| do
let options =
- header "Authorization" (encodeUtf8 key) <> responseTimeout maxBound
+ header "Content-type" "text/html;charset=utf-8"
+ -- header "Authorization" (encodeUtf8 key) <> responseTimeout maxBound
_ <-
req
POST