{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE NoImplicitPrelude #-} {-# OPTIONS_GHC -fno-warn-orphans #-} -- | Interprocess communication -- -- Prior art: -- - -- - -- - -- - sorta: and -- -- : out que-server module Biz.Que.Host ( main, ) where import Alpha hiding (gets, modify, poll) import qualified Biz.Cli as Cli import qualified Biz.Log as Log import Biz.Test ((@=?)) import qualified Biz.Test as Test import qualified Control.Concurrent.Go as Go import qualified Control.Concurrent.STM as STM import qualified Control.Exception as Exception import Data.HashMap.Lazy (HashMap) import qualified Data.HashMap.Lazy as HashMap import Network.HTTP.Media ((//), (/:)) import qualified Network.Wai.Handler.Warp as Warp import Servant import Servant.API.Generic ((:-)) -- import qualified Servant.Auth.Server as Auth import Servant.Server.Generic (AsServerT, genericServeT) import qualified Servant.Types.SourceT as Source import qualified System.Envy as Envy main :: IO () main = Cli.main <| Cli.Plan help move test pure move :: Cli.Arguments -> IO () move _ = Exception.bracket startup shutdown <| uncurry Warp.run where startup = Envy.decodeWithDefaults Envy.defConfig +> \cfg@Config {..} -> do initialState <- atomically <| STM.newTVar mempty -- natural transformation let nt :: AppState -> App a -> Servant.Handler a nt s x = runReaderT x s let app :: AppState -> Application app s = genericServeT (nt s) (paths cfg) Log.info ["boot", "que"] >> Log.br Log.info ["boot", "port", show <| quePort] >> Log.br Log.info ["boot", "skey", queSkey] >> Log.br pure (quePort, app initialState) shutdown :: a -> IO a shutdown = pure <. identity help :: Cli.Docopt help = [Cli.docopt| que-server Usage: que-server que-server test |] test :: Test.Tree test = Test.group "Biz.Que.Host" [Test.unit "id" <| 1 @=? (1 :: Integer)] type App = ReaderT AppState Servant.Handler type Ques = HashMap Namespace Quebase type AppState = STM.TVar Ques data Config = Config { -- | QUE_PORT quePort :: Warp.Port, -- | QUE_SKEY queSkey :: Text } deriving (Generic, Show) instance Envy.DefConfig Config where defConfig = Config 3001 "admin-key" instance Envy.FromEnv Config -- | A simple HTML type. This recognizes "content-type: text/html" but doesn't -- do any conversion, rendering, or sanitization like the -- 'Servant.HTML.Lucid.HTML' type would do. data HTML deriving (Typeable) instance Accept HTML where contentTypes _ = "text" // "html" /: ("charset", "utf-8") :| ["text" // "html"] instance MimeRender HTML ByteString where mimeRender _ = str instance MimeUnrender HTML Text where mimeUnrender _ bs = Right <| str bs instance MimeUnrender OctetStream Text where mimeUnrender _ bs = Right <| str bs instance MimeRender PlainText ByteString where mimeRender _ = str instance MimeUnrender PlainText ByteString where mimeUnrender _ bs = Right <| str bs data Paths path = Paths { home :: path :- Get '[JSON] NoContent, dash :: path :- "_" :> "dash" :> Get '[JSON] Ques, getQue :: path :- Capture "ns" Text :> Capture "quename" Text :> Get '[PlainText, HTML, OctetStream] Message, getStream :: path :- Capture "ns" Text :> Capture "quename" Text :> "stream" :> StreamGet NoFraming OctetStream (SourceIO Message), putQue :: path :- Capture "ns" Text :> Capture "quepath" Text :> ReqBody '[PlainText, HTML, OctetStream] Text :> Post '[PlainText, HTML, OctetStream] NoContent } deriving (Generic) paths :: Config -> Paths (AsServerT App) paths _ = -- TODO revive authkey stuff -- - read Authorization header, compare with queSkey -- - Only allow my IP or localhost to publish to '_' namespace Paths { home = throwError <| err301 {errHeaders = [("Location", "/_/index")]}, dash = gets, getQue = \ns qn -> do guardNs ns ["pub", "_"] modify <| upsertNamespace ns q <- que ns qn Go.mult q |> liftIO +> Go.tap |> liftIO, getStream = \ns qn -> do guardNs ns ["pub", "_"] modify <| upsertNamespace ns q <- que ns qn Go.mult q |> liftIO +> Go.tap |> Source.fromAction (const False) -- peek chan instead of False? |> pure, putQue = \ns qp body -> do guardNs ns ["pub", "_"] modify <| upsertNamespace ns q <- que ns qp body |> str |> Go.write q >> Go.read q -- flush the que, otherwise msgs never clear |> liftIO -- TODO: detect number of readers, respond with "sent to N readers" or -- "no readers, msg lost" >> pure NoContent } -- | Given `guardNs ns whitelist`, if `ns` is not in the `whitelist` -- list, return a 405 error. guardNs :: (Applicative a, MonadError ServerError a) => Text -> [Text] -> a () guardNs ns whitelist = when (not <| ns `elem` whitelist) <| do throwError <| err405 {errBody = str msg} where msg = "not allowed: use 'pub' namespace or signup to protect '" <> ns <> "' at https://que.run" -- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist. grab :: (Eq k, Hashable k) => k -> HashMap k v -> v grab = flip (HashMap.!) -- | Inserts the namespace in 'Ques' if it doesn't exist. upsertNamespace :: Namespace -> HashMap Namespace Quebase -> HashMap Namespace Quebase upsertNamespace ns as = if HashMap.member ns as then as else HashMap.insert ns mempty as -- | Inserts the que at the proper 'Namespace' and 'Quepath'. insertQue :: Namespace -> Quepath -> Que -> HashMap Namespace Quebase -> HashMap Namespace Quebase insertQue ns qp q hm = newQues where newQues = HashMap.insert ns newQbase hm newQbase = HashMap.insert qp q <| grab ns hm -- | housing for a set of que paths type Namespace = Text -- | a que is just a channel of bytes type Que = Go.Channel Message -- | any path can serve as an identifier for a que type Quepath = Text -- | any opaque data type Message = ByteString -- | a collection of ques type Quebase = HashMap Quepath Que -- | Get app state gets :: App Ques gets = ask +> STM.readTVarIO .> liftIO +> pure -- | Apply a function to the app state modify :: (Ques -> Ques) -> App () modify f = ask +> flip STM.modifyTVar' f .> atomically .> liftIO -- | Lookup or create a que que :: Namespace -> Quepath -> App Que que ns qp = do ques <- gets let qbase = grab ns ques if HashMap.member qp qbase then pure <| grab qp qbase else do c <- liftIO <| Go.chan 5 modify (insertQue ns qp c) gets /> grab ns /> grab qp