diff options
author | Ben Sima <ben@bsima.me> | 2021-08-04 11:09:35 -0400 |
---|---|---|
committer | Ben Sima <ben@bsima.me> | 2021-11-26 13:47:37 -0500 |
commit | 9b1df01fd2cf3ecf41fc68b94051db665821c774 (patch) | |
tree | c1a3e68f625679576ff7e47bd1ebcb07bb94e15e /Biz/Que/Client.py | |
parent | 0264f4a5dc37b16f872e6fa92bd8f1fc1e2b1826 (diff) |
Reimplement Que with Servant
Still todo: add authentication. But that can wait.
In re-implementing this, I was able to figure out how to get the Go.mult working
properly as well. The problem is that a tap from a mult channel does not remove
the message from the original channel. I'm not sure if that should be a core
feature or not; for now I'm just draining the channel when it's received in the
Que HTTP handler. (Also, this would be a good place to put persistence: have a
background job read from the original channel, and write the msg to disk via
acid-state; this would obviate the need for a flush to nowhere.)
Also, streaming is working now. The problem was that Scotty closes the
connection after it sees a newline in the body, or something, so streaming over
Scotty doesn't actually work. It's fine, Servant is better anyway.
Diffstat (limited to 'Biz/Que/Client.py')
-rwxr-xr-x | Biz/Que/Client.py | 93 |
1 files changed, 57 insertions, 36 deletions
diff --git a/Biz/Que/Client.py b/Biz/Que/Client.py index 1063eb8..58877bf 100755 --- a/Biz/Que/Client.py +++ b/Biz/Que/Client.py @@ -11,11 +11,15 @@ import logging import os import subprocess import sys +import textwrap import time import urllib.parse import urllib.request as request -MAX_TIMEOUT = 99999999 # basically never timeout +MAX_TIMEOUT = 9999999 +RETRIES = 10 +DELAY = 3 +BACKOFF = 1 def auth(args): @@ -33,8 +37,8 @@ def auth(args): def autodecode(bytestring): - """Attempt to decode bytes `bs` into common codecs, preferably utf-8. If - no decoding is available, just return the raw bytes. + """Attempt to decode bytes into common codecs, preferably utf-8. If no + decoding is available, just return the raw bytes. For all available codecs, see: <https://docs.python.org/3/library/codecs.html#standard-encodings> @@ -50,7 +54,7 @@ def autodecode(bytestring): return bytestring -def retry(exception, tries=4, delay=3, backoff=2): +def retry(exception, tries=RETRIES, delay=DELAY, backoff=BACKOFF): "Decorator for retrying an action." def decorator(func): @@ -73,20 +77,23 @@ def retry(exception, tries=4, delay=3, backoff=2): return decorator +@retry(urllib.error.URLError) +@retry(http.client.IncompleteRead) +@retry(http.client.RemoteDisconnected) def send(args): "Send a message to the que." logging.debug("send") key = auth(args) data = args.infile req = request.Request(f"{args.host}/{args.target}") - req.add_header("User-AgenT", "Que/Client") + req.add_header("User-Agent", "Que/Client") + req.add_header("Content-Type", "text/plain;charset=utf-8") if key: req.add_header("Authorization", key) if args.serve: logging.debug("serve") while not time.sleep(1): request.urlopen(req, data=data, timeout=MAX_TIMEOUT) - else: request.urlopen(req, data=data, timeout=MAX_TIMEOUT) @@ -96,75 +103,89 @@ def then(args, msg): if args.then: logging.debug("then") subprocess.run( - args.then.format(msg=msg, que=args.target), check=False, shell=True, + args.then.format(msg=msg, que=args.target), + check=False, + shell=True, ) -@retry(http.client.IncompleteRead, tries=10, delay=5, backoff=1) -@retry(http.client.RemoteDisconnected, tries=10, delay=2, backoff=2) +@retry(urllib.error.URLError) +@retry(http.client.IncompleteRead) +@retry(http.client.RemoteDisconnected) def recv(args): "Receive a message from the que." logging.debug("recv on: %s", args.target) - params = urllib.parse.urlencode({"poll": args.poll}) - req = request.Request(f"{args.host}/{args.target}?{params}") + if args.poll: + req = request.Request(f"{args.host}/{args.target}/stream") + else: + req = request.Request(f"{args.host}/{args.target}") req.add_header("User-Agent", "Que/Client") key = auth(args) if key: req.add_header("Authorization", key) with request.urlopen(req) as _req: if args.poll: - logging.debug("poll") + logging.debug("polling") while not time.sleep(1): - logging.debug("reading") - msg = autodecode(_req.readline()) - logging.debug("read") - print(msg, end="") - then(args, msg) + reply =_req.readline() + if reply: + msg = autodecode(reply) + logging.debug("read") + print(msg, end="") + then(args, msg) + else: + continue else: - msg = autodecode(_req.read()) + msg = autodecode(_req.readline()) print(msg) then(args, msg) def get_args(): "Command line parser" - cli = argparse.ArgumentParser(description=__doc__) + cli = argparse.ArgumentParser( + description=__doc__, + epilog=textwrap.dedent( + f"""Requests will retry up to {RETRIES} times, with {DELAY} seconds + between attempts.""" + ), + ) cli.add_argument("--debug", action="store_true", help="log to stderr") cli.add_argument( "--host", default="http://que.run", help="where que-server is running" ) cli.add_argument( - "--poll", default=False, action="store_true", help="stream data from the que" + "--poll", + default=False, + action="store_true", + help=textwrap.dedent( + """keep the connection open to stream data from the que. without + this flag, the program will exit after receiving a message""" + ), ) cli.add_argument( "--then", - help=" ".join( - [ - "when polling, run this shell command after each response,", - "presumably for side effects," - r"replacing '{que}' with the target and '{msg}' with the body of the response", - ] + help=textwrap.dedent( + """when polling, run this shell command after each response, + presumably for side effects, replacing '{que}' with the target and + '{msg}' with the body of the response""" ), ) cli.add_argument( "--serve", default=False, action="store_true", - help=" ".join( - [ - "when posting to the que, do so continuously in a loop.", - "this can be used for serving a webpage or other file continuously", - ] + help=textwrap.dedent( + """when posting to the que, do so continuously in a loop. this can + be used for serving a webpage or other file continuously""" ), ) - cli.add_argument( - "target", help="namespace and path of the que, like 'ns/path/subpath'" - ) + cli.add_argument("target", help="namespace and path of the que, like 'ns/path'") cli.add_argument( "infile", nargs="?", type=argparse.FileType("rb"), - help="data to put on the que. Use '-' for stdin, otherwise should be a readable file", + help="data to put on the que. use '-' for stdin, otherwise should be a readable file", ) return cli.parse_args() @@ -173,7 +194,7 @@ if __name__ == "__main__": ARGV = get_args() if ARGV.debug: logging.basicConfig( - format="%(asctime)s %(message)s", + format="%(asctime)s: %(levelname)s: %(message)s", level=logging.DEBUG, datefmt="%Y.%m.%d..%H.%M.%S", ) |