summaryrefslogtreecommitdiff
path: root/Run/Que.hs
blob: aee021deecb829466005a2fff90f8824ed92eefe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}

{- | Interprocess communication
-}
module Run.Que
  ( main
  )
where

import           Com.Simatime.Alpha      hiding ( Text
                                                , get
                                                , gets
                                                , modify
                                                , poll
                                                )
import qualified Com.Simatime.Go               as Go
import qualified Control.Concurrent.STM        as STM
import qualified Control.Exception             as Exception
import           Control.Monad.Reader           ( MonadTrans )
import qualified Data.ByteString.Builder.Extra as Builder
import qualified Data.ByteString.Lazy          as BSL
import           Data.HashMap.Lazy              ( HashMap )
import qualified Data.HashMap.Lazy             as HashMap
import qualified Data.Text.Encoding            as Encoding
import           Data.Text.Lazy                 ( Text
                                                , fromStrict
                                                )
import qualified Data.Text.Lazy                as Text
import qualified Network.HTTP.Types.Status     as Http
import qualified Network.Wai                   as Wai
import qualified Network.Wai.Handler.Warp      as Warp
import           Network.Wai.Middleware.RequestLogger
                                                ( logStdoutDev )
import qualified System.Console.GetOpt         as Opt
import qualified System.Environment            as Environment
import qualified Web.Scotty.Trans              as Scotty

main :: IO ()
main = Exception.bracket startup shutdown run
 where
  run (p, waiapp) =
    putText ("que-server starting on port " <> show p) >> Warp.run p waiapp
  startup = do
    opts <- Environment.getArgs /> getOpts
    sync <- STM.newTVarIO opts
    let runActionToIO m = runReaderT (runApp m) sync
    waiapp <- Scotty.scottyAppT runActionToIO routes
    return (port opts, waiapp)
  shutdown :: a -> IO a
  shutdown = pure . identity
  getOpts args = case Opt.getOpt Opt.Permute options args of
    ([]  , [], _) -> Exception.throw ErrorParsingOptions
    (opts, _ , _) -> smoosh initialAppState opts
  options =
    [ Opt.Option
        ['p']
        ["port"]
        (Opt.ReqArg (\n m -> m { port = read n :: Warp.Port }) "PORT")
        "port to run on "
    ]

data Error = ErrorParsingOptions
  deriving (Show)

instance Exception.Exception Error

routes :: Scotty.ScottyT Text App ()
routes = do
  Scotty.middleware logStdoutDev

  let quepath = "^/([[:alnum:]_]*)/([[:alnum:]_/]*)$"

  Scotty.matchAny (Scotty.regex "^/([[:alnum:]_]*)/?$") <| do
    -- matches '/ns' and '/ns/' but not '/ns/path'
    Scotty.status Http.notImplemented501
    Scotty.text "namespace management coming soon"

  -- | Receive a value from a que. Blocks until a value is received,
  -- then returns. If 'poll=true', then stream data from the Que to the
  -- client.
  Scotty.get (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    -- ensure namespace exists
    app . modify <| upsertNamespace ns
    q    <- app <| que ns qp
    poll <- Scotty.param "poll" !: (pure . const False)
    case poll of
      True -> Scotty.stream $ streamQue q
      _    -> do
        r <- liftIO <| takeQue q
        Scotty.text <| fromStrict <| Encoding.decodeUtf8 r

  -- | Put a value on a que. Returns immediately.
  Scotty.post (Scotty.regex quepath) <| do
    (ns, qp) <- extract
    -- ensure namespace exists
    app . modify <| upsertNamespace ns
    q     <- app <| que ns qp
    qdata <- Scotty.body
    liftIO <| pushQue (BSL.toStrict qdata) q
    return ()

-- | recover from a scotty-thrown exception.
(!:)
  :: Scotty.ActionT Text App a -- ^ action that might throw
  -> (Text -> Scotty.ActionT Text App a) -- ^ a function providing a default response instead
  -> Scotty.ActionT Text App a
(!:) = Scotty.rescue

-- | Forever write the data from 'Que' to 'Wai.StreamingBody'.
streamQue :: Que -> Wai.StreamingBody
streamQue q write _ = Go.mult q >>= loop
 where
  loop c =
    Go.tap c
      >>= (write . Builder.byteStringInsert)
      >>  (write <| Builder.byteStringInsert "\n")
      >>  loop c

-- | Gets the thing from the Hashmap. Call's 'error' if key doesn't exist.
grab :: (Eq k, Hashable k) => k -> HashMap k v -> v
grab = flip (HashMap.!)

-- | Inserts the namespace in 'AppState' if it doesn't exist.
upsertNamespace :: Namespace -> AppState -> AppState
upsertNamespace ns as = if HashMap.member ns (ques as)
  then as
  else as { ques = HashMap.insert ns mempty (ques as) }

-- | Inserts the que at the proper 'Namespace' and 'Quepath'.
insertQue :: Namespace -> Quepath -> Que -> AppState -> AppState
insertQue ns qp q as = as { ques = newQues }
 where
  newQues  = HashMap.insert ns newQbase (ques as)
  newQbase = HashMap.insert qp q <| grab ns <| ques as

extract :: Scotty.ActionT Text App (Namespace, Quepath)
extract = do
  ns   <- Scotty.param "0"
  path <- Scotty.param "1"
  let p = Text.split (== '/') path |> filter (not . Text.null)
  return (ns, p)

newtype App a = App
  { runApp :: ReaderT (STM.TVar AppState) IO a
  }
  deriving (Applicative, Functor, Monad, MonadIO, MonadReader
    (STM.TVar AppState))

data AppState = AppState
  { ques :: HashMap Namespace Quebase
  , port :: Warp.Port
  }

initialAppState :: AppState
initialAppState = AppState { port = 80, ques = mempty }

-- | Resolve a list of 'AppState' transitions into one.
smoosh
  :: AppState -- ^ Initial app state to start with
  -> [AppState -> AppState] -- ^ List of functions to apply in order
  -> AppState
smoosh = foldr identity
-- there's gotta be a standard name for this

-- | A synonym for 'lift' in order to be explicit about when we are
-- operating at the 'App' layer.
app :: MonadTrans t => App a -> t App a
app = lift

-- | Get something from the app state
gets :: (AppState -> b) -> App b
gets f = ask >>= liftIO . STM.readTVarIO >>= return . f

-- | Apply a function to the app state
modify :: (AppState -> AppState) -> App ()
modify f = ask >>= liftIO . atomically . flip STM.modifyTVar' f

type Namespace = Text -- ^ housing for a set of que paths
type Que = Go.Channel Quedata -- ^ a que is just a channel of bytes
type Quepath = [Text] -- ^ any path can serve as an identifier for a que
type Quedata = ByteString -- ^ any opaque data
type Quebase = HashMap Quepath Que -- ^ a collection of ques

-- | Lookup or create a que
que :: Namespace -> Quepath -> App Que
que ns qp = do
  _ques <- gets ques
  let qbase     = grab ns _ques
      queExists = HashMap.member qp qbase
  if queExists
    then return <| grab qp qbase
    else do
      c <- liftIO Go.chan
      modify (insertQue ns qp c)
      gets ques /> grab ns /> grab qp

-- | Put data on the que.
pushQue :: Quedata -> Que -> IO ()
pushQue = flip Go.write

-- | Tap and read from the Que. Tap first because a Que is actually a
-- broadcast channel. This allows for multiconsumer Ques.
takeQue :: Que -> IO Quedata
takeQue ch = Go.mult ch >>= Go.tap