diff options
Diffstat (limited to 'Biz/Que/Client.py')
-rwxr-xr-x | Biz/Que/Client.py | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/Biz/Que/Client.py b/Biz/Que/Client.py new file mode 100755 index 0000000..1063eb8 --- /dev/null +++ b/Biz/Que/Client.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +""" +simple client for que.run +""" + +import argparse +import configparser +import functools +import http.client +import logging +import os +import subprocess +import sys +import time +import urllib.parse +import urllib.request as request + +MAX_TIMEOUT = 99999999 # basically never timeout + + +def auth(args): + "Returns the auth key for the given ns from ~/.config/que.conf" + logging.debug("auth") + namespace = args.target.split("/")[0] + if namespace == "pub": + return None + conf_file = os.path.expanduser("~/.config/que.conf") + if not os.path.exists(conf_file): + sys.exit("you need a ~/.config/que.conf") + cfg = configparser.ConfigParser() + cfg.read(conf_file) + return cfg[namespace]["key"] + + +def autodecode(bytestring): + """Attempt to decode bytes `bs` into common codecs, preferably utf-8. If + no decoding is available, just return the raw bytes. + + For all available codecs, see: + <https://docs.python.org/3/library/codecs.html#standard-encodings> + + """ + logging.debug("autodecode") + codecs = ["utf-8", "ascii"] + for codec in codecs: + try: + return bytestring.decode(codec) + except UnicodeDecodeError: + pass + return bytestring + + +def retry(exception, tries=4, delay=3, backoff=2): + "Decorator for retrying an action." + + def decorator(func): + @functools.wraps(func) + def func_retry(*args, **kwargs): + mtries, mdelay = tries, delay + while mtries > 1: + try: + return func(*args, **kwargs) + except exception as ex: + logging.debug(ex) + logging.debug("retrying...") + time.sleep(mdelay) + mtries -= 1 + mdelay *= backoff + return func(*args, **kwargs) + + return func_retry + + return decorator + + +def send(args): + "Send a message to the que." + logging.debug("send") + key = auth(args) + data = args.infile + req = request.Request(f"{args.host}/{args.target}") + req.add_header("User-AgenT", "Que/Client") + if key: + req.add_header("Authorization", key) + if args.serve: + logging.debug("serve") + while not time.sleep(1): + request.urlopen(req, data=data, timeout=MAX_TIMEOUT) + + else: + request.urlopen(req, data=data, timeout=MAX_TIMEOUT) + + +def then(args, msg): + "Perform an action when passed `--then`." + if args.then: + logging.debug("then") + subprocess.run( + args.then.format(msg=msg, que=args.target), check=False, shell=True, + ) + + +@retry(http.client.IncompleteRead, tries=10, delay=5, backoff=1) +@retry(http.client.RemoteDisconnected, tries=10, delay=2, backoff=2) +def recv(args): + "Receive a message from the que." + logging.debug("recv on: %s", args.target) + params = urllib.parse.urlencode({"poll": args.poll}) + req = request.Request(f"{args.host}/{args.target}?{params}") + req.add_header("User-Agent", "Que/Client") + key = auth(args) + if key: + req.add_header("Authorization", key) + with request.urlopen(req) as _req: + if args.poll: + logging.debug("poll") + while not time.sleep(1): + logging.debug("reading") + msg = autodecode(_req.readline()) + logging.debug("read") + print(msg, end="") + then(args, msg) + else: + msg = autodecode(_req.read()) + print(msg) + then(args, msg) + + +def get_args(): + "Command line parser" + cli = argparse.ArgumentParser(description=__doc__) + cli.add_argument("--debug", action="store_true", help="log to stderr") + cli.add_argument( + "--host", default="http://que.run", help="where que-server is running" + ) + cli.add_argument( + "--poll", default=False, action="store_true", help="stream data from the que" + ) + cli.add_argument( + "--then", + help=" ".join( + [ + "when polling, run this shell command after each response,", + "presumably for side effects," + r"replacing '{que}' with the target and '{msg}' with the body of the response", + ] + ), + ) + cli.add_argument( + "--serve", + default=False, + action="store_true", + help=" ".join( + [ + "when posting to the que, do so continuously in a loop.", + "this can be used for serving a webpage or other file continuously", + ] + ), + ) + cli.add_argument( + "target", help="namespace and path of the que, like 'ns/path/subpath'" + ) + cli.add_argument( + "infile", + nargs="?", + type=argparse.FileType("rb"), + help="data to put on the que. Use '-' for stdin, otherwise should be a readable file", + ) + return cli.parse_args() + + +if __name__ == "__main__": + ARGV = get_args() + if ARGV.debug: + logging.basicConfig( + format="%(asctime)s %(message)s", + level=logging.DEBUG, + datefmt="%Y.%m.%d..%H.%M.%S", + ) + try: + if ARGV.infile: + send(ARGV) + else: + recv(ARGV) + except KeyboardInterrupt: + sys.exit(0) |