1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
{- | An EDSL to make working with concurrent in-process code a bit easier
to read.
This module is expected to be imported qualified as `Go`. Inspired by
Golang and Clojure's core.async.
$example
-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
module Control.Concurrent.Go
(
-- * Running and forking
fork
-- * Channels
, Channel
, chan
, read
, write
, mult
, tap
)
where
import Alpha
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Chan.Unagi.Bounded
as Chan
import qualified Data.Aeson as Aeson
import Data.Text ( Text )
import qualified System.IO.Unsafe as Unsafe
-- | A standard channel.
data Channel a = Channel
{ _in :: Chan.InChan a
, _out :: Chan.OutChan a
, _size :: Int
}
instance Aeson.ToJSON (Channel a) where
toJSON c = Aeson.String ("#<channel " <> len c <> ">" :: Text)
where len = show . Unsafe.unsafePerformIO . Chan.estimatedLength . _in
-- | Starts a background process.
fork :: IO () -> IO Concurrent.ThreadId
fork = Concurrent.forkIO
-- | Make a new channel.
chan :: Int -> IO (Channel a)
chan n = do
(i, o) <- Chan.newChan n
return $ Channel i o n
-- | A channel for broadcasting to multiple consumers. See 'mult'.
type Mult a = Chan.OutChan a
-- | Duplicates a channel, but then anything written to the source will
-- be available to both. This is like Clojure's `core.async/mult`
mult :: Channel a -> IO (Mult a)
mult = Chan.dupChan . _in
-- | Read a value from a 'Mult'. This is like Clojure's `core.async/tap`.
--
-- You can use this to read from a channel in a background process, e.g.:
--
-- >>> c <- Go.chan
-- >>> Go.fork . forever <| Go.mult c >>= Go.tap >>= print
tap :: Mult a -> IO a
tap = Chan.readChan
-- | Take from a channel. Blocks until a value is received.
read :: Channel a -> IO a
read = Chan.readChan . _out
-- | Write to a channel. Blocks if the channel is full.
write :: Channel a -> a -> IO Bool
write = Chan.tryWriteChan . _in
{- $example
A simple example from ghci:
>>> import qualified Control.Concurrent.Go as Go
>>> c <- Go.chan :: IO (Go.Channel Text)
>>> Go.write c "test"
>>> Go.read c
"test"
An example with tap and mult:
>>> c <- Go.chan :: IO (Go.Channel Text)
>>> Go.write c "hi"
>>> Go.read c
"hi"
>>> Go.fork
>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("one: " <> t)
ThreadId 810
>>> Go.fork $ forever $ Go.mult c >>= Go.tap >>= \t -> print ("two: " <> t)
ThreadId 825
>>> Go.write c "test"
"two: t"eosnte":
test"
The text is garbled because the actions are happening concurrently and
trying to serialize to write the output, but you get the idea.
-}
|