From 9b1df01fd2cf3ecf41fc68b94051db665821c774 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Wed, 4 Aug 2021 11:09:35 -0400 Subject: Reimplement Que with Servant Still todo: add authentication. But that can wait. In re-implementing this, I was able to figure out how to get the Go.mult working properly as well. The problem is that a tap from a mult channel does not remove the message from the original channel. I'm not sure if that should be a core feature or not; for now I'm just draining the channel when it's received in the Que HTTP handler. (Also, this would be a good place to put persistence: have a background job read from the original channel, and write the msg to disk via acid-state; this would obviate the need for a flush to nowhere.) Also, streaming is working now. The problem was that Scotty closes the connection after it sees a newline in the body, or something, so streaming over Scotty doesn't actually work. It's fine, Servant is better anyway. --- Biz/Que/Client.py | 93 +++++++++------- Biz/Que/Host.hs | 311 ++++++++++++++++++++++++++---------------------------- Biz/Que/Site.hs | 6 +- 3 files changed, 210 insertions(+), 200 deletions(-) (limited to 'Biz/Que') diff --git a/Biz/Que/Client.py b/Biz/Que/Client.py index 1063eb8..58877bf 100755 --- a/Biz/Que/Client.py +++ b/Biz/Que/Client.py @@ -11,11 +11,15 @@ import logging import os import subprocess import sys +import textwrap import time import urllib.parse import urllib.request as request -MAX_TIMEOUT = 99999999 # basically never timeout +MAX_TIMEOUT = 9999999 +RETRIES = 10 +DELAY = 3 +BACKOFF = 1 def auth(args): @@ -33,8 +37,8 @@ def auth(args): def autodecode(bytestring): - """Attempt to decode bytes `bs` into common codecs, preferably utf-8. If - no decoding is available, just return the raw bytes. + """Attempt to decode bytes into common codecs, preferably utf-8. If no + decoding is available, just return the raw bytes. For all available codecs, see: @@ -50,7 +54,7 @@ def autodecode(bytestring): return bytestring -def retry(exception, tries=4, delay=3, backoff=2): +def retry(exception, tries=RETRIES, delay=DELAY, backoff=BACKOFF): "Decorator for retrying an action." def decorator(func): @@ -73,20 +77,23 @@ def retry(exception, tries=4, delay=3, backoff=2): return decorator +@retry(urllib.error.URLError) +@retry(http.client.IncompleteRead) +@retry(http.client.RemoteDisconnected) def send(args): "Send a message to the que." logging.debug("send") key = auth(args) data = args.infile req = request.Request(f"{args.host}/{args.target}") - req.add_header("User-AgenT", "Que/Client") + req.add_header("User-Agent", "Que/Client") + req.add_header("Content-Type", "text/plain;charset=utf-8") if key: req.add_header("Authorization", key) if args.serve: logging.debug("serve") while not time.sleep(1): request.urlopen(req, data=data, timeout=MAX_TIMEOUT) - else: request.urlopen(req, data=data, timeout=MAX_TIMEOUT) @@ -96,75 +103,89 @@ def then(args, msg): if args.then: logging.debug("then") subprocess.run( - args.then.format(msg=msg, que=args.target), check=False, shell=True, + args.then.format(msg=msg, que=args.target), + check=False, + shell=True, ) -@retry(http.client.IncompleteRead, tries=10, delay=5, backoff=1) -@retry(http.client.RemoteDisconnected, tries=10, delay=2, backoff=2) +@retry(urllib.error.URLError) +@retry(http.client.IncompleteRead) +@retry(http.client.RemoteDisconnected) def recv(args): "Receive a message from the que." logging.debug("recv on: %s", args.target) - params = urllib.parse.urlencode({"poll": args.poll}) - req = request.Request(f"{args.host}/{args.target}?{params}") + if args.poll: + req = request.Request(f"{args.host}/{args.target}/stream") + else: + req = request.Request(f"{args.host}/{args.target}") req.add_header("User-Agent", "Que/Client") key = auth(args) if key: req.add_header("Authorization", key) with request.urlopen(req) as _req: if args.poll: - logging.debug("poll") + logging.debug("polling") while not time.sleep(1): - logging.debug("reading") - msg = autodecode(_req.readline()) - logging.debug("read") - print(msg, end="") - then(args, msg) + reply =_req.readline() + if reply: + msg = autodecode(reply) + logging.debug("read") + print(msg, end="") + then(args, msg) + else: + continue else: - msg = autodecode(_req.read()) + msg = autodecode(_req.readline()) print(msg) then(args, msg) def get_args(): "Command line parser" - cli = argparse.ArgumentParser(description=__doc__) + cli = argparse.ArgumentParser( + description=__doc__, + epilog=textwrap.dedent( + f"""Requests will retry up to {RETRIES} times, with {DELAY} seconds + between attempts.""" + ), + ) cli.add_argument("--debug", action="store_true", help="log to stderr") cli.add_argument( "--host", default="http://que.run", help="where que-server is running" ) cli.add_argument( - "--poll", default=False, action="store_true", help="stream data from the que" + "--poll", + default=False, + action="store_true", + help=textwrap.dedent( + """keep the connection open to stream data from the que. without + this flag, the program will exit after receiving a message""" + ), ) cli.add_argument( "--then", - help=" ".join( - [ - "when polling, run this shell command after each response,", - "presumably for side effects," - r"replacing '{que}' with the target and '{msg}' with the body of the response", - ] + help=textwrap.dedent( + """when polling, run this shell command after each response, + presumably for side effects, replacing '{que}' with the target and + '{msg}' with the body of the response""" ), ) cli.add_argument( "--serve", default=False, action="store_true", - help=" ".join( - [ - "when posting to the que, do so continuously in a loop.", - "this can be used for serving a webpage or other file continuously", - ] + help=textwrap.dedent( + """when posting to the que, do so continuously in a loop. this can + be used for serving a webpage or other file continuously""" ), ) - cli.add_argument( - "target", help="namespace and path of the que, like 'ns/path/subpath'" - ) + cli.add_argument("target", help="namespace and path of the que, like 'ns/path'") cli.add_argument( "infile", nargs="?", type=argparse.FileType("rb"), - help="data to put on the que. Use '-' for stdin, otherwise should be a readable file", + help="data to put on the que. use '-' for stdin, otherwise should be a readable file", ) return cli.parse_args() @@ -173,7 +194,7 @@ if __name__ == "__main__": ARGV = get_args() if ARGV.debug: logging.basicConfig( - format="%(asctime)s %(message)s", + format="%(asctime)s: %(levelname)s: %(message)s", level=logging.DEBUG, datefmt="%Y.%m.%d..%H.%M.%S", ) diff --git a/Biz/Que/Host.hs b/Biz/Que/Host.hs index 40ee1a5..702827e 100644 --- a/Biz/Que/Host.hs +++ b/Biz/Que/Host.hs @@ -1,8 +1,15 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE TypeOperators #-} {-# LANGUAGE NoImplicitPrelude #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} -- | Interprocess communication -- @@ -20,29 +27,22 @@ where import Alpha hiding (gets, modify, poll) import qualified Biz.Cli as Cli +import qualified Biz.Log as Log import Biz.Test ((@=?)) import qualified Biz.Test as Test import qualified Control.Concurrent.Go as Go import qualified Control.Concurrent.STM as STM import qualified Control.Exception as Exception -import Control.Monad.Reader (MonadTrans) -import qualified Data.ByteString.Builder.Extra as Builder -import qualified Data.ByteString.Lazy as BSL import Data.HashMap.Lazy (HashMap) import qualified Data.HashMap.Lazy as HashMap -import qualified Data.Text.Encoding as Encoding -import qualified Data.Text.Lazy as Text.Lazy -import qualified Network.HTTP.Types.Status as Http -import qualified Network.Wai as Wai +import Network.HTTP.Media ((//), (/:)) import qualified Network.Wai.Handler.Warp as Warp -import Network.Wai.Middleware.RequestLogger - ( logStdout, - ) +import Servant +import Servant.API.Generic ((:-)) +-- import qualified Servant.Auth.Server as Auth +import Servant.Server.Generic (AsServerT, genericServeT) +import qualified Servant.Types.SourceT as Source import qualified System.Envy as Envy -import qualified Web.Scotty.Trans as Scotty -import qualified Prelude - -{-# ANN module ("HLint: ignore Reduce duplication" :: Prelude.String) #-} main :: IO () main = Cli.main <| Cli.Plan help move test pure @@ -51,15 +51,17 @@ move :: Cli.Arguments -> IO () move _ = Exception.bracket startup shutdown <| uncurry Warp.run where startup = - Envy.decodeWithDefaults Envy.defConfig +> \c -> do - sync <- STM.newTVarIO initialAppState - let runActionToIO m = runReaderT (runApp m) sync - waiapp <- Scotty.scottyAppT runActionToIO <| routes c - putText "*" - putText "que" - putText <| "port: " <> (show <| quePort c) - putText <| "skey: " <> (Text.Lazy.toStrict <| queSkey c) - return (quePort c, waiapp) + Envy.decodeWithDefaults Envy.defConfig +> \cfg@Config {..} -> do + initialState <- atomically <| STM.newTVar mempty + -- natural transformation + let nt :: AppState -> App a -> Servant.Handler a + nt s x = runReaderT x s + let app :: AppState -> Application + app s = genericServeT (nt s) (paths cfg) + Log.info ["boot", "que"] >> Log.br + Log.info ["boot", "port", show <| quePort] >> Log.br + Log.info ["boot", "skey", queSkey] >> Log.br + pure (quePort, app initialState) shutdown :: a -> IO a shutdown = pure <. identity @@ -76,167 +78,149 @@ Usage: test :: Test.Tree test = Test.group "Biz.Que.Host" [Test.unit "id" <| 1 @=? (1 :: Integer)] -newtype App a = App - { runApp :: ReaderT (STM.TVar AppState) IO a - } - deriving - ( Applicative, - Functor, - Monad, - MonadIO, - MonadReader - (STM.TVar AppState) - ) - -newtype AppState = AppState - { ques :: HashMap Namespace Quebase - } +type App = ReaderT AppState Servant.Handler -initialAppState :: AppState -initialAppState = AppState {ques = mempty} +type Ques = HashMap Namespace Quebase + +type AppState = STM.TVar Ques data Config = Config { -- | QUE_PORT quePort :: Warp.Port, -- | QUE_SKEY - queSkey :: Text.Lazy.Text + queSkey :: Text } deriving (Generic, Show) instance Envy.DefConfig Config where - defConfig = Config 3000 "admin-key" + defConfig = Config 3001 "admin-key" instance Envy.FromEnv Config -routes :: Config -> Scotty.ScottyT Text.Lazy.Text App () -routes cfg = do - Scotty.middleware logStdout - let quepath = "^\\/([[:alnum:]_-]+)\\/([[:alnum:]._/-]*)$" - let namespace = "^\\/([[:alnum:]_-]+)\\/?$" -- matches '/ns' and '/ns/' but not '/ns/path' - - -- GET /index.html - Scotty.get (Scotty.literal "/index.html") <| Scotty.redirect "/_/index" - Scotty.get (Scotty.literal "/") <| Scotty.redirect "/_/index" - -- GET /_/dash - Scotty.get (Scotty.literal "/_/dash") <| do - authkey <- fromMaybe "" > Scotty.text "not allowed: _ is a reserved namespace" - >> Scotty.finish - ) - guardNs ns ["pub", "_"] - -- passed all auth checks - app <. modify <| upsertNamespace ns - q <- app <| que ns qp - qdata <- Scotty.body - _ <- liftIO <| Go.write q <| BSL.toStrict qdata - return () +-- | A simple HTML type. This recognizes "content-type: text/html" but doesn't +-- do any conversion, rendering, or sanitization like the +-- 'Servant.HTML.Lucid.HTML' type would do. +data HTML deriving (Typeable) + +instance Accept HTML where + contentTypes _ = "text" // "html" /: ("charset", "utf-8") :| ["text" // "html"] + +instance MimeRender HTML ByteString where + mimeRender _ x = str x + +instance MimeUnrender HTML Text where + mimeUnrender _ bs = Right <| str bs + +instance MimeUnrender OctetStream Text where + mimeUnrender _ bs = Right <| str bs + +instance MimeRender PlainText ByteString where + mimeRender _ bs = str bs + +instance MimeUnrender PlainText ByteString where + mimeUnrender _ bs = Right <| str bs + +data Paths path = Paths + { home :: + path + :- Get '[JSON] NoContent, + dash :: + path + :- "_" + :> "dash" + :> Get '[JSON] Ques, + getQue :: + path + :- Capture "ns" Text + :> Capture "quename" Text + :> Get '[PlainText, HTML, OctetStream] Message, + getStream :: + path + :- Capture "ns" Text + :> Capture "quename" Text + :> "stream" + :> StreamGet NoFraming OctetStream (SourceIO Message), + putQue :: + path + :- Capture "ns" Text + :> Capture "quepath" Text + :> ReqBody '[PlainText, HTML, OctetStream] Text + :> Post '[PlainText, HTML, OctetStream] NoContent + } + deriving (Generic) + +paths :: Config -> Paths (AsServerT App) +paths _ = + -- TODO revive authkey stuff + -- - read Authorization header, compare with queSkey + -- - Only allow my IP or localhost to publish to '_' namespace + Paths + { home = + throwError <| err301 {errHeaders = [("Location", "/_/index")]}, + dash = gets, + getQue = \ns qn -> do + guardNs ns ["pub", "_"] + modify <| upsertNamespace ns + q <- que ns qn + Go.mult q + |> liftIO + +> Go.tap + |> liftIO, + getStream = \ns qn -> do + guardNs ns ["pub", "_"] + modify <| upsertNamespace ns + q <- que ns qn + Go.mult q + |> liftIO + +> Go.tap + |> Source.fromAction (const False) -- peek chan instead of False? + |> pure, + putQue = \ns qp body -> do + guardNs ns ["pub", "_"] + modify <| upsertNamespace ns + q <- que ns qp + body + |> str + |> Go.write q + >> Go.read q -- flush the que, otherwise msgs never clear + |> liftIO + -- TODO: detect number of readers, respond with "sent to N readers" or + -- "no readers, msg lost" + >> pure NoContent + } -- | Given `guardNs ns whitelist`, if `ns` is not in the `whitelist` -- list, return a 405 error. -guardNs :: Text.Lazy.Text -> [Text.Lazy.Text] -> Scotty.ActionT Text.Lazy.Text App () +guardNs :: (Applicative a, MonadError ServerError a) => Text -> [Text] -> a () guardNs ns whitelist = when (not <| ns `elem` whitelist) <| do - Scotty.status Http.methodNotAllowed405 - Scotty.text - <| "not allowed: use 'pub' namespace or signup to protect '" - <> ns - <> "' at https://que.run" - Scotty.finish - --- | recover from a scotty-thrown exception. -(!:) :: - -- | action that might throw - Scotty.ActionT Text.Lazy.Text App a -> - -- | a function providing a default response instead - (Text.Lazy.Text -> Scotty.ActionT Text.Lazy.Text App a) -> - Scotty.ActionT Text.Lazy.Text App a -(!:) = Scotty.rescue - --- | Forever write the data from 'Que' to 'Wai.StreamingBody'. -streamQue :: Que -> Wai.StreamingBody -streamQue q write _ = loop q + throwError <| err405 {errBody = str msg} where - loop c = - Go.read c - +> (write <. Builder.byteStringInsert) - >> loop c + msg = + "not allowed: use 'pub' namespace or signup to protect '" + <> ns + <> "' at https://que.run" -- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. grab :: (Eq k, Hashable k) => k -> HashMap k v -> v grab = flip (HashMap.!) --- | Inserts the namespace in 'AppState' if it doesn't exist. -upsertNamespace :: Namespace -> AppState -> AppState +-- | Inserts the namespace in 'Ques' if it doesn't exist. +upsertNamespace :: Namespace -> HashMap Namespace Quebase -> HashMap Namespace Quebase upsertNamespace ns as = - if HashMap.member ns (ques as) + if HashMap.member ns as then as - else as {ques = HashMap.insert ns mempty (ques as)} + else HashMap.insert ns mempty as -- | Inserts the que at the proper 'Namespace' and 'Quepath'. -insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState -insertQue ns qp q as = as {ques = newQues} +insertQue :: Namespace -> Quepath -> Que -> HashMap Namespace Quebase -> HashMap Namespace Quebase +insertQue ns qp q hm = newQues where - newQues = HashMap.insert ns newQbase (ques as) - newQbase = HashMap.insert qp q <| grab ns <| ques as - -extract :: Scotty.ActionT Text.Lazy.Text App (Namespace, Quepath) -extract = do - ns <- Scotty.param "1" - path <- Scotty.param "2" - return (ns, path) - --- | A synonym for 'lift' in order to be explicit about when we are --- operating at the 'App' layer. -app :: MonadTrans t => App a -> t App a -app = lift - --- | Get something from the app state -gets :: (AppState -> b) -> App b -gets f = ask +> liftIO <. STM.readTVarIO +> return AppState) -> App () -modify f = ask +> liftIO <. atomically <. flip STM.modifyTVar' f + newQues = HashMap.insert ns newQbase hm + newQbase = HashMap.insert qp q <| grab ns hm -- | housing for a set of que paths -type Namespace = Text.Lazy.Text +type Namespace = Text -- | a que is just a channel of bytes type Que = Go.Channel Message @@ -250,15 +234,22 @@ type Message = ByteString -- | a collection of ques type Quebase = HashMap Quepath Que +-- | Get app state +gets :: App Ques +gets = ask +> STM.readTVarIO .> liftIO +> pure + +-- | Apply a function to the app state +modify :: (Ques -> Ques) -> App () +modify f = ask +> flip STM.modifyTVar' f .> atomically .> liftIO + -- | Lookup or create a que que :: Namespace -> Quepath -> App Que que ns qp = do - _ques <- gets ques - let qbase = grab ns _ques - queExists = HashMap.member qp qbase - if queExists - then return <| grab qp qbase + ques <- gets + let qbase = grab ns ques + if HashMap.member qp qbase + then pure <| grab qp qbase else do - c <- liftIO <| Go.chan 1 + c <- liftIO <| Go.chan 5 modify (insertQue ns qp c) - gets ques /> grab ns /> grab qp + gets /> grab ns /> grab qp diff --git a/Biz/Que/Site.hs b/Biz/Que/Site.hs index 06b86c8..e027717 100644 --- a/Biz/Que/Site.hs +++ b/Biz/Que/Site.hs @@ -20,12 +20,9 @@ import qualified Control.Concurrent.Async as Async import qualified Data.ByteString.Char8 as BS import qualified Data.Ini.Config as Config import qualified Data.Text as Text -import Data.Text.Encoding (encodeUtf8) import qualified Data.Text.IO as Text import Network.HTTP.Req import qualified System.Directory as Directory -import System.Environment as Environment -import qualified System.Exit as Exit import System.FilePath (()) import qualified System.Process as Process @@ -138,7 +135,8 @@ serve Nothing p _ _ = panic <| "no auth key provided for ns: " <> p serve (Just key) ns path content = runReq defaultHttpConfig <| do let options = - header "Authorization" (encodeUtf8 key) <> responseTimeout maxBound + header "Content-type" "text/html;charset=utf-8" + -- header "Authorization" (encodeUtf8 key) <> responseTimeout maxBound _ <- req POST -- cgit v1.2.3