From 9b1df01fd2cf3ecf41fc68b94051db665821c774 Mon Sep 17 00:00:00 2001 From: Ben Sima Date: Wed, 4 Aug 2021 11:09:35 -0400 Subject: Reimplement Que with Servant Still todo: add authentication. But that can wait. In re-implementing this, I was able to figure out how to get the Go.mult working properly as well. The problem is that a tap from a mult channel does not remove the message from the original channel. I'm not sure if that should be a core feature or not; for now I'm just draining the channel when it's received in the Que HTTP handler. (Also, this would be a good place to put persistence: have a background job read from the original channel, and write the msg to disk via acid-state; this would obviate the need for a flush to nowhere.) Also, streaming is working now. The problem was that Scotty closes the connection after it sees a newline in the body, or something, so streaming over Scotty doesn't actually work. It's fine, Servant is better anyway. --- Network/Wai/Middleware/Braid.hs | 239 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 Network/Wai/Middleware/Braid.hs (limited to 'Network/Wai') diff --git a/Network/Wai/Middleware/Braid.hs b/Network/Wai/Middleware/Braid.hs new file mode 100644 index 0000000..f9832ac --- /dev/null +++ b/Network/Wai/Middleware/Braid.hs @@ -0,0 +1,239 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NoImplicitPrelude #-} + +module Network.Wai.Middleware.Braid + ( -- * Types + Update, + Topic, + + -- * Method helpers + isGetRequest, + isPutRequest, + isPatchRequest, + + -- * 209 Status variable + status209, + + -- * Header helpers & variables + hSub, + hVer, + hMerge, + hParents, + hPatch, + lookupHeader, + getSubscription, + hasSubscription, + getSubscriptionKeepAliveTime, + addSubscriptionHeader, + getVersion, + hasVersion, + addVersionHeader, + getMergeType, + hasMergeType, + addMergeTypeHeader, + getParents, + hasParents, + getPatches, + hasPatches, + + -- * Update helpers + requestToUpdate, + updateToBuilder, + + -- * Middleware + braidify, + subscriptionMiddleware, + versionMiddleware, + addPatchHeader, + + -- * Subscription helper + streamUpdates, + ) +where + +import Alpha +import qualified Data.ByteString as B +import Data.ByteString.Builder (Builder, byteString) +import qualified Data.ByteString.Char8 as BC +import qualified Data.ByteString.Lazy as L +import qualified Data.CaseInsensitive as CI +import Network.HTTP.Types.Header (Header, HeaderName, RequestHeaders) +import Network.HTTP.Types.Method (methodGet, methodPatch, methodPut) +import Network.HTTP.Types.Status (Status, mkStatus) +import qualified Network.Wai as Wai +import Network.Wai.Middleware.AddHeaders (addHeaders) + +type Topic = [Text] + +data Update = -- | Updates are streamed from the server to subcribing client. + -- On a PUT request, the headers and request body are put into an Update and streamed to subscribing clients. + Update + { -- | The updateTopic is formed, from the request path + updateTopic :: [Text], + -- | The updateClient is an id generated by the client to prevent echo updates + -- https://github.com/braid-work/braid-spec/issues/72 + updateClient :: Maybe B.ByteString, + -- | The updateHeader are taken straight from the request headers + updateHeaders :: RequestHeaders, + -- | The updatePatches correspond to the request body + updatePatches :: L.ByteString + } + +isGetRequest, isPutRequest, isPatchRequest :: Wai.Request -> Bool +isGetRequest req = Wai.requestMethod req == methodGet +isPutRequest req = Wai.requestMethod req == methodPut +isPatchRequest req = Wai.requestMethod req == methodPatch + +-- | 209 Subscription is the new status code for subscriptions in braid +status209 :: Status +status209 = mkStatus 209 "Subscription" + +lookupHeader :: HeaderName -> [Header] -> Maybe B.ByteString +lookupHeader _ [] = Nothing +lookupHeader v ((n, s) : t) + | v == n = Just s + | otherwise = lookupHeader v t + +hSub :: HeaderName +hSub = "Subscribe" + +getSubscription :: Wai.Request -> Maybe B.ByteString +getSubscription req = lookupHeader hSub <| Wai.requestHeaders req + +getSubscriptionKeepAliveTime :: Wai.Request -> B.ByteString +getSubscriptionKeepAliveTime req = + let Just s = lookupHeader hSub <| Wai.requestHeaders req + in snd <| BC.breakSubstring "=" s + +hasSubscription :: Wai.Request -> Bool +hasSubscription req = isJust <| getSubscription req + +addSubscriptionHeader :: B.ByteString -> Wai.Response -> Wai.Response +addSubscriptionHeader s = + Wai.mapResponseHeaders + (\hs -> (hSub, s) : ("Cache-Control", "no-cache, no-transform") : hs) + +hVer :: HeaderName +hVer = "Version" + +getVersion :: Wai.Request -> Maybe B.ByteString +getVersion req = lookupHeader hVer <| Wai.requestHeaders req + +hasVersion :: Wai.Request -> Bool +hasVersion req = isJust <| getVersion req + +addVersionHeader :: B.ByteString -> Wai.Response -> Wai.Response +addVersionHeader s = Wai.mapResponseHeaders (\hs -> (hVer, s) : hs) + +hMerge :: HeaderName +hMerge = "Merge-Type" + +getMergeType :: Wai.Request -> Maybe B.ByteString +getMergeType req = lookupHeader hMerge <| Wai.requestHeaders req + +hasMergeType :: Wai.Request -> Bool +hasMergeType req = isJust <| getMergeType req + +addMergeTypeHeader :: B.ByteString -> Wai.Response -> Wai.Response +addMergeTypeHeader s = Wai.mapResponseHeaders (\hs -> (hMerge, s) : hs) + +hParents :: HeaderName +hParents = "Parents" + +getParents :: Wai.Request -> Maybe B.ByteString +getParents req = lookupHeader hParents <| Wai.requestHeaders req + +hasParents :: Wai.Request -> Bool +hasParents req = isJust <| getParents req + +hPatch :: HeaderName +hPatch = "Patches" + +getPatches :: Wai.Request -> Maybe B.ByteString +getPatches req = lookupHeader hPatch <| Wai.requestHeaders req + +hasPatches :: Wai.Request -> Bool +hasPatches req = isJust <| getPatches req + +-- | Forms an Update from a WAI Request +requestToUpdate :: Wai.Request -> L.ByteString -> Update +requestToUpdate req body = + Update + { updateTopic = Wai.pathInfo req, + updateClient = lookupHeader "Client" reqHeaders, + updateHeaders = + [ (x, y) + | (x, y) <- reqHeaders, + x `elem` [hSub, hVer, hMerge, hParents, hPatch, "Content-Type"] + ], + updatePatches = body + } + where + reqHeaders = Wai.requestHeaders req + +separator :: B.ByteString +separator = BC.pack ": " + +-- | Turns an Update (headers and patches) into a Builder to be streamed +-- Will return Nothing if the Topic we pass doesn't not match the updateTopic in the Update +-- Or returns Just builder, where builder has type Builder +updateToBuilder :: Topic -> Maybe B.ByteString -> Update -> Maybe Builder +updateToBuilder topic client (Update t c h p) + | t /= topic && c == client = Nothing + | otherwise = Just <| builder h p + where + builder :: RequestHeaders -> L.ByteString -> Builder + builder hs b = + hs + |> map (\(h_, v) -> CI.original h_ <> separator <> v) + |> B.intercalate "\n" + |> (\headers -> headers <> "\n\n" <> L.toStrict b) + |> byteString + +-- TODO: still needs mechanism to keep alive, i.e. keeping the response connection open +subscriptionMiddleware :: Chan Update -> Wai.Middleware +subscriptionMiddleware src = catchUpdate src <. modifyHeadersToSub <. modifyStatusTo209 + where + modifyHeadersToSub :: Wai.Middleware + modifyHeadersToSub app req respond = + case getSubscription req of + Just v -> app req <| respond <. addSubscriptionHeader v + Nothing -> app req respond + modifyStatusTo209 :: Wai.Middleware + modifyStatusTo209 = Wai.ifRequest hasSubscription <| Wai.modifyResponse <| Wai.mapResponseStatus (const status209) + -- NOTE: we're consuming the full request body, maybe there's a better way of doing this? idk + catchUpdate :: Chan Update -> Wai.Middleware + catchUpdate src_ = + Wai.ifRequest isPutRequest <| \app req res -> do + src' <- liftIO <| dupChan src_ + Wai.strictRequestBody req +> \b -> + writeChan src' <| requestToUpdate req b + app req res + +versionMiddleware :: Wai.Middleware +versionMiddleware app req respond = + case (getVersion req, isGetRequest req) of + (Just v, True) -> app req <| respond <. addVersionHeader v + _ -> app req respond + +addPatchHeader :: Wai.Middleware +addPatchHeader = Wai.ifRequest isPutRequest <| addHeaders [("Patches", "OK")] + +-- | +-- TODO: look into Chan vs BroadcastChan (https://github.com/merijn/broadcast-chan) +streamUpdates :: Chan Update -> Topic -> Maybe ByteString -> Wai.StreamingBody +streamUpdates chan topic client write flush = do + flush + src <- liftIO <| dupChan chan + fix <| \loop -> do + update <- readChan src + case updateToBuilder topic client update of + Just b -> write b >> flush >> loop + Nothing -> loop + +braidify :: Chan Update -> Wai.Middleware +braidify src = + subscriptionMiddleware src + <. versionMiddleware + <. addPatchHeader + <. addHeaders [("Range-Request-Allow-Methods", "PATCH, PUT"), ("Range-Request-Allow-Units", "json")] -- cgit v1.2.3