1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{- | Interprocess communication
-}
module Run.Que
( main
)
where
import Com.Simatime.Alpha hiding ( Text
, get
, gets
, modify
, poll
)
import qualified Com.Simatime.Go as Go
import qualified Control.Concurrent.STM as STM
import qualified Control.Exception as Exception
import Control.Monad.Reader ( MonadTrans )
import qualified Data.ByteString.Builder.Extra as Builder
import qualified Data.ByteString.Lazy as BSL
import Data.HashMap.Lazy ( HashMap )
import qualified Data.HashMap.Lazy as HashMap
import qualified Data.Text.Lazy as Text
import Data.Text.Lazy ( Text
, fromStrict
)
import qualified Data.Text.Encoding as Encoding
import GHC.Base ( String )
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import Network.Wai.Middleware.RequestLogger
( logStdoutDev )
import qualified Web.Scotty.Trans as Scotty
main :: IO ()
main = Exception.bracket startup shutdown run
where
run :: Wai.Application -> IO ()
run waiapp = Warp.run 8081 waiapp
-- | TODO: startup/shutdown ekg server, katip scribes
startup :: IO Wai.Application
startup = do
sync <- STM.newTVarIO <| AppState { ques = HashMap.empty }
let runActionToIO m = runReaderT (runApp m) sync
Scotty.scottyAppT runActionToIO routes
shutdown :: a -> IO a
shutdown = pure . identity
routes :: Scotty.ScottyT Text App ()
routes = do
Scotty.middleware logStdoutDev
-- | Receive a value from a que. Blocks until a value is received,
-- then returns. If 'poll=true', then stream data from the Que to the
-- client.
Scotty.get (Scotty.regex quepath) <| do
(ns, qp) <- extract
-- ensure namespace exists
app . modify <| upsertNamespace ns
q <- app <| que ns qp
poll <- Scotty.param "poll" :: Scotty.ActionT Text App Text
case poll of
"true" -> Scotty.stream $ streamQue q
_ -> do
r <- liftIO <| takeQue q
Scotty.text <| fromStrict <| Encoding.decodeUtf8 r
-- | Put a value on a que. Returns immediately.
Scotty.post (Scotty.regex quepath) <| do
(ns, qp) <- extract
qdata <- Scotty.body
-- ensure namespace exists
app . modify <| upsertNamespace ns
q <- app <| que ns qp
liftIO <| pushQue (BSL.toStrict qdata) q
return ()
-- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
streamQue :: Que -> Wai.StreamingBody
streamQue q write _ = Go.mult q >>= loop
where
loop c =
Go.tap c
>>= (write . Builder.byteStringInsert)
>> (write <| Builder.byteStringInsert "\n")
>> loop c
-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)
-- | Inserts the namespace in 'AppState' if it doesn't exist.
upsertNamespace :: Namespace -> AppState -> AppState
upsertNamespace ns as = if HashMap.member ns (ques as)
then as
else as { ques = HashMap.insert ns mempty (ques as) }
-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
insertQue ns qp q as = as { ques = newQues }
where
newQues = HashMap.insert ns newQbase (ques as)
newQbase = HashMap.insert qp q <| grab ns <| ques as
quepath :: GHC.Base.String
quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$"
extract :: Scotty.ActionT Text App (Namespace, Quepath)
extract = do
ns <- Scotty.param "0"
path <- Scotty.param "1"
let p = Text.split (== '/') path |> filter (not . Text.null)
return (ns, p)
newtype App a = App
{ runApp :: ReaderT (STM.TVar AppState) IO a
}
deriving (Applicative, Functor, Monad, MonadIO, MonadReader
(STM.TVar AppState))
data AppState = AppState
{ ques :: HashMap Namespace Quebase
}
-- | A synonym for 'lift' in order to be explicit about when we are
-- operating at the 'App' layer.
app :: MonadTrans t => App a -> t App a
app = lift
-- | Get something from the app state
gets :: (AppState -> b) -> App b
gets f = ask >>= liftIO . STM.readTVarIO >>= return . f
-- | Apply a function to the app state
modify :: (AppState -> AppState) -> App ()
modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f
type Namespace = Text -- ^ housing for a set of que paths
type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes
type Quepath = [Text] -- ^ any path can serve as an identifier for a que
type Quedata = ByteString -- ^ any opaque data
type Quebase = HashMap Quepath Que -- ^ a collection of ques
-- | Lookup or create a que
que :: Namespace -> Quepath -> App Que
que ns qp = do
_ques <- gets ques
let qbase = grab ns _ques
queExists = HashMap.member qp qbase
if queExists
then return <| grab qp qbase
else do
c <- liftIO Go.chan
modify (insertQue ns qp c)
gets ques /> grab ns /> grab qp
-- | Put data on the que.
pushQue :: Quedata -> Que -> IO ()
pushQue = flip Go.write
-- | Tap and read from the Que. Tap first because a Que is actually a
-- broadcast channel. This allows for multiconsumer Ques.
takeQue :: Que -> IO Quedata
takeQue ch = Go.mult ch >>= Go.tap
|