summaryrefslogtreecommitdiff
path: root/Que/Client.py
diff options
context:
space:
mode:
Diffstat (limited to 'Que/Client.py')
-rwxr-xr-xQue/Client.py186
1 files changed, 186 insertions, 0 deletions
diff --git a/Que/Client.py b/Que/Client.py
new file mode 100755
index 0000000..1063eb8
--- /dev/null
+++ b/Que/Client.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python3
+"""
+simple client for que.run
+"""
+
+import argparse
+import configparser
+import functools
+import http.client
+import logging
+import os
+import subprocess
+import sys
+import time
+import urllib.parse
+import urllib.request as request
+
+MAX_TIMEOUT = 99999999 # basically never timeout
+
+
+def auth(args):
+ "Returns the auth key for the given ns from ~/.config/que.conf"
+ logging.debug("auth")
+ namespace = args.target.split("/")[0]
+ if namespace == "pub":
+ return None
+ conf_file = os.path.expanduser("~/.config/que.conf")
+ if not os.path.exists(conf_file):
+ sys.exit("you need a ~/.config/que.conf")
+ cfg = configparser.ConfigParser()
+ cfg.read(conf_file)
+ return cfg[namespace]["key"]
+
+
+def autodecode(bytestring):
+ """Attempt to decode bytes `bs` into common codecs, preferably utf-8. If
+ no decoding is available, just return the raw bytes.
+
+ For all available codecs, see:
+ <https://docs.python.org/3/library/codecs.html#standard-encodings>
+
+ """
+ logging.debug("autodecode")
+ codecs = ["utf-8", "ascii"]
+ for codec in codecs:
+ try:
+ return bytestring.decode(codec)
+ except UnicodeDecodeError:
+ pass
+ return bytestring
+
+
+def retry(exception, tries=4, delay=3, backoff=2):
+ "Decorator for retrying an action."
+
+ def decorator(func):
+ @functools.wraps(func)
+ def func_retry(*args, **kwargs):
+ mtries, mdelay = tries, delay
+ while mtries > 1:
+ try:
+ return func(*args, **kwargs)
+ except exception as ex:
+ logging.debug(ex)
+ logging.debug("retrying...")
+ time.sleep(mdelay)
+ mtries -= 1
+ mdelay *= backoff
+ return func(*args, **kwargs)
+
+ return func_retry
+
+ return decorator
+
+
+def send(args):
+ "Send a message to the que."
+ logging.debug("send")
+ key = auth(args)
+ data = args.infile
+ req = request.Request(f"{args.host}/{args.target}")
+ req.add_header("User-AgenT", "Que/Client")
+ if key:
+ req.add_header("Authorization", key)
+ if args.serve:
+ logging.debug("serve")
+ while not time.sleep(1):
+ request.urlopen(req, data=data, timeout=MAX_TIMEOUT)
+
+ else:
+ request.urlopen(req, data=data, timeout=MAX_TIMEOUT)
+
+
+def then(args, msg):
+ "Perform an action when passed `--then`."
+ if args.then:
+ logging.debug("then")
+ subprocess.run(
+ args.then.format(msg=msg, que=args.target), check=False, shell=True,
+ )
+
+
+@retry(http.client.IncompleteRead, tries=10, delay=5, backoff=1)
+@retry(http.client.RemoteDisconnected, tries=10, delay=2, backoff=2)
+def recv(args):
+ "Receive a message from the que."
+ logging.debug("recv on: %s", args.target)
+ params = urllib.parse.urlencode({"poll": args.poll})
+ req = request.Request(f"{args.host}/{args.target}?{params}")
+ req.add_header("User-Agent", "Que/Client")
+ key = auth(args)
+ if key:
+ req.add_header("Authorization", key)
+ with request.urlopen(req) as _req:
+ if args.poll:
+ logging.debug("poll")
+ while not time.sleep(1):
+ logging.debug("reading")
+ msg = autodecode(_req.readline())
+ logging.debug("read")
+ print(msg, end="")
+ then(args, msg)
+ else:
+ msg = autodecode(_req.read())
+ print(msg)
+ then(args, msg)
+
+
+def get_args():
+ "Command line parser"
+ cli = argparse.ArgumentParser(description=__doc__)
+ cli.add_argument("--debug", action="store_true", help="log to stderr")
+ cli.add_argument(
+ "--host", default="http://que.run", help="where que-server is running"
+ )
+ cli.add_argument(
+ "--poll", default=False, action="store_true", help="stream data from the que"
+ )
+ cli.add_argument(
+ "--then",
+ help=" ".join(
+ [
+ "when polling, run this shell command after each response,",
+ "presumably for side effects,"
+ r"replacing '{que}' with the target and '{msg}' with the body of the response",
+ ]
+ ),
+ )
+ cli.add_argument(
+ "--serve",
+ default=False,
+ action="store_true",
+ help=" ".join(
+ [
+ "when posting to the que, do so continuously in a loop.",
+ "this can be used for serving a webpage or other file continuously",
+ ]
+ ),
+ )
+ cli.add_argument(
+ "target", help="namespace and path of the que, like 'ns/path/subpath'"
+ )
+ cli.add_argument(
+ "infile",
+ nargs="?",
+ type=argparse.FileType("rb"),
+ help="data to put on the que. Use '-' for stdin, otherwise should be a readable file",
+ )
+ return cli.parse_args()
+
+
+if __name__ == "__main__":
+ ARGV = get_args()
+ if ARGV.debug:
+ logging.basicConfig(
+ format="%(asctime)s %(message)s",
+ level=logging.DEBUG,
+ datefmt="%Y.%m.%d..%H.%M.%S",
+ )
+ try:
+ if ARGV.infile:
+ send(ARGV)
+ else:
+ recv(ARGV)
+ except KeyboardInterrupt:
+ sys.exit(0)