1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}
-- | An EDSL to make working with concurrent in-process code a bit easier
-- to read.
--
-- This module is expected to be imported qualified as `Go`. Inspired by
-- Golang and Clojure's core.async.
--
-- \$example
module Control.Concurrent.Go
( -- * Running and forking
fork,
-- * Channels
Channel,
chan,
read,
write,
mult,
tap,
)
where
import Alpha
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Chan.Unagi.Bounded as Chan
import qualified Data.Aeson as Aeson
import Data.Text (Text)
import qualified System.IO.Unsafe as Unsafe
-- | A bounded channel, bundling both ends of the underlying queue together
-- with the size it was created with.
data Channel a = Channel
  { -- | Write end of the channel; this is what 'write' and 'mult' operate on.
    _in :: Chan.InChan a,
    -- | Read end of the channel; this is what 'read' takes values from.
    _out :: Chan.OutChan a,
    -- | The size argument that was handed to 'chan' when the channel was made.
    _size :: Int
  }
-- | Renders a channel as an opaque JSON string of the form
-- @#\<channel N\>@, where @N@ is whatever 'Chan.estimatedLength' reports for
-- the channel's write end.
--
-- NOTE(review): 'Aeson.toJSON' is pure, so the estimate is fetched through
-- 'Unsafe.unsafePerformIO'. The number is only a snapshot taken whenever the
-- resulting value happens to be forced; confirm that this is acceptable for
-- every caller that serialises a 'Channel'.
instance Aeson.ToJSON (Channel a) where
  toJSON channel = Aeson.String rendered
    where
      rendered :: Text
      rendered = "#<channel " <> approxLen <> ">"
      approxLen :: Text
      approxLen =
        show (Unsafe.unsafePerformIO (Chan.estimatedLength (_in channel)))
-- | Runs the given action as a background process and returns the
-- 'Concurrent.ThreadId' of the newly started thread. This is a thin wrapper
-- over 'Concurrent.forkIO'.
fork :: IO () -> IO Concurrent.ThreadId
fork action = Concurrent.forkIO action
-- | Creates a fresh channel.
--
-- The 'Int' argument is passed to 'Chan.newChan' as the channel's size and is
-- also remembered in the resulting 'Channel'.
chan :: Int -> IO (Channel a)
chan capacity = do
  (inEnd, outEnd) <- Chan.newChan capacity
  return
    Channel
      { _in = inEnd,
        _out = outEnd,
        _size = capacity
      }
-- | A channel for broadcasting to multiple consumers. See 'mult'.
--
-- This is a plain synonym for the read end ('Chan.OutChan') of the underlying
-- channel; values are taken from it with 'tap'.
type Mult a = Chan.OutChan a
-- | Duplicates a channel by calling 'Chan.dupChan' on its write end. After
-- duplication, anything written to the source channel is available both from
-- the source and from the returned 'Mult'. This is like Clojure's
-- @core.async/mult@.
mult :: Channel a -> IO (Mult a)
mult source = Chan.dupChan (_in source)
-- | Takes one value from a 'Mult'. This is like Clojure's
-- @core.async/tap@.
--
-- It can be used to consume a channel from a background process, e.g.:
--
-- >>> c <- Go.chan 1
-- >>> Go.fork . forever <| Go.mult c >>= Go.tap >>= print
tap :: Mult a -> IO a
tap broadcast = Chan.readChan broadcast
-- | Takes the next value from the read end of a channel, blocking until a
-- value has been received.
read :: Channel a -> IO a
read source = Chan.readChan (_out source)
-- | Try to write a value to a channel without blocking.
--
-- This delegates to 'Chan.tryWriteChan' on the channel's write end, so it
-- does /not/ wait for space when the channel is full. The returned 'Bool'
-- reports whether the write succeeded: on 'False' the value was not enqueued,
-- and it is up to the caller to retry or drop it.
write :: Channel a -> a -> IO Bool
write channel = Chan.tryWriteChan (_in channel)
-- $example
--
-- A simple example from ghci:
--
-- >>> import qualified Control.Concurrent.Go as Go
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> Go.write c "test"
-- True
-- >>> Go.read c
-- "test"
--
-- An example with tap and mult:
--
-- >>> c <- Go.chan 1 :: IO (Go.Channel Text)
-- >>> Go.write c "hi"
-- True
-- >>> Go.read c
-- "hi"
-- >>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("one: " <> t)
-- ThreadId 810
-- >>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("two: " <> t)
-- ThreadId 825
-- >>> Go.write c "test"
-- "two: t"eosnte":
-- test"
--
-- The output is garbled because both background threads print concurrently
-- and their writes to the terminal interleave, but you get the idea.
|