1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
|
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}
module Network.Wai.Middleware.Braid
( -- * Types
Update,
Topic,
-- * Method helpers
isGetRequest,
isPutRequest,
isPatchRequest,
-- * 209 Status variable
status209,
-- * Header helpers & variables
hSub,
hVer,
hMerge,
hParents,
hPatch,
lookupHeader,
getSubscription,
hasSubscription,
getSubscriptionKeepAliveTime,
addSubscriptionHeader,
getVersion,
hasVersion,
addVersionHeader,
getMergeType,
hasMergeType,
addMergeTypeHeader,
getParents,
hasParents,
getPatches,
hasPatches,
-- * Update helpers
requestToUpdate,
updateToBuilder,
-- * Middleware
braidify,
subscriptionMiddleware,
versionMiddleware,
addPatchHeader,
-- * Subscription helper
streamUpdates,
)
where
import Alpha
import qualified Data.ByteString as B
import Data.ByteString.Builder (Builder, byteString)
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as L
import qualified Data.CaseInsensitive as CI
import Network.HTTP.Types.Header (Header, HeaderName, RequestHeaders)
import Network.HTTP.Types.Method (methodGet, methodPatch, methodPut)
import Network.HTTP.Types.Status (Status, mkStatus)
import qualified Network.Wai as Wai
import Network.Wai.Middleware.AddHeaders (addHeaders)
type Topic = [Text]
data Update
= -- | Updates are streamed from the server to subcribing client.
-- On a PUT request, the headers and request body are put into an Update and streamed to subscribing clients.
Update
{ -- | The updateTopic is formed, from the request path
updateTopic :: [Text],
-- | The updateClient is an id generated by the client to prevent echo updates
-- https://github.com/braid-work/braid-spec/issues/72
updateClient :: Maybe B.ByteString,
-- | The updateHeader are taken straight from the request headers
updateHeaders :: RequestHeaders,
-- | The updatePatches correspond to the request body
updatePatches :: L.ByteString
}
isGetRequest, isPutRequest, isPatchRequest :: Wai.Request -> Bool
isGetRequest req = Wai.requestMethod req == methodGet
isPutRequest req = Wai.requestMethod req == methodPut
isPatchRequest req = Wai.requestMethod req == methodPatch
-- | 209 Subscription is the new status code for subscriptions in braid
status209 :: Status
status209 = mkStatus 209 "Subscription"
lookupHeader :: HeaderName -> [Header] -> Maybe B.ByteString
lookupHeader _ [] = Nothing
lookupHeader v ((n, s) : t)
| v == n = Just s
| otherwise = lookupHeader v t
hSub :: HeaderName
hSub = "Subscribe"
getSubscription :: Wai.Request -> Maybe B.ByteString
getSubscription req = lookupHeader hSub <| Wai.requestHeaders req
getSubscriptionKeepAliveTime :: Wai.Request -> B.ByteString
getSubscriptionKeepAliveTime req =
let Just s = lookupHeader hSub <| Wai.requestHeaders req
in snd <| BC.breakSubstring "=" s
hasSubscription :: Wai.Request -> Bool
hasSubscription req = isJust <| getSubscription req
addSubscriptionHeader :: B.ByteString -> Wai.Response -> Wai.Response
addSubscriptionHeader s =
Wai.mapResponseHeaders
(\hs -> (hSub, s) : ("Cache-Control", "no-cache, no-transform") : hs)
hVer :: HeaderName
hVer = "Version"
getVersion :: Wai.Request -> Maybe B.ByteString
getVersion req = lookupHeader hVer <| Wai.requestHeaders req
hasVersion :: Wai.Request -> Bool
hasVersion req = isJust <| getVersion req
addVersionHeader :: B.ByteString -> Wai.Response -> Wai.Response
addVersionHeader s = Wai.mapResponseHeaders (\hs -> (hVer, s) : hs)
hMerge :: HeaderName
hMerge = "Merge-Type"
getMergeType :: Wai.Request -> Maybe B.ByteString
getMergeType req = lookupHeader hMerge <| Wai.requestHeaders req
hasMergeType :: Wai.Request -> Bool
hasMergeType req = isJust <| getMergeType req
addMergeTypeHeader :: B.ByteString -> Wai.Response -> Wai.Response
addMergeTypeHeader s = Wai.mapResponseHeaders (\hs -> (hMerge, s) : hs)
hParents :: HeaderName
hParents = "Parents"
getParents :: Wai.Request -> Maybe B.ByteString
getParents req = lookupHeader hParents <| Wai.requestHeaders req
hasParents :: Wai.Request -> Bool
hasParents req = isJust <| getParents req
hPatch :: HeaderName
hPatch = "Patches"
getPatches :: Wai.Request -> Maybe B.ByteString
getPatches req = lookupHeader hPatch <| Wai.requestHeaders req
hasPatches :: Wai.Request -> Bool
hasPatches req = isJust <| getPatches req
-- | Forms an Update from a WAI Request
requestToUpdate :: Wai.Request -> L.ByteString -> Update
requestToUpdate req body =
Update
{ updateTopic = Wai.pathInfo req,
updateClient = lookupHeader "Client" reqHeaders,
updateHeaders =
[ (x, y)
| (x, y) <- reqHeaders,
x `elem` [hSub, hVer, hMerge, hParents, hPatch, "Content-Type"]
],
updatePatches = body
}
where
reqHeaders = Wai.requestHeaders req
separator :: B.ByteString
separator = BC.pack ": "
-- | Turns an Update (headers and patches) into a Builder to be streamed
-- Will return Nothing if the Topic we pass doesn't not match the updateTopic in the Update
-- Or returns Just builder, where builder has type Builder
updateToBuilder :: Topic -> Maybe B.ByteString -> Update -> Maybe Builder
updateToBuilder topic client (Update t c h p)
| t /= topic && c == client = Nothing
| otherwise = Just <| builder h p
where
builder :: RequestHeaders -> L.ByteString -> Builder
builder hs b =
hs
|> map (\(h_, v) -> CI.original h_ <> separator <> v)
|> B.intercalate "\n"
|> (\headers -> headers <> "\n\n" <> L.toStrict b)
|> byteString
-- TODO: still needs mechanism to keep alive, i.e. keeping the response connection open
subscriptionMiddleware :: Chan Update -> Wai.Middleware
subscriptionMiddleware src = catchUpdate src <. modifyHeadersToSub <. modifyStatusTo209
where
modifyHeadersToSub :: Wai.Middleware
modifyHeadersToSub app req respond =
case getSubscription req of
Just v -> app req <| respond <. addSubscriptionHeader v
Nothing -> app req respond
modifyStatusTo209 :: Wai.Middleware
modifyStatusTo209 = Wai.ifRequest hasSubscription <| Wai.modifyResponse <| Wai.mapResponseStatus (const status209)
-- NOTE: we're consuming the full request body, maybe there's a better way of doing this? idk
catchUpdate :: Chan Update -> Wai.Middleware
catchUpdate src_ =
Wai.ifRequest isPutRequest <| \app req res -> do
src' <- liftIO <| dupChan src_
Wai.strictRequestBody req +> \b ->
writeChan src' <| requestToUpdate req b
app req res
versionMiddleware :: Wai.Middleware
versionMiddleware app req respond =
case (getVersion req, isGetRequest req) of
(Just v, True) -> app req <| respond <. addVersionHeader v
_ -> app req respond
addPatchHeader :: Wai.Middleware
addPatchHeader = Wai.ifRequest isPutRequest <| addHeaders [("Patches", "OK")]
-- |
-- TODO: look into Chan vs BroadcastChan (https://github.com/merijn/broadcast-chan)
streamUpdates :: Chan Update -> Topic -> Maybe ByteString -> Wai.StreamingBody
streamUpdates chan topic client write flush = do
flush
src <- liftIO <| dupChan chan
fix <| \loop -> do
update <- readChan src
case updateToBuilder topic client update of
Just b -> write b >> flush >> loop
Nothing -> loop
braidify :: Chan Update -> Wai.Middleware
braidify src =
subscriptionMiddleware src
<. versionMiddleware
<. addPatchHeader
<. addHeaders [("Range-Request-Allow-Methods", "PATCH, PUT"), ("Range-Request-Allow-Units", "json")]
|