summaryrefslogtreecommitdiff
path: root/Biz/Que/Client.py
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2021-08-04 11:09:35 -0400
committerBen Sima <ben@bsima.me>2021-11-26 13:47:37 -0500
commit9b1df01fd2cf3ecf41fc68b94051db665821c774 (patch)
treec1a3e68f625679576ff7e47bd1ebcb07bb94e15e /Biz/Que/Client.py
parent0264f4a5dc37b16f872e6fa92bd8f1fc1e2b1826 (diff)
Reimplement Que with Servant
Still todo: add authentication. But that can wait. In re-implementing this, I was able to figure out how to get the Go.mult working properly as well. The problem is that a tap from a mult channel does not remove the message from the original channel. I'm not sure if that should be a core feature or not; for now I'm just draining the channel when it's received in the Que HTTP handler. (Also, this would be a good place to put persistence: have a background job read from the original channel, and write the msg to disk via acid-state; this would obviate the need for a flush to nowhere.) Also, streaming is working now. The problem was that Scotty closes the connection after it sees a newline in the body, or something, so streaming over Scotty doesn't actually work. It's fine, Servant is better anyway.
Diffstat (limited to 'Biz/Que/Client.py')
-rwxr-xr-xBiz/Que/Client.py93
1 files changed, 57 insertions, 36 deletions
diff --git a/Biz/Que/Client.py b/Biz/Que/Client.py
index 1063eb8..58877bf 100755
--- a/Biz/Que/Client.py
+++ b/Biz/Que/Client.py
@@ -11,11 +11,15 @@ import logging
import os
import subprocess
import sys
+import textwrap
import time
import urllib.parse
import urllib.request as request
-MAX_TIMEOUT = 99999999 # basically never timeout
+MAX_TIMEOUT = 9999999
+RETRIES = 10
+DELAY = 3
+BACKOFF = 1
def auth(args):
@@ -33,8 +37,8 @@ def auth(args):
def autodecode(bytestring):
- """Attempt to decode bytes `bs` into common codecs, preferably utf-8. If
- no decoding is available, just return the raw bytes.
+ """Attempt to decode bytes into common codecs, preferably utf-8. If no
+ decoding is available, just return the raw bytes.
For all available codecs, see:
<https://docs.python.org/3/library/codecs.html#standard-encodings>
@@ -50,7 +54,7 @@ def autodecode(bytestring):
return bytestring
-def retry(exception, tries=4, delay=3, backoff=2):
+def retry(exception, tries=RETRIES, delay=DELAY, backoff=BACKOFF):
"Decorator for retrying an action."
def decorator(func):
@@ -73,20 +77,23 @@ def retry(exception, tries=4, delay=3, backoff=2):
return decorator
+@retry(urllib.error.URLError)
+@retry(http.client.IncompleteRead)
+@retry(http.client.RemoteDisconnected)
def send(args):
"Send a message to the que."
logging.debug("send")
key = auth(args)
data = args.infile
req = request.Request(f"{args.host}/{args.target}")
- req.add_header("User-AgenT", "Que/Client")
+ req.add_header("User-Agent", "Que/Client")
+ req.add_header("Content-Type", "text/plain;charset=utf-8")
if key:
req.add_header("Authorization", key)
if args.serve:
logging.debug("serve")
while not time.sleep(1):
request.urlopen(req, data=data, timeout=MAX_TIMEOUT)
-
else:
request.urlopen(req, data=data, timeout=MAX_TIMEOUT)
@@ -96,75 +103,89 @@ def then(args, msg):
if args.then:
logging.debug("then")
subprocess.run(
- args.then.format(msg=msg, que=args.target), check=False, shell=True,
+ args.then.format(msg=msg, que=args.target),
+ check=False,
+ shell=True,
)
-@retry(http.client.IncompleteRead, tries=10, delay=5, backoff=1)
-@retry(http.client.RemoteDisconnected, tries=10, delay=2, backoff=2)
+@retry(urllib.error.URLError)
+@retry(http.client.IncompleteRead)
+@retry(http.client.RemoteDisconnected)
def recv(args):
"Receive a message from the que."
logging.debug("recv on: %s", args.target)
- params = urllib.parse.urlencode({"poll": args.poll})
- req = request.Request(f"{args.host}/{args.target}?{params}")
+ if args.poll:
+ req = request.Request(f"{args.host}/{args.target}/stream")
+ else:
+ req = request.Request(f"{args.host}/{args.target}")
req.add_header("User-Agent", "Que/Client")
key = auth(args)
if key:
req.add_header("Authorization", key)
with request.urlopen(req) as _req:
if args.poll:
- logging.debug("poll")
+ logging.debug("polling")
while not time.sleep(1):
- logging.debug("reading")
- msg = autodecode(_req.readline())
- logging.debug("read")
- print(msg, end="")
- then(args, msg)
+ reply =_req.readline()
+ if reply:
+ msg = autodecode(reply)
+ logging.debug("read")
+ print(msg, end="")
+ then(args, msg)
+ else:
+ continue
else:
- msg = autodecode(_req.read())
+ msg = autodecode(_req.readline())
print(msg)
then(args, msg)
def get_args():
"Command line parser"
- cli = argparse.ArgumentParser(description=__doc__)
+ cli = argparse.ArgumentParser(
+ description=__doc__,
+ epilog=textwrap.dedent(
+ f"""Requests will retry up to {RETRIES} times, with {DELAY} seconds
+ between attempts."""
+ ),
+ )
cli.add_argument("--debug", action="store_true", help="log to stderr")
cli.add_argument(
"--host", default="http://que.run", help="where que-server is running"
)
cli.add_argument(
- "--poll", default=False, action="store_true", help="stream data from the que"
+ "--poll",
+ default=False,
+ action="store_true",
+ help=textwrap.dedent(
+ """keep the connection open to stream data from the que. without
+ this flag, the program will exit after receiving a message"""
+ ),
)
cli.add_argument(
"--then",
- help=" ".join(
- [
- "when polling, run this shell command after each response,",
- "presumably for side effects,"
- r"replacing '{que}' with the target and '{msg}' with the body of the response",
- ]
+ help=textwrap.dedent(
+ """when polling, run this shell command after each response,
+ presumably for side effects, replacing '{que}' with the target and
+ '{msg}' with the body of the response"""
),
)
cli.add_argument(
"--serve",
default=False,
action="store_true",
- help=" ".join(
- [
- "when posting to the que, do so continuously in a loop.",
- "this can be used for serving a webpage or other file continuously",
- ]
+ help=textwrap.dedent(
+ """when posting to the que, do so continuously in a loop. this can
+ be used for serving a webpage or other file continuously"""
),
)
- cli.add_argument(
- "target", help="namespace and path of the que, like 'ns/path/subpath'"
- )
+ cli.add_argument("target", help="namespace and path of the que, like 'ns/path'")
cli.add_argument(
"infile",
nargs="?",
type=argparse.FileType("rb"),
- help="data to put on the que. Use '-' for stdin, otherwise should be a readable file",
+ help="data to put on the que. use '-' for stdin, otherwise should be a readable file",
)
return cli.parse_args()
@@ -173,7 +194,7 @@ if __name__ == "__main__":
ARGV = get_args()
if ARGV.debug:
logging.basicConfig(
- format="%(asctime)s %(message)s",
+ format="%(asctime)s: %(levelname)s: %(message)s",
level=logging.DEBUG,
datefmt="%Y.%m.%d..%H.%M.%S",
)