From 9f3804d5e4f28ea61a8abc856210422ad794b55e Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Tue, 31 Mar 2020 11:39:49 -0700 Subject: Add Run.Que.Website server This is a simple website server that uses que.run itself to host the que webpages. I had to rename Run.Que to Run.Que.Server because nix was complaining about Run.Que being both a derivation and an attrset with Run.Que.Website in it. --- Run/Que.hs | 238 ----------------------------------------------------- Run/Que/Server.hs | 238 +++++++++++++++++++++++++++++++++++++++++++++++++++++ Run/Que/Website.hs | 45 ++++++++++ Run/Que/client.py | 95 +++++++++++++++++++++ Run/Que/index.md | 68 +++++++++++++++ Run/Que/style.css | 4 + default.nix | 14 ++-- 7 files changed, 458 insertions(+), 244 deletions(-) delete mode 100644 Run/Que.hs create mode 100644 Run/Que/Server.hs create mode 100644 Run/Que/Website.hs create mode 100755 Run/Que/client.py create mode 100644 Run/Que/index.md create mode 100644 Run/Que/style.css diff --git a/Run/Que.hs b/Run/Que.hs deleted file mode 100644 index cf9467b..0000000 --- a/Run/Que.hs +++ /dev/null @@ -1,238 +0,0 @@ -{-# LANGUAGE NoImplicitPrelude #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE LambdaCase #-} - -{- | Interprocess communication --} -module Run.Que - ( main - ) -where - -import Com.Simatime.Alpha hiding ( Text - , get - , gets - , modify - , poll - ) -import qualified Com.Simatime.Go as Go -import qualified Control.Concurrent.STM as STM -import qualified Control.Exception as Exception -import Control.Monad.Reader ( MonadTrans ) -import qualified Data.ByteString as BS -import qualified Data.ByteString.Builder.Extra as Builder -import qualified Data.ByteString.Lazy as BSL -import Data.HashMap.Lazy ( HashMap ) -import qualified Data.HashMap.Lazy as HashMap -import qualified Data.Text.Encoding as Encoding -import Data.Text.Lazy ( Text - , fromStrict - ) -import qualified Data.Text.Lazy as Text -import qualified Network.HTTP.Types.Status as Http -import qualified Network.Socket as Socket -import qualified Network.Wai as Wai -import qualified Network.Wai.Handler.Warp as Warp -import Network.Wai.Middleware.RequestLogger - ( logStdoutDev ) -import qualified System.Console.GetOpt as Opt -import qualified System.Environment as Environment -import qualified Web.Scotty.Trans as Scotty - -main :: IO () -main = Exception.bracket startup shutdown run - where - run (p, waiapp) = - putText ("que-server starting on port " <> show p) >> Warp.run p waiapp - startup = do - opts <- Environment.getArgs /> getOpts - sync <- STM.newTVarIO opts - let runActionToIO m = runReaderT (runApp m) sync - waiapp <- Scotty.scottyAppT runActionToIO routes - return (port opts, waiapp) - shutdown :: a -> IO a - shutdown = pure . identity - getOpts args = case Opt.getOpt Opt.Permute options args of - ([] , [], _) -> Exception.throw ErrorParsingOptions - (opts, _ , _) -> smoosh initialAppState opts - options = - [ Opt.Option - ['p'] - ["port"] - (Opt.ReqArg (\n m -> m { port = read n :: Warp.Port }) "PORT") - "port to run on " - ] - -data Error = ErrorParsingOptions - deriving (Show) - -instance Exception.Exception Error - --- | Only allow my IP or local to access some route. -guardIP :: Wai.Request -> Scotty.ActionT Text App () -guardIP r = case Wai.remoteHost r of - Socket.SockAddrInet _ ip | ip `elem` allowed -> Scotty.status Http.ok200 - _ -> Scotty.status Http.methodNotAllowed405 - where - allowed = Socket.tupleToHostAddress Scotty.stream $ streamQue q - _ -> do - r <- liftIO <| takeQue q - Scotty.html <| fromStrict <| Encoding.decodeUtf8 r - - -- | Put a value on a que. Returns immediately. - Scotty.post (Scotty.regex quepath) <| do - r <- Scotty.request - when (BS.isPrefixOf "/_" <| Wai.rawPathInfo r) $ guardIP r - (ns, qp) <- extract - -- ensure namespace exists - app . modify <| upsertNamespace ns - q <- app <| que ns qp - qdata <- Scotty.body - liftIO <| pushQue (BSL.toStrict qdata) q - return () - --- | recover from a scotty-thrown exception. -(!:) - :: Scotty.ActionT Text App a -- ^ action that might throw - -> (Text -> Scotty.ActionT Text App a) -- ^ a function providing a default response instead - -> Scotty.ActionT Text App a -(!:) = Scotty.rescue - --- | Forever write the data from 'Que' to 'Wai.StreamingBody'. -streamQue :: Que -> Wai.StreamingBody -streamQue q write _ = Go.mult q >>= loop - where - loop c = - Go.tap c - >>= (write . Builder.byteStringInsert) - >> (write <| Builder.byteStringInsert "\n") - >> loop c - --- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. -grab :: (Eq k, Hashable k) => k -> HashMap k v -> v -grab = flip (HashMap.!) - --- | Inserts the namespace in 'AppState' if it doesn't exist. -upsertNamespace :: Namespace -> AppState -> AppState -upsertNamespace ns as = if HashMap.member ns (ques as) - then as - else as { ques = HashMap.insert ns mempty (ques as) } - --- | Inserts the que at the proper 'Namespace' and 'Quepath'. -insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState -insertQue ns qp q as = as { ques = newQues } - where - newQues = HashMap.insert ns newQbase (ques as) - newQbase = HashMap.insert qp q <| grab ns <| ques as - -extract :: Scotty.ActionT Text App (Namespace, Quepath) -extract = do - ns <- Scotty.param "0" - path <- Scotty.param "1" - let p = Text.split (== '/') path |> filter (not . Text.null) - return (ns, p) - -newtype App a = App - { runApp :: ReaderT (STM.TVar AppState) IO a - } - deriving (Applicative, Functor, Monad, MonadIO, MonadReader - (STM.TVar AppState)) - -data AppState = AppState - { ques :: HashMap Namespace Quebase - , port :: Warp.Port - } - -initialAppState :: AppState -initialAppState = AppState { port = 80, ques = mempty } - --- | Resolve a list of 'AppState' transitions into one. -smoosh - :: AppState -- ^ Initial app state to start with - -> [AppState -> AppState] -- ^ List of functions to apply in order - -> AppState -smoosh = foldr identity --- there's gotta be a standard name for this - --- | A synonym for 'lift' in order to be explicit about when we are --- operating at the 'App' layer. -app :: MonadTrans t => App a -> t App a -app = lift - --- | Get something from the app state -gets :: (AppState -> b) -> App b -gets f = ask >>= liftIO . STM.readTVarIO >>= return . f - --- | Apply a function to the app state -modify :: (AppState -> AppState) -> App () -modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f - -type Namespace = Text -- ^ housing for a set of que paths -type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes -type Quepath = [Text] -- ^ any path can serve as an identifier for a que -type Quedata = ByteString -- ^ any opaque data -type Quebase = HashMap Quepath Que -- ^ a collection of ques - --- | Lookup or create a que -que :: Namespace -> Quepath -> App Que -que ns qp = do - _ques <- gets ques - let qbase = grab ns _ques - queExists = HashMap.member qp qbase - if queExists - then return <| grab qp qbase - else do - c <- liftIO Go.chan - modify (insertQue ns qp c) - gets ques /> grab ns /> grab qp - --- | Put data on the que. -pushQue :: Quedata -> Que -> IO () -pushQue = flip Go.write - --- | Tap and read from the Que. Tap first because a Que is actually a --- broadcast channel. This allows for multiconsumer Ques. -takeQue :: Que -> IO Quedata -takeQue ch = Go.mult ch >>= Go.tap diff --git a/Run/Que/Server.hs b/Run/Que/Server.hs new file mode 100644 index 0000000..1acbe60 --- /dev/null +++ b/Run/Que/Server.hs @@ -0,0 +1,238 @@ +{-# LANGUAGE NoImplicitPrelude #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE LambdaCase #-} + +{- | Interprocess communication +-} +module Run.Que.Server + ( main + ) +where + +import Com.Simatime.Alpha hiding ( Text + , get + , gets + , modify + , poll + ) +import qualified Com.Simatime.Go as Go +import qualified Control.Concurrent.STM as STM +import qualified Control.Exception as Exception +import Control.Monad.Reader ( MonadTrans ) +import qualified Data.ByteString as BS +import qualified Data.ByteString.Builder.Extra as Builder +import qualified Data.ByteString.Lazy as BSL +import Data.HashMap.Lazy ( HashMap ) +import qualified Data.HashMap.Lazy as HashMap +import qualified Data.Text.Encoding as Encoding +import Data.Text.Lazy ( Text + , fromStrict + ) +import qualified Data.Text.Lazy as Text +import qualified Network.HTTP.Types.Status as Http +import qualified Network.Socket as Socket +import qualified Network.Wai as Wai +import qualified Network.Wai.Handler.Warp as Warp +import Network.Wai.Middleware.RequestLogger + ( logStdoutDev ) +import qualified System.Console.GetOpt as Opt +import qualified System.Environment as Environment +import qualified Web.Scotty.Trans as Scotty + +main :: IO () +main = Exception.bracket startup shutdown run + where + run (p, waiapp) = + putText ("que-server starting on port " <> show p) >> Warp.run p waiapp + startup = do + opts <- Environment.getArgs /> getOpts + sync <- STM.newTVarIO opts + let runActionToIO m = runReaderT (runApp m) sync + waiapp <- Scotty.scottyAppT runActionToIO routes + return (port opts, waiapp) + shutdown :: a -> IO a + shutdown = pure . identity + getOpts args = case Opt.getOpt Opt.Permute options args of + ([] , [], _) -> Exception.throw ErrorParsingOptions + (opts, _ , _) -> smoosh initialAppState opts + options = + [ Opt.Option + ['p'] + ["port"] + (Opt.ReqArg (\n m -> m { port = read n :: Warp.Port }) "PORT") + "port to run on " + ] + +data Error = ErrorParsingOptions + deriving (Show) + +instance Exception.Exception Error + +-- | Only allow my IP or local to access some route. +guardIP :: Wai.Request -> Scotty.ActionT Text App () +guardIP r = case Wai.remoteHost r of + Socket.SockAddrInet _ ip | ip `elem` allowed -> Scotty.status Http.ok200 + _ -> Scotty.status Http.methodNotAllowed405 + where + allowed = Socket.tupleToHostAddress Scotty.stream $ streamQue q + _ -> do + r <- liftIO <| takeQue q + Scotty.html <| fromStrict <| Encoding.decodeUtf8 r + + -- | Put a value on a que. Returns immediately. + Scotty.post (Scotty.regex quepath) <| do + r <- Scotty.request + when (BS.isPrefixOf "/_" <| Wai.rawPathInfo r) $ guardIP r + (ns, qp) <- extract + -- ensure namespace exists + app . modify <| upsertNamespace ns + q <- app <| que ns qp + qdata <- Scotty.body + liftIO <| pushQue (BSL.toStrict qdata) q + return () + +-- | recover from a scotty-thrown exception. +(!:) + :: Scotty.ActionT Text App a -- ^ action that might throw + -> (Text -> Scotty.ActionT Text App a) -- ^ a function providing a default response instead + -> Scotty.ActionT Text App a +(!:) = Scotty.rescue + +-- | Forever write the data from 'Que' to 'Wai.StreamingBody'. +streamQue :: Que -> Wai.StreamingBody +streamQue q write _ = Go.mult q >>= loop + where + loop c = + Go.tap c + >>= (write . Builder.byteStringInsert) + >> (write <| Builder.byteStringInsert "\n") + >> loop c + +-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. +grab :: (Eq k, Hashable k) => k -> HashMap k v -> v +grab = flip (HashMap.!) + +-- | Inserts the namespace in 'AppState' if it doesn't exist. +upsertNamespace :: Namespace -> AppState -> AppState +upsertNamespace ns as = if HashMap.member ns (ques as) + then as + else as { ques = HashMap.insert ns mempty (ques as) } + +-- | Inserts the que at the proper 'Namespace' and 'Quepath'. +insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState +insertQue ns qp q as = as { ques = newQues } + where + newQues = HashMap.insert ns newQbase (ques as) + newQbase = HashMap.insert qp q <| grab ns <| ques as + +extract :: Scotty.ActionT Text App (Namespace, Quepath) +extract = do + ns <- Scotty.param "0" + path <- Scotty.param "1" + let p = Text.split (== '/') path |> filter (not . Text.null) + return (ns, p) + +newtype App a = App + { runApp :: ReaderT (STM.TVar AppState) IO a + } + deriving (Applicative, Functor, Monad, MonadIO, MonadReader + (STM.TVar AppState)) + +data AppState = AppState + { ques :: HashMap Namespace Quebase + , port :: Warp.Port + } + +initialAppState :: AppState +initialAppState = AppState { port = 80, ques = mempty } + +-- | Resolve a list of 'AppState' transitions into one. +smoosh + :: AppState -- ^ Initial app state to start with + -> [AppState -> AppState] -- ^ List of functions to apply in order + -> AppState +smoosh = foldr identity +-- there's gotta be a standard name for this + +-- | A synonym for 'lift' in order to be explicit about when we are +-- operating at the 'App' layer. +app :: MonadTrans t => App a -> t App a +app = lift + +-- | Get something from the app state +gets :: (AppState -> b) -> App b +gets f = ask >>= liftIO . STM.readTVarIO >>= return . f + +-- | Apply a function to the app state +modify :: (AppState -> AppState) -> App () +modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f + +type Namespace = Text -- ^ housing for a set of que paths +type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes +type Quepath = [Text] -- ^ any path can serve as an identifier for a que +type Quedata = ByteString -- ^ any opaque data +type Quebase = HashMap Quepath Que -- ^ a collection of ques + +-- | Lookup or create a que +que :: Namespace -> Quepath -> App Que +que ns qp = do + _ques <- gets ques + let qbase = grab ns _ques + queExists = HashMap.member qp qbase + if queExists + then return <| grab qp qbase + else do + c <- liftIO Go.chan + modify (insertQue ns qp c) + gets ques /> grab ns /> grab qp + +-- | Put data on the que. +pushQue :: Quedata -> Que -> IO () +pushQue = flip Go.write + +-- | Tap and read from the Que. Tap first because a Que is actually a +-- broadcast channel. This allows for multiconsumer Ques. +takeQue :: Que -> IO Quedata +takeQue ch = Go.mult ch >>= Go.tap diff --git a/Run/Que/Website.hs b/Run/Que/Website.hs new file mode 100644 index 0000000..1de6bca --- /dev/null +++ b/Run/Que/Website.hs @@ -0,0 +1,45 @@ +-- | spawns a few processes that serve the que.run website +module Run.Que.Website where + +import Prelude +import System.Environment as Environment +import System.FilePath ( () ) +import qualified System.Process as Process + +main :: IO () +main = do + args <- Environment.getArgs + let [src, ns] = if length args == 2 + then take 2 args + else if length args == 1 + then args ++ ["/"] + else error "usage: que-website [namespace]" + homepage <- getHomepageHtml (src "style.css") (src "index.md") + client <- readFile $ src "client.py" + putStrLn $ "serving " ++ src ++ " at " ++ ns + loop ns homepage client + +loop :: String -> FilePath -> FilePath -> IO () +loop ns homepage client = + serve (ns "index.html") homepage + >> serve (ns "_client/python") client + >> loop ns homepage client + +getHomepageHtml :: String -> String -> IO String +getHomepageHtml style index = Process.readProcess + "pandoc" + [ "--self-contained" + , "--css" + , style + , "-i" + , index + , "--from" + , "markdown" + , "--to" + , "html" + ] + [] + +serve :: FilePath -> FilePath -> IO () +serve path file = + Process.callProcess "curl" ["https://que.run" ++ path, "-d", file] diff --git a/Run/Que/client.py b/Run/Que/client.py new file mode 100755 index 0000000..8058a05 --- /dev/null +++ b/Run/Que/client.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +""" +simple client for que.run +""" + +import argparse +import urllib.request as request +import urllib.parse +import time +import subprocess +import sys + +# set to something ridiculously high so we don't run into timeouts while polling +# or waiting for a message +MAX_TIMEOUT = 100000000 + + +def main(argv=None): + cli = argparse.ArgumentParser(description=__doc__) + cli.add_argument( + "--host", default="https://que.run", help="where que-server is running" + ) + cli.add_argument( + "--poll", default=False, action="store_true", help="stream data from the que" + ) + cli.add_argument( + "--then", + help="when polling, run this shell command after each response, replacing '\que' with the target and '\msg' with the body of the response", + ) + cli.add_argument( + "--serve", + default=False, + action="store_true", + help="when posting to the que, do so continuously in a loop. this can be used for serving a webpage or other file continuously", + ) + cli.add_argument( + "target", help="namespace and path of the que, like 'ns/path/subpath'" + ) + cli.add_argument( + "infile", + nargs="?", + type=argparse.FileType("r"), + help="data to put on the que. Use '-' for stdin, otherwise should be a readable file", + ) + + if argv is None: + args = cli.parse_args() + else: + args = cli.parse_args(argv) + + if args.infile: + # send input data + data = args.infile.read().encode("utf-8").strip() + if args.serve: + # loop until ^C + while not time.sleep(1): + with request.urlopen( + f"{args.host}/{args.target}", data=data, timeout=MAX_TIMEOUT + ) as req: + pass + else: + with request.urlopen( + f"{args.host}/{args.target}", data=data, timeout=MAX_TIMEOUT + ) as req: + pass + else: + # no input data, do a read instead + params = urllib.parse.urlencode({"poll": args.poll}) + url = f"{args.host}/{args.target}?{params}" + with request.urlopen(url) as req: + if args.poll: + while not time.sleep(1): + msg = req.readline().decode("utf-8").strip() + print(msg) + if args.then: + subprocess.run( + args.then.replace("\msg", msg).replace("\que", args.target), + shell=True, + ) + else: + msg = req.read().decode("utf-8").strip() + print(msg) + if args.then: + subprocess.run( + args.then.replace("\msg", msg).replace("\que", args.target), + shell=True, + ) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Interrupted") + sys.exit(0) diff --git a/Run/Que/index.md b/Run/Que/index.md new file mode 100644 index 0000000..b619de7 --- /dev/null +++ b/Run/Que/index.md @@ -0,0 +1,68 @@ +% que.run + +que is the concurrent, async runtime in the cloud + + - runtime concurrency anywhere you have a network connection + - multilanguage communicating sequential processes + - add Go-like channels to any language + - connect your microservices together with the simplest possible + plumbing + - async programming as easy as running two terminal commands + +HTTP routes on que.run are Golang-like channels with a namespace and a +path. For example: `https://que.run/example/path/subpath`. + +## download the client + +There is a simple script `que` that acts as a client you can use to +interact with the `que.run` service. + +Download it to somewhere on your `$PATH` and make it executable: + + curl https://que.run/_client/python > ~/bin/que + chmod +x ~/bin/que + que --help + +The client requires a recent version of Python 3. + +## examples + +Here are some example applications, I will update these in the coming +weeks with additional useful scripts. + +### desktop notifications + +Lets say we are running a job that takes a long time, maybe we are +compiling or running a large test suite. Instead of watching the +terminal until it completes, or flipping back to check on it every so +often, we can create a listener that displays a popup notification when +the job finishes. + +In one terminal run the listener: + + que example/notify --then "notify-send '\que' '\msg'" + +In some other terminal run the job that takes forever: + + runtests ; echo "tests are done" | que example/notify - + +When terminal 2 succeeds, terminal 1 will print "tests are done", then +call the `notify-send` command, which displays a notification toast in +Linux with title "`example/notify`" and content "`tests are done`". + +Que paths are multi-producer and multi-consumer, so you can add as many +terminals as you want. + +On macOS you could use: + + osascript -e 'display notification "\msg" with title "\que"' + +in place of notify-send. + +### ephemeral, serverless chat rooms + +coming soon... + +### collaborative jukebox + +coming soon... diff --git a/Run/Que/style.css b/Run/Que/style.css new file mode 100644 index 0000000..fa73fa4 --- /dev/null +++ b/Run/Que/style.css @@ -0,0 +1,4 @@ +/* perfect motherfucking css framework */ +body{max-width:650px;margin:40px auto;padding:0 10px;font:18px/1.5 -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji";color:#444}h1,h2,h3{line-height:1.2}@media (prefers-color-scheme: dark){body{color:white;background:#444}a:link{color:#5bf}a:visited{color:#ccf}} + +/* my stuff */ diff --git a/default.nix b/default.nix index 54db612..13ded48 100644 --- a/default.nix +++ b/default.nix @@ -49,7 +49,7 @@ in rec { }; }; Com.Simatime.Serval = buildOS { - deps = { que-server = Run.Que; }; + deps = { que-server = Run.Que.Server; }; configuration = { imports = [ ./Com/Simatime/packages.nix @@ -161,19 +161,21 @@ in rec { ]; }; }; - Run.Que = buildGhc { - name = "Run.Que"; + Run.Que.Server = buildGhc { + name = "Run.Que.Server"; nick = "que-server"; deps = [ - "aeson" "async" "protolude" "scotty" - "servant" - "servant-server" "stm" "unagi-chan" "unordered-containers" ]; }; + Run.Que.Website = buildGhc { + name = "Run.Que.Website"; + nick = "que-website"; + deps = [ "process" ]; + }; } -- cgit v1.2.3