summaryrefslogtreecommitdiff
path: root/Biz/Que/Host.hs
blob: 57507ae920fec8a9427985ac64d53616c7b86580 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE NoImplicitPrelude #-}

-- | Interprocess communication
--
-- Prior art:
-- - <https://github.com/jb55/httpipe>
-- - <https://patchbay.pub>
-- - <https://github.com/hargettp/courier>
-- - sorta: <https://ngrok.com/> and <https://localtunnel.github.io/www/>
--
-- : out que-server
--
-- : dep async
-- : dep docopt
-- : dep envy
-- : dep protolude
-- : dep scotty
-- : dep stm
-- : dep tasty
-- : dep tasty-hunit
-- : dep tasty-quickcheck
-- : dep unagi-chan
-- : dep unordered-containers
module Biz.Que.Host
  ( main,
  )
where

import Alpha hiding (gets, modify, poll)
import qualified Biz.Cli as Cli
import Biz.Test ((@=?))
import qualified Biz.Test as Test
import qualified Control.Concurrent.Go as Go
import qualified Control.Concurrent.STM as STM
import qualified Control.Exception as Exception
import Control.Monad.Reader (MonadTrans)
import qualified Data.ByteString.Builder.Extra as Builder
import qualified Data.ByteString.Lazy as BSL
import Data.HashMap.Lazy (HashMap)
import qualified Data.HashMap.Lazy as HashMap
import qualified Data.Text.Encoding as Encoding
import qualified Data.Text.Lazy as Text.Lazy
import qualified Network.HTTP.Types.Status as Http
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import Network.Wai.Middleware.RequestLogger
  ( logStdout,
  )
import qualified System.Envy as Envy
import qualified Web.Scotty.Trans as Scotty
import qualified Prelude

{-# ANN module ("HLint: ignore Reduce duplication" :: Prelude.String) #-}

-- | Program entry point: hands the usage text, the server action and the
-- test tree to 'Cli.main'.
main :: IO ()
main = Cli.main (Cli.Plan help move test pure)

-- | Start the web server.
--
-- Reads 'Config' from the environment (falling back to 'Envy.defConfig'),
-- builds the WAI application from 'routes' with a fresh 'AppState', prints
-- a startup banner and then serves on the configured port.
--
-- NOTE(review): the banner prints the secret key in clear text; confirm
-- that this is acceptable wherever stdout ends up.
move :: Cli.Arguments -> IO ()
move _ = Exception.bracket startup shutdown serve
  where
    serve (port, waiapp) = Warp.run port waiapp
    startup = do
      cfg <- Envy.decodeWithDefaults Envy.defConfig
      sync <- STM.newTVarIO initialAppState
      waiapp <-
        Scotty.scottyAppT
          (\action -> runReaderT (runApp action) sync)
          (routes cfg)
      putText "*"
      putText "que"
      putText ("port: " <> show (quePort cfg))
      putText ("skey: " <> Text.Lazy.toStrict (queSkey cfg))
      return (quePort cfg, waiapp)
    -- There is nothing to release; this only satisfies 'Exception.bracket'.
    shutdown :: a -> IO a
    shutdown = pure

-- | Command-line usage text, written with the 'Cli.docopt' quasi-quoter.
-- It declares two invocations: the bare command and the @test@ subcommand.
help :: Cli.Docopt
help =
  [Cli.docopt|
que-server

Usage:
  que-server
  que-server test
|]

-- | Test tree for this module. Currently a single placeholder unit test
-- asserting that @1@ equals @1@.
test :: Test.Tree
test =
  Test.group
    "Biz.Que.Host"
    [ Test.unit "id" (1 @=? (1 :: Integer))
    ]

-- | The application monad: 'IO' with read access to a 'STM.TVar' holding
-- the shared 'AppState'.
newtype App a = App {runApp :: ReaderT (STM.TVar AppState) IO a}
  deriving
    ( Functor,
      Applicative,
      Monad,
      MonadIO,
      MonadReader (STM.TVar AppState)
    )

-- | Mutable server state: every known 'Namespace' mapped to its 'Quebase'.
newtype AppState = AppState {ques :: HashMap Namespace Quebase}

-- | The state the server starts with: no namespaces and no ques.
initialAppState :: AppState
initialAppState = AppState HashMap.empty

-- | Server configuration. The 'Envy.DefConfig' instance supplies the
-- defaults (port 3000, key @admin-key@).
data Config = Config
  { -- | QUE_PORT (presumably the environment variable 'Envy.FromEnv'
    -- reads for this field; TODO confirm). The port the server listens on.
    quePort :: Warp.Port,
    -- | QUE_SKEY (presumably the environment variable for this field;
    -- TODO confirm). Secret compared against the @Authorization@ header
    -- for the dashboard and for publishing to the @_@ namespace.
    queSkey :: Text.Lazy.Text
  }
  deriving (Generic, Show)

-- | Defaults used for any setting that is not provided.
instance Envy.DefConfig Config where
  defConfig =
    Config
      { quePort = 3000,
        queSkey = "admin-key"
      }

-- | Uses the class's default method implementations.
instance Envy.FromEnv Config

-- | All HTTP routes served by the que server.
--
-- The literal routes are registered before the generic que routes;
-- @/_/dash@ would otherwise also match the que path regex.
routes :: Config -> Scotty.ScottyT Text.Lazy.Text App ()
routes cfg = do
  Scotty.middleware logStdout
  let quepath = "^\\/([[:alnum:]_-]+)\\/([[:alnum:]._/-]*)$"
  let namespace = "^\\/([[:alnum:]_-]+)\\/?$" -- matches '/ns' and '/ns/' but not '/ns/path'
  -- The configured secret, compared verbatim against the Authorization header.
  let skey = Text.Lazy.strip <| queSkey cfg

  -- GET /index.html
  Scotty.get (Scotty.literal "/index.html") <| Scotty.redirect "/_/index"
  Scotty.get (Scotty.literal "/") <| Scotty.redirect "/_/index"
  -- GET /_/dash
  --
  -- Dump the whole que state as JSON; requires the secret key.
  Scotty.get (Scotty.literal "/_/dash") <| do
    authkey <- fromMaybe "" </ Scotty.header "Authorization"
    if authkey == skey
      then do
        d <- app <| gets ques
        Scotty.json d
      else do
        Scotty.status Http.methodNotAllowed405
        Scotty.text "not allowed"
  -- Namespace management
  Scotty.matchAny (Scotty.regex namespace) <| do
    Scotty.status Http.notImplemented501
    Scotty.text "namespace management coming soon"
  -- GET que
  --
  -- Receive a value from a que. Blocks until a value is received,
  -- then returns. If 'poll=true', then stream data from the Que to the
  -- client.
  Scotty.get (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    guardNs ns ["pub", "_"]
    app <. modify <| upsertNamespace ns
    q <- app <| que ns qp
    -- A missing or unparsable 'poll' parameter means "do not stream".
    poll <- Scotty.param "poll" !: (pure <. const False)
    if poll
      then Scotty.stream <| streamQue q
      else do
        r <- liftIO <| Go.read q
        -- Messages are opaque bytes posted by clients, so they may not be
        -- valid UTF-8. 'Encoding.decodeUtf8' throws on invalid input;
        -- decode leniently instead, replacing bad bytes with U+FFFD.
        Scotty.html
          <| fromStrict
          <| Encoding.decodeUtf8With (\_ _ -> Just '\xFFFD') r
  -- POST que
  --
  -- Put a value on a que. Returns immediately.
  Scotty.post (Scotty.regex quepath) <| do
    authkey <- fromMaybe "" </ Scotty.header "Authorization"
    (ns, qp) <- extract
    -- Publishing to the reserved '_' namespace requires the secret key
    -- in the Authorization header.
    when
      ("_" == ns && authkey /= skey)
      ( Scotty.status Http.methodNotAllowed405
          >> Scotty.text "not allowed: _ is a reserved namespace"
          >> Scotty.finish
      )
    guardNs ns ["pub", "_"]
    -- passed all auth checks
    app <. modify <| upsertNamespace ns
    q <- app <| que ns qp
    qdata <- Scotty.body
    _ <- liftIO <| Go.write q <| BSL.toStrict qdata
    return ()

-- | @guardNs ns whitelist@ lets the request continue when @ns@ is one of
-- the @whitelist@ entries. Otherwise it responds with a 405 and an
-- explanatory message, and finishes the action.
guardNs :: Text.Lazy.Text -> [Text.Lazy.Text] -> Scotty.ActionT Text.Lazy.Text App ()
guardNs ns whitelist =
  if ns `elem` whitelist
    then pure ()
    else do
      Scotty.status Http.methodNotAllowed405
      Scotty.text
        ( "not allowed: use 'pub' namespace or signup to protect '"
            <> ns
            <> "' at https://que.run"
        )
      Scotty.finish

-- | Infix alias for 'Scotty.rescue': run an action and, if it throws a
-- Scotty exception, run the handler with the error instead.
(!:) ::
  -- | action that might throw
  Scotty.ActionT Text.Lazy.Text App a ->
  -- | a function providing a default response instead
  (Text.Lazy.Text -> Scotty.ActionT Text.Lazy.Text App a) ->
  Scotty.ActionT Text.Lazy.Text App a
action !: fallback = Scotty.rescue action fallback

-- | Endlessly copy messages from a 'Que' into a 'Wai.StreamingBody'.
--
-- Each message read from the que is handed to the body's write callback
-- via 'Builder.byteStringInsert'. The flush callback is never invoked.
streamQue :: Que -> Wai.StreamingBody
streamQue q write _ = pump
  where
    pump = do
      msg <- Go.read q
      write (Builder.byteStringInsert msg)
      pump

-- | Look up a key that is expected to be present. Partial: calls 'error'
-- (via 'HashMap.!') if the key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab key table = table HashMap.! key

-- | Make sure the namespace exists in 'AppState'. A missing namespace is
-- added with an empty 'Quebase'; an existing one is left untouched.
upsertNamespace :: Namespace -> AppState -> AppState
upsertNamespace ns as
  | HashMap.member ns known = as
  | otherwise = as {ques = HashMap.insert ns mempty known}
  where
    known = ques as

-- | Inserts the que at the given 'Namespace' and 'Quepath', replacing any
-- que already stored at that path.
--
-- Total: if the namespace is not present yet it is created, rather than
-- failing on a missing key.
insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
insertQue ns qp q as = as {ques = newQues}
  where
    newQues = HashMap.insert ns newQbase (ques as)
    newQbase = HashMap.insert qp q oldQbase
    -- Start from an empty 'Quebase' when the namespace is unknown.
    oldQbase = fromMaybe mempty (HashMap.lookup ns (ques as))

-- | Read the two regex captures of the matched route (Scotty params
-- @"1"@ and @"2"@) as the namespace and the que path.
extract :: Scotty.ActionT Text.Lazy.Text App (Namespace, Quepath)
extract =
  Scotty.param "1" +> \namespace ->
    Scotty.param "2" +> \quepath ->
      return (namespace, quepath)

-- | 'lift' under another name, used to make it explicit that an action
-- runs at the 'App' layer.
app :: MonadTrans t => App a -> t App a
app action = lift action

-- | Read the current app state and project a value out of it.
gets :: (AppState -> b) -> App b
gets f = do
  tvar <- ask
  current <- liftIO (STM.readTVarIO tvar)
  return (f current)

-- | Atomically replace the app state with the result of applying the
-- given function to it (strict update via 'STM.modifyTVar'').
modify :: (AppState -> AppState) -> App ()
modify f = do
  tvar <- ask
  liftIO (atomically (STM.modifyTVar' tvar f))

-- | Housing for a set of que paths. Taken from the first path segment of
-- a request URL (a lazy 'Text.Lazy.Text', as parsed by 'extract').
type Namespace = Text.Lazy.Text

-- | A que is just a channel of bytes: a 'Go.Channel' carrying 'Message's.
type Que = Go.Channel Message

-- | Any path can serve as an identifier for a que. Taken from the part of
-- the request URL after the namespace (a strict 'Text').
type Quepath = Text

-- | Any opaque data; the server stores request bodies as strict bytes
-- without interpreting them.
type Message = ByteString

-- | A collection of ques, keyed by their 'Quepath'.
type Quebase = HashMap Quepath Que

-- | Lookup or create a que.
--
-- Returns the que stored at the given 'Namespace' and 'Quepath', creating
-- it (and the namespace, if needed) when it does not exist yet.
--
-- The check and the insert happen in a single STM transaction, so that
-- concurrent callers asking for the same new que all get the same
-- channel. Doing them as separate steps lets two callers each insert
-- their own channel and end up holding different ones.
que :: Namespace -> Quepath -> App Que
que ns qp = do
  tvar <- ask
  snapshot <- liftIO <| STM.readTVarIO tvar
  case lookupQue snapshot of
    -- Fast path: the que already exists, no transaction needed.
    Just q -> return q
    Nothing -> do
      -- Channels are created in IO, so make one speculatively before
      -- entering the transaction; it is dropped if we lose the race.
      fresh <- liftIO <| Go.chan 1
      liftIO <| atomically <| do
        current <- STM.readTVar tvar
        case lookupQue current of
          -- Someone else created the que in the meantime: use theirs.
          Just q -> return q
          Nothing -> do
            STM.modifyTVar' tvar (insertQue ns qp fresh <. upsertNamespace ns)
            return fresh
  where
    -- Total lookup: 'Nothing' if either the namespace or the path is missing.
    lookupQue as =
      case HashMap.lookup ns (ques as) of
        Nothing -> Nothing
        Just qbase -> HashMap.lookup qp qbase