1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
|
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- | Interprocess communication
--
-- Prior art:
-- - <https://github.com/jb55/httpipe>
-- - <https://patchbay.pub>
-- - <https://github.com/hargettp/courier>
-- - sorta: <https://ngrok.com/> and <https://localtunnel.github.io/www/>
--
-- : out que-server
module Biz.Que.Host
( main,
)
where
import Alpha hiding (gets, modify, poll)
import qualified Control.Concurrent.Go as Go
import qualified Control.Concurrent.STM as STM
import qualified Control.Exception as Exception
import Data.HashMap.Lazy (HashMap)
import qualified Data.HashMap.Lazy as HashMap
import Network.HTTP.Media ((//), (/:))
import qualified Network.Wai.Handler.Warp as Warp
import qualified Omni.Cli as Cli
import qualified Omni.Log as Log
import Omni.Test ((@=?))
import qualified Omni.Test as Test
import Servant
import Servant.Server.Generic (AsServerT, genericServeT)
import qualified Servant.Types.SourceT as Source
import qualified System.Envy as Envy
main :: IO ()
main = Cli.main <| Cli.Plan help move test pure
move :: Cli.Arguments -> IO ()
move _ = Exception.bracket startup shutdown <| uncurry Warp.run
where
startup =
Envy.decodeWithDefaults Envy.defConfig +> \cfg@Config {..} -> do
initialState <- atomically <| STM.newTVar mempty
-- natural transformation
let nt :: AppState -> App a -> Servant.Handler a
nt s x = runReaderT x s
let app :: AppState -> Application
app s = genericServeT (nt s) (paths cfg)
Log.info ["boot", "que"] >> Log.br
Log.info ["boot", "port", show <| quePort] >> Log.br
Log.info ["boot", "skey", queSkey] >> Log.br
pure (quePort, app initialState)
shutdown :: a -> IO a
shutdown = pure <. identity
help :: Cli.Docopt
help =
[Cli.docopt|
que-server
Usage:
que-server
que-server test
|]
test :: Test.Tree
test = Test.group "Biz.Que.Host" [Test.unit "id" <| 1 @=? (1 :: Integer)]
type App = ReaderT AppState Servant.Handler
type Ques = HashMap Namespace Quebase
type AppState = STM.TVar Ques
data Config = Config
{ -- | QUE_PORT
quePort :: Warp.Port,
-- | QUE_SKEY
queSkey :: Text
}
deriving (Generic, Show)
instance Envy.DefConfig Config where
defConfig = Config 3001 "admin-key"
instance Envy.FromEnv Config
-- | A simple HTML type. This recognizes "content-type: text/html" but doesn't
-- do any conversion, rendering, or sanitization like the
-- 'Servant.HTML.Lucid.HTML' type would do.
data HTML deriving (Typeable)
instance Accept HTML where
contentTypes _ = "text" // "html" /: ("charset", "utf-8") :| ["text" // "html"]
instance MimeRender HTML ByteString where
mimeRender _ = str
instance MimeUnrender HTML Text where
mimeUnrender _ bs = Right <| str bs
instance MimeUnrender OctetStream Text where
mimeUnrender _ bs = Right <| str bs
instance MimeRender PlainText ByteString where
mimeRender _ = str
instance MimeUnrender PlainText ByteString where
mimeUnrender _ bs = Right <| str bs
data Paths path = Paths
{ home ::
path
:- Get '[JSON] NoContent,
dash ::
path
:- "_"
:> "dash"
:> Get '[JSON] Ques,
getQue ::
path
:- Capture "ns" Text
:> Capture "quename" Text
:> Get '[PlainText, HTML, OctetStream] Message,
getStream ::
path
:- Capture "ns" Text
:> Capture "quename" Text
:> "stream"
:> StreamGet NoFraming OctetStream (SourceIO Message),
putQue ::
path
:- Capture "ns" Text
:> Capture "quepath" Text
:> ReqBody '[PlainText, HTML, OctetStream] Text
:> Post '[PlainText, HTML, OctetStream] NoContent
}
deriving (Generic)
paths :: Config -> Paths (AsServerT App)
paths _ =
-- TODO revive authkey stuff
-- - read Authorization header, compare with queSkey
-- - Only allow my IP or localhost to publish to '_' namespace
Paths
{ home =
throwError <| err301 {errHeaders = [("Location", "/_/index")]},
dash = gets,
getQue = \ns qn -> do
guardNs ns ["pub", "_"]
modify <| upsertNamespace ns
q <- que ns qn
Go.mult q
|> liftIO
+> Go.tap
|> liftIO,
getStream = \ns qn -> do
guardNs ns ["pub", "_"]
modify <| upsertNamespace ns
q <- que ns qn
Go.mult q
|> liftIO
+> Go.tap
|> Source.fromAction (const False) -- peek chan instead of False?
|> pure,
putQue = \ns qp body -> do
guardNs ns ["pub", "_"]
modify <| upsertNamespace ns
q <- que ns qp
body
|> str
|> Go.write q
>> Go.read q -- flush the que, otherwise msgs never clear
|> liftIO
-- TODO: detect number of readers, respond with "sent to N readers" or
-- "no readers, msg lost"
>> pure NoContent
}
-- | Given `guardNs ns whitelist`, if `ns` is not in the `whitelist`
-- list, return a 405 error.
guardNs :: (Applicative a, MonadError ServerError a) => Text -> [Text] -> a ()
guardNs ns whitelist =
when (not <| ns `elem` whitelist) <| do
throwError <| err405 {errBody = str msg}
where
msg =
"not allowed: use 'pub' namespace or signup to protect '"
<> ns
<> "' at https://que.run"
-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)
-- | Inserts the namespace in 'Ques' if it doesn't exist.
upsertNamespace :: Namespace -> HashMap Namespace Quebase -> HashMap Namespace Quebase
upsertNamespace ns as =
if HashMap.member ns as
then as
else HashMap.insert ns mempty as
-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
insertQue :: Namespace -> Quepath -> Que -> HashMap Namespace Quebase -> HashMap Namespace Quebase
insertQue ns qp q hm = newQues
where
newQues = HashMap.insert ns newQbase hm
newQbase = HashMap.insert qp q <| grab ns hm
-- | housing for a set of que paths
type Namespace = Text
-- | a que is just a channel of bytes
type Que = Go.Channel Message
-- | any path can serve as an identifier for a que
type Quepath = Text
-- | any opaque data
type Message = ByteString
-- | a collection of ques
type Quebase = HashMap Quepath Que
-- | Get app state
gets :: App Ques
gets = ask +> STM.readTVarIO .> liftIO +> pure
-- | Apply a function to the app state
modify :: (Ques -> Ques) -> App ()
modify f = ask +> flip STM.modifyTVar' f .> atomically .> liftIO
-- | Lookup or create a que
que :: Namespace -> Quepath -> App Que
que ns qp = do
ques <- gets
let qbase = grab ns ques
if HashMap.member qp qbase
then pure <| grab qp qbase
else do
c <- liftIO <| Go.chan 5
modify (insertQue ns qp c)
gets /> grab ns /> grab qp
|