summaryrefslogtreecommitdiff
path: root/Network
diff options
context:
space:
mode:
authorBen Sima <ben@bsima.me>2021-08-04 11:09:35 -0400
committerBen Sima <ben@bsima.me>2021-11-26 13:47:37 -0500
commit9b1df01fd2cf3ecf41fc68b94051db665821c774 (patch)
treec1a3e68f625679576ff7e47bd1ebcb07bb94e15e /Network
parent0264f4a5dc37b16f872e6fa92bd8f1fc1e2b1826 (diff)
Reimplement Que with Servant
Still todo: add authentication. But that can wait. In re-implementing this, I was able to figure out how to get the Go.mult working properly as well. The problem is that a tap from a mult channel does not remove the message from the original channel. I'm not sure if that should be a core feature or not; for now I'm just draining the channel when it's received in the Que HTTP handler. (Also, this would be a good place to put persistence: have a background job read from the original channel, and write the msg to disk via acid-state; this would obviate the need for a flush to nowhere.) Also, streaming is working now. The problem was that Scotty closes the connection after it sees a newline in the body, or something, so streaming over Scotty doesn't actually work. It's fine, Servant is better anyway.
Diffstat (limited to 'Network')
-rw-r--r--Network/Wai/Middleware/Braid.hs239
1 files changed, 239 insertions, 0 deletions
diff --git a/Network/Wai/Middleware/Braid.hs b/Network/Wai/Middleware/Braid.hs
new file mode 100644
index 0000000..f9832ac
--- /dev/null
+++ b/Network/Wai/Middleware/Braid.hs
@@ -0,0 +1,239 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+module Network.Wai.Middleware.Braid
+ ( -- * Types
+ Update,
+ Topic,
+
+ -- * Method helpers
+ isGetRequest,
+ isPutRequest,
+ isPatchRequest,
+
+ -- * 209 Status variable
+ status209,
+
+ -- * Header helpers & variables
+ hSub,
+ hVer,
+ hMerge,
+ hParents,
+ hPatch,
+ lookupHeader,
+ getSubscription,
+ hasSubscription,
+ getSubscriptionKeepAliveTime,
+ addSubscriptionHeader,
+ getVersion,
+ hasVersion,
+ addVersionHeader,
+ getMergeType,
+ hasMergeType,
+ addMergeTypeHeader,
+ getParents,
+ hasParents,
+ getPatches,
+ hasPatches,
+
+ -- * Update helpers
+ requestToUpdate,
+ updateToBuilder,
+
+ -- * Middleware
+ braidify,
+ subscriptionMiddleware,
+ versionMiddleware,
+ addPatchHeader,
+
+ -- * Subscription helper
+ streamUpdates,
+ )
+where
+
+import Alpha
+import qualified Data.ByteString as B
+import Data.ByteString.Builder (Builder, byteString)
+import qualified Data.ByteString.Char8 as BC
+import qualified Data.ByteString.Lazy as L
+import qualified Data.CaseInsensitive as CI
+import Network.HTTP.Types.Header (Header, HeaderName, RequestHeaders)
+import Network.HTTP.Types.Method (methodGet, methodPatch, methodPut)
+import Network.HTTP.Types.Status (Status, mkStatus)
+import qualified Network.Wai as Wai
+import Network.Wai.Middleware.AddHeaders (addHeaders)
+
+type Topic = [Text]
+
+data Update = -- | Updates are streamed from the server to subcribing client.
+ -- On a PUT request, the headers and request body are put into an Update and streamed to subscribing clients.
+ Update
+ { -- | The updateTopic is formed, from the request path
+ updateTopic :: [Text],
+ -- | The updateClient is an id generated by the client to prevent echo updates
+ -- https://github.com/braid-work/braid-spec/issues/72
+ updateClient :: Maybe B.ByteString,
+ -- | The updateHeader are taken straight from the request headers
+ updateHeaders :: RequestHeaders,
+ -- | The updatePatches correspond to the request body
+ updatePatches :: L.ByteString
+ }
+
+isGetRequest, isPutRequest, isPatchRequest :: Wai.Request -> Bool
+isGetRequest req = Wai.requestMethod req == methodGet
+isPutRequest req = Wai.requestMethod req == methodPut
+isPatchRequest req = Wai.requestMethod req == methodPatch
+
+-- | 209 Subscription is the new status code for subscriptions in braid
+status209 :: Status
+status209 = mkStatus 209 "Subscription"
+
+lookupHeader :: HeaderName -> [Header] -> Maybe B.ByteString
+lookupHeader _ [] = Nothing
+lookupHeader v ((n, s) : t)
+ | v == n = Just s
+ | otherwise = lookupHeader v t
+
+hSub :: HeaderName
+hSub = "Subscribe"
+
+getSubscription :: Wai.Request -> Maybe B.ByteString
+getSubscription req = lookupHeader hSub <| Wai.requestHeaders req
+
+getSubscriptionKeepAliveTime :: Wai.Request -> B.ByteString
+getSubscriptionKeepAliveTime req =
+ let Just s = lookupHeader hSub <| Wai.requestHeaders req
+ in snd <| BC.breakSubstring "=" s
+
+hasSubscription :: Wai.Request -> Bool
+hasSubscription req = isJust <| getSubscription req
+
+addSubscriptionHeader :: B.ByteString -> Wai.Response -> Wai.Response
+addSubscriptionHeader s =
+ Wai.mapResponseHeaders
+ (\hs -> (hSub, s) : ("Cache-Control", "no-cache, no-transform") : hs)
+
+hVer :: HeaderName
+hVer = "Version"
+
+getVersion :: Wai.Request -> Maybe B.ByteString
+getVersion req = lookupHeader hVer <| Wai.requestHeaders req
+
+hasVersion :: Wai.Request -> Bool
+hasVersion req = isJust <| getVersion req
+
+addVersionHeader :: B.ByteString -> Wai.Response -> Wai.Response
+addVersionHeader s = Wai.mapResponseHeaders (\hs -> (hVer, s) : hs)
+
+hMerge :: HeaderName
+hMerge = "Merge-Type"
+
+getMergeType :: Wai.Request -> Maybe B.ByteString
+getMergeType req = lookupHeader hMerge <| Wai.requestHeaders req
+
+hasMergeType :: Wai.Request -> Bool
+hasMergeType req = isJust <| getMergeType req
+
+addMergeTypeHeader :: B.ByteString -> Wai.Response -> Wai.Response
+addMergeTypeHeader s = Wai.mapResponseHeaders (\hs -> (hMerge, s) : hs)
+
+hParents :: HeaderName
+hParents = "Parents"
+
+getParents :: Wai.Request -> Maybe B.ByteString
+getParents req = lookupHeader hParents <| Wai.requestHeaders req
+
+hasParents :: Wai.Request -> Bool
+hasParents req = isJust <| getParents req
+
+hPatch :: HeaderName
+hPatch = "Patches"
+
+getPatches :: Wai.Request -> Maybe B.ByteString
+getPatches req = lookupHeader hPatch <| Wai.requestHeaders req
+
+hasPatches :: Wai.Request -> Bool
+hasPatches req = isJust <| getPatches req
+
+-- | Forms an Update from a WAI Request
+requestToUpdate :: Wai.Request -> L.ByteString -> Update
+requestToUpdate req body =
+ Update
+ { updateTopic = Wai.pathInfo req,
+ updateClient = lookupHeader "Client" reqHeaders,
+ updateHeaders =
+ [ (x, y)
+ | (x, y) <- reqHeaders,
+ x `elem` [hSub, hVer, hMerge, hParents, hPatch, "Content-Type"]
+ ],
+ updatePatches = body
+ }
+ where
+ reqHeaders = Wai.requestHeaders req
+
+separator :: B.ByteString
+separator = BC.pack ": "
+
+-- | Turns an Update (headers and patches) into a Builder to be streamed
+-- Will return Nothing if the Topic we pass doesn't not match the updateTopic in the Update
+-- Or returns Just builder, where builder has type Builder
+updateToBuilder :: Topic -> Maybe B.ByteString -> Update -> Maybe Builder
+updateToBuilder topic client (Update t c h p)
+ | t /= topic && c == client = Nothing
+ | otherwise = Just <| builder h p
+ where
+ builder :: RequestHeaders -> L.ByteString -> Builder
+ builder hs b =
+ hs
+ |> map (\(h_, v) -> CI.original h_ <> separator <> v)
+ |> B.intercalate "\n"
+ |> (\headers -> headers <> "\n\n" <> L.toStrict b)
+ |> byteString
+
+-- TODO: still needs mechanism to keep alive, i.e. keeping the response connection open
+subscriptionMiddleware :: Chan Update -> Wai.Middleware
+subscriptionMiddleware src = catchUpdate src <. modifyHeadersToSub <. modifyStatusTo209
+ where
+ modifyHeadersToSub :: Wai.Middleware
+ modifyHeadersToSub app req respond =
+ case getSubscription req of
+ Just v -> app req <| respond <. addSubscriptionHeader v
+ Nothing -> app req respond
+ modifyStatusTo209 :: Wai.Middleware
+ modifyStatusTo209 = Wai.ifRequest hasSubscription <| Wai.modifyResponse <| Wai.mapResponseStatus (const status209)
+ -- NOTE: we're consuming the full request body, maybe there's a better way of doing this? idk
+ catchUpdate :: Chan Update -> Wai.Middleware
+ catchUpdate src_ =
+ Wai.ifRequest isPutRequest <| \app req res -> do
+ src' <- liftIO <| dupChan src_
+ Wai.strictRequestBody req +> \b ->
+ writeChan src' <| requestToUpdate req b
+ app req res
+
+versionMiddleware :: Wai.Middleware
+versionMiddleware app req respond =
+ case (getVersion req, isGetRequest req) of
+ (Just v, True) -> app req <| respond <. addVersionHeader v
+ _ -> app req respond
+
+addPatchHeader :: Wai.Middleware
+addPatchHeader = Wai.ifRequest isPutRequest <| addHeaders [("Patches", "OK")]
+
+-- |
+-- TODO: look into Chan vs BroadcastChan (https://github.com/merijn/broadcast-chan)
+streamUpdates :: Chan Update -> Topic -> Maybe ByteString -> Wai.StreamingBody
+streamUpdates chan topic client write flush = do
+ flush
+ src <- liftIO <| dupChan chan
+ fix <| \loop -> do
+ update <- readChan src
+ case updateToBuilder topic client update of
+ Just b -> write b >> flush >> loop
+ Nothing -> loop
+
+braidify :: Chan Update -> Wai.Middleware
+braidify src =
+ subscriptionMiddleware src
+ <. versionMiddleware
+ <. addPatchHeader
+ <. addHeaders [("Range-Request-Allow-Methods", "PATCH, PUT"), ("Range-Request-Allow-Units", "json")]