summaryrefslogtreecommitdiff
path: root/Run/Que
diff options
context:
space:
mode:
Diffstat (limited to 'Run/Que')
-rw-r--r--Run/Que/Server.hs238
-rw-r--r--Run/Que/Website.hs45
-rwxr-xr-xRun/Que/client.py95
-rw-r--r--Run/Que/index.md68
-rw-r--r--Run/Que/style.css4
5 files changed, 450 insertions, 0 deletions
diff --git a/Run/Que/Server.hs b/Run/Que/Server.hs
new file mode 100644
index 0000000..1acbe60
--- /dev/null
+++ b/Run/Que/Server.hs
@@ -0,0 +1,238 @@
+{-# LANGUAGE NoImplicitPrelude #-}
+{-# LANGUAGE GeneralizedNewtypeDeriving #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE LambdaCase #-}
+
+{- | Interprocess communication
+-}
+module Run.Que.Server
+ ( main
+ )
+where
+
+import Com.Simatime.Alpha hiding ( Text
+ , get
+ , gets
+ , modify
+ , poll
+ )
+import qualified Com.Simatime.Go as Go
+import qualified Control.Concurrent.STM as STM
+import qualified Control.Exception as Exception
+import Control.Monad.Reader ( MonadTrans )
+import qualified Data.ByteString as BS
+import qualified Data.ByteString.Builder.Extra as Builder
+import qualified Data.ByteString.Lazy as BSL
+import Data.HashMap.Lazy ( HashMap )
+import qualified Data.HashMap.Lazy as HashMap
+import qualified Data.Text.Encoding as Encoding
+import Data.Text.Lazy ( Text
+ , fromStrict
+ )
+import qualified Data.Text.Lazy as Text
+import qualified Network.HTTP.Types.Status as Http
+import qualified Network.Socket as Socket
+import qualified Network.Wai as Wai
+import qualified Network.Wai.Handler.Warp as Warp
+import Network.Wai.Middleware.RequestLogger
+ ( logStdoutDev )
+import qualified System.Console.GetOpt as Opt
+import qualified System.Environment as Environment
+import qualified Web.Scotty.Trans as Scotty
+
+main :: IO ()
+main = Exception.bracket startup shutdown run
+ where
+ run (p, waiapp) =
+ putText ("que-server starting on port " <> show p) >> Warp.run p waiapp
+ startup = do
+ opts <- Environment.getArgs /> getOpts
+ sync <- STM.newTVarIO opts
+ let runActionToIO m = runReaderT (runApp m) sync
+ waiapp <- Scotty.scottyAppT runActionToIO routes
+ return (port opts, waiapp)
+ shutdown :: a -> IO a
+ shutdown = pure . identity
+ getOpts args = case Opt.getOpt Opt.Permute options args of
+ ([] , [], _) -> Exception.throw ErrorParsingOptions
+ (opts, _ , _) -> smoosh initialAppState opts
+ options =
+ [ Opt.Option
+ ['p']
+ ["port"]
+ (Opt.ReqArg (\n m -> m { port = read n :: Warp.Port }) "PORT")
+ "port to run on "
+ ]
+
+data Error = ErrorParsingOptions
+ deriving (Show)
+
+instance Exception.Exception Error
+
+-- | Only allow my IP or local to access some route.
+guardIP :: Wai.Request -> Scotty.ActionT Text App ()
+guardIP r = case Wai.remoteHost r of
+ Socket.SockAddrInet _ ip | ip `elem` allowed -> Scotty.status Http.ok200
+ _ -> Scotty.status Http.methodNotAllowed405
+ where
+ allowed = Socket.tupleToHostAddress </ [(72, 222, 221, 62), (127, 0, 0, 1)]
+
+routes :: Scotty.ScottyT Text App ()
+routes = do
+ Scotty.middleware logStdoutDev
+
+ let quepath = "^/([[:alnum:]_]*)/([[:alnum:]._/]*)$"
+ let index = "^(/|/index.html)$"
+
+ Scotty.get (Scotty.regex index) <| do
+ let (ns, qp) = ("_", ["index"])
+ app . modify <| upsertNamespace ns
+ q <- app <| que ns qp
+ r <- liftIO <| takeQue q
+ Scotty.html <| fromStrict <| Encoding.decodeUtf8 r
+
+ Scotty.post (Scotty.regex index) <| do
+ r <- Scotty.request
+ guardIP r
+ let (ns, qp) = ("_", ["index"])
+ app . modify <| upsertNamespace ns
+ q <- app <| que ns qp
+ qdata <- Scotty.body
+ liftIO <| pushQue (BSL.toStrict qdata) q
+ return ()
+
+ Scotty.matchAny (Scotty.regex "^/([[:alnum:]_]*)/?$") <| do
+ -- matches '/ns' and '/ns/' but not '/ns/path'
+ Scotty.status Http.notImplemented501
+ Scotty.text "namespace management coming soon"
+
+ -- | Receive a value from a que. Blocks until a value is received,
+ -- then returns. If 'poll=true', then stream data from the Que to the
+ -- client.
+ Scotty.get (Scotty.regex quepath) <| do
+ (ns, qp) <- extract
+ -- ensure namespace exists
+ app . modify <| upsertNamespace ns
+ q <- app <| que ns qp
+ poll <- Scotty.param "poll" !: (pure . const False)
+ case poll of
+ True -> Scotty.stream $ streamQue q
+ _ -> do
+ r <- liftIO <| takeQue q
+ Scotty.html <| fromStrict <| Encoding.decodeUtf8 r
+
+ -- | Put a value on a que. Returns immediately.
+ Scotty.post (Scotty.regex quepath) <| do
+ r <- Scotty.request
+ when (BS.isPrefixOf "/_" <| Wai.rawPathInfo r) $ guardIP r
+ (ns, qp) <- extract
+ -- ensure namespace exists
+ app . modify <| upsertNamespace ns
+ q <- app <| que ns qp
+ qdata <- Scotty.body
+ liftIO <| pushQue (BSL.toStrict qdata) q
+ return ()
+
+-- | recover from a scotty-thrown exception.
+(!:)
+ :: Scotty.ActionT Text App a -- ^ action that might throw
+ -> (Text -> Scotty.ActionT Text App a) -- ^ a function providing a default response instead
+ -> Scotty.ActionT Text App a
+(!:) = Scotty.rescue
+
+-- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
+streamQue :: Que -> Wai.StreamingBody
+streamQue q write _ = Go.mult q >>= loop
+ where
+ loop c =
+ Go.tap c
+ >>= (write . Builder.byteStringInsert)
+ >> (write <| Builder.byteStringInsert "\n")
+ >> loop c
+
+-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
+grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
+grab = flip (HashMap.!)
+
+-- | Inserts the namespace in 'AppState' if it doesn't exist.
+upsertNamespace :: Namespace -> AppState -> AppState
+upsertNamespace ns as = if HashMap.member ns (ques as)
+ then as
+ else as { ques = HashMap.insert ns mempty (ques as) }
+
+-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
+insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
+insertQue ns qp q as = as { ques = newQues }
+ where
+ newQues = HashMap.insert ns newQbase (ques as)
+ newQbase = HashMap.insert qp q <| grab ns <| ques as
+
+extract :: Scotty.ActionT Text App (Namespace, Quepath)
+extract = do
+ ns <- Scotty.param "0"
+ path <- Scotty.param "1"
+ let p = Text.split (== '/') path |> filter (not . Text.null)
+ return (ns, p)
+
+newtype App a = App
+ { runApp :: ReaderT (STM.TVar AppState) IO a
+ }
+ deriving (Applicative, Functor, Monad, MonadIO, MonadReader
+ (STM.TVar AppState))
+
+data AppState = AppState
+ { ques :: HashMap Namespace Quebase
+ , port :: Warp.Port
+ }
+
+initialAppState :: AppState
+initialAppState = AppState { port = 80, ques = mempty }
+
+-- | Resolve a list of 'AppState' transitions into one.
+smoosh
+ :: AppState -- ^ Initial app state to start with
+ -> [AppState -> AppState] -- ^ List of functions to apply in order
+ -> AppState
+smoosh = foldr identity
+-- there's gotta be a standard name for this
+
+-- | A synonym for 'lift' in order to be explicit about when we are
+-- operating at the 'App' layer.
+app :: MonadTrans t => App a -> t App a
+app = lift
+
+-- | Get something from the app state
+gets :: (AppState -> b) -> App b
+gets f = ask >>= liftIO . STM.readTVarIO >>= return . f
+
+-- | Apply a function to the app state
+modify :: (AppState -> AppState) -> App ()
+modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f
+
+type Namespace = Text -- ^ housing for a set of que paths
+type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes
+type Quepath = [Text] -- ^ any path can serve as an identifier for a que
+type Quedata = ByteString -- ^ any opaque data
+type Quebase = HashMap Quepath Que -- ^ a collection of ques
+
+-- | Lookup or create a que
+que :: Namespace -> Quepath -> App Que
+que ns qp = do
+ _ques <- gets ques
+ let qbase = grab ns _ques
+ queExists = HashMap.member qp qbase
+ if queExists
+ then return <| grab qp qbase
+ else do
+ c <- liftIO Go.chan
+ modify (insertQue ns qp c)
+ gets ques /> grab ns /> grab qp
+
+-- | Put data on the que.
+pushQue :: Quedata -> Que -> IO ()
+pushQue = flip Go.write
+
+-- | Tap and read from the Que. Tap first because a Que is actually a
+-- broadcast channel. This allows for multiconsumer Ques.
+takeQue :: Que -> IO Quedata
+takeQue ch = Go.mult ch >>= Go.tap
diff --git a/Run/Que/Website.hs b/Run/Que/Website.hs
new file mode 100644
index 0000000..1de6bca
--- /dev/null
+++ b/Run/Que/Website.hs
@@ -0,0 +1,45 @@
+-- | spawns a few processes that serve the que.run website
+module Run.Que.Website where
+
+import Prelude
+import System.Environment as Environment
+import System.FilePath ( (</>) )
+import qualified System.Process as Process
+
+main :: IO ()
+main = do
+ args <- Environment.getArgs
+ let [src, ns] = if length args == 2
+ then take 2 args
+ else if length args == 1
+ then args ++ ["/"]
+ else error "usage: que-website <srcdir> [namespace]"
+ homepage <- getHomepageHtml (src </> "style.css") (src </> "index.md")
+ client <- readFile $ src </> "client.py"
+ putStrLn $ "serving " ++ src ++ " at " ++ ns
+ loop ns homepage client
+
+loop :: String -> FilePath -> FilePath -> IO ()
+loop ns homepage client =
+ serve (ns </> "index.html") homepage
+ >> serve (ns </> "_client/python") client
+ >> loop ns homepage client
+
+getHomepageHtml :: String -> String -> IO String
+getHomepageHtml style index = Process.readProcess
+ "pandoc"
+ [ "--self-contained"
+ , "--css"
+ , style
+ , "-i"
+ , index
+ , "--from"
+ , "markdown"
+ , "--to"
+ , "html"
+ ]
+ []
+
+serve :: FilePath -> FilePath -> IO ()
+serve path file =
+ Process.callProcess "curl" ["https://que.run" ++ path, "-d", file]
diff --git a/Run/Que/client.py b/Run/Que/client.py
new file mode 100755
index 0000000..8058a05
--- /dev/null
+++ b/Run/Que/client.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python3
+"""
+simple client for que.run
+"""
+
+import argparse
+import urllib.request as request
+import urllib.parse
+import time
+import subprocess
+import sys
+
+# set to something ridiculously high so we don't run into timeouts while polling
+# or waiting for a message
+MAX_TIMEOUT = 100000000
+
+
+def main(argv=None):
+ cli = argparse.ArgumentParser(description=__doc__)
+ cli.add_argument(
+ "--host", default="https://que.run", help="where que-server is running"
+ )
+ cli.add_argument(
+ "--poll", default=False, action="store_true", help="stream data from the que"
+ )
+ cli.add_argument(
+ "--then",
+ help="when polling, run this shell command after each response, replacing '\que' with the target and '\msg' with the body of the response",
+ )
+ cli.add_argument(
+ "--serve",
+ default=False,
+ action="store_true",
+ help="when posting to the que, do so continuously in a loop. this can be used for serving a webpage or other file continuously",
+ )
+ cli.add_argument(
+ "target", help="namespace and path of the que, like 'ns/path/subpath'"
+ )
+ cli.add_argument(
+ "infile",
+ nargs="?",
+ type=argparse.FileType("r"),
+ help="data to put on the que. Use '-' for stdin, otherwise should be a readable file",
+ )
+
+ if argv is None:
+ args = cli.parse_args()
+ else:
+ args = cli.parse_args(argv)
+
+ if args.infile:
+ # send input data
+ data = args.infile.read().encode("utf-8").strip()
+ if args.serve:
+ # loop until ^C
+ while not time.sleep(1):
+ with request.urlopen(
+ f"{args.host}/{args.target}", data=data, timeout=MAX_TIMEOUT
+ ) as req:
+ pass
+ else:
+ with request.urlopen(
+ f"{args.host}/{args.target}", data=data, timeout=MAX_TIMEOUT
+ ) as req:
+ pass
+ else:
+ # no input data, do a read instead
+ params = urllib.parse.urlencode({"poll": args.poll})
+ url = f"{args.host}/{args.target}?{params}"
+ with request.urlopen(url) as req:
+ if args.poll:
+ while not time.sleep(1):
+ msg = req.readline().decode("utf-8").strip()
+ print(msg)
+ if args.then:
+ subprocess.run(
+ args.then.replace("\msg", msg).replace("\que", args.target),
+ shell=True,
+ )
+ else:
+ msg = req.read().decode("utf-8").strip()
+ print(msg)
+ if args.then:
+ subprocess.run(
+ args.then.replace("\msg", msg).replace("\que", args.target),
+ shell=True,
+ )
+
+
+if __name__ == "__main__":
+ try:
+ main()
+ except KeyboardInterrupt:
+ print("Interrupted")
+ sys.exit(0)
diff --git a/Run/Que/index.md b/Run/Que/index.md
new file mode 100644
index 0000000..b619de7
--- /dev/null
+++ b/Run/Que/index.md
@@ -0,0 +1,68 @@
+% que.run
+
+que is the concurrent, async runtime in the cloud
+
+ - runtime concurrency anywhere you have a network connection
+ - multilanguage communicating sequential processes
+ - add Go-like channels to any language
+ - connect your microservices together with the simplest possible
+ plumbing
+ - async programming as easy as running two terminal commands
+
+HTTP routes on que.run are Golang-like channels with a namespace and a
+path. For example: `https://que.run/example/path/subpath`.
+
+## download the client
+
+There is a simple script `que` that acts as a client you can use to
+interact with the `que.run` service.
+
+Download it to somewhere on your `$PATH` and make it executable:
+
+ curl https://que.run/_client/python > ~/bin/que
+ chmod +x ~/bin/que
+ que --help
+
+The client requires a recent version of Python 3.
+
+## examples
+
+Here are some example applications, I will update these in the coming
+weeks with additional useful scripts.
+
+### desktop notifications
+
+Lets say we are running a job that takes a long time, maybe we are
+compiling or running a large test suite. Instead of watching the
+terminal until it completes, or flipping back to check on it every so
+often, we can create a listener that displays a popup notification when
+the job finishes.
+
+In one terminal run the listener:
+
+ que example/notify --then "notify-send '\que' '\msg'"
+
+In some other terminal run the job that takes forever:
+
+ runtests ; echo "tests are done" | que example/notify -
+
+When terminal 2 succeeds, terminal 1 will print "tests are done", then
+call the `notify-send` command, which displays a notification toast in
+Linux with title "`example/notify`" and content "`tests are done`".
+
+Que paths are multi-producer and multi-consumer, so you can add as many
+terminals as you want.
+
+On macOS you could use:
+
+ osascript -e 'display notification "\msg" with title "\que"'
+
+in place of notify-send.
+
+### ephemeral, serverless chat rooms
+
+coming soon...
+
+### collaborative jukebox
+
+coming soon...
diff --git a/Run/Que/style.css b/Run/Que/style.css
new file mode 100644
index 0000000..fa73fa4
--- /dev/null
+++ b/Run/Que/style.css
@@ -0,0 +1,4 @@
+/* perfect motherfucking css framework */
+body{max-width:650px;margin:40px auto;padding:0 10px;font:18px/1.5 -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji";color:#444}h1,h2,h3{line-height:1.2}@media (prefers-color-scheme: dark){body{color:white;background:#444}a:link{color:#5bf}a:visited{color:#ccf}}
+
+/* my stuff */