-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbroadcast.go
210 lines (184 loc) · 5.97 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package whatnot
import (
"sync"
"time"
"github.com/databeast/whatnot/access"
)
/*
Whatnot depends on multiple subscribers to a notification channel
The following is an implementation of GO channel multiplexing
*/
const (
defaultMultiplexerBuffer = 100
)
// EventMultiplexer is a pub-sub mechanism where consumers can Register to
// receive messages sent to Broadcast.
type EventMultiplexer struct {
logsupport
// Broadcast is the channel to set events to for them to be multiplexed out
Broadcast chan<- WatchEvent
lock *sync.Mutex
onElement *PathElement
connections map[chan<- WatchEvent]bool
}
// Register starts receiving messages on the given channel. If a
// channel close is seen, either the topic has been shut down, or the
// consumer was too slow, and should re-register.
func (t *EventMultiplexer) Register(ch chan<- WatchEvent, recursive bool) {
t.lock.Lock()
t.connections[ch] = recursive
t.lock.Unlock()
}
// Unregister stops receiving messages on this channel.
func (t *EventMultiplexer) Unregister(ch chan<- WatchEvent) {
t.lock.Lock()
// double-close is not safe, so make sure we didn't already
// drop this consumer as too slow
_, ok := t.connections[ch]
if ok {
delete(t.connections, ch)
close(ch)
}
t.lock.Unlock()
}
// initEventBroadcast creates a new event multiplexer
// Messages can be broadcast on this topic,
// and registered consumers are guaranteed to either receive them, or
// see a channel close.
func (p *PathElement) initEventBroadcast() {
if p.subscriberNotify != nil {
// simple reentrance
return
}
p.subscriberNotify = NewEventsMultiplexer()
p.subscriberNotify.onElement = p
// start reading incoming events from child elements
p.watchChildren()
}
// NewEventsMultiplexer creates a new event multiplexer
// that will duplicate incoming WatchEvents to multiple watcher
// channels
func NewEventsMultiplexer() *EventMultiplexer {
var broadcast = make(chan WatchEvent, defaultMultiplexerBuffer)
t := &EventMultiplexer{
Broadcast: broadcast,
lock: &sync.Mutex{},
onElement: nil,
connections: make(map[chan<- WatchEvent]bool, 5), //give this a bit of a buffer so slow subscribers can respond
}
go t.run(broadcast)
return t
}
// BroadcastAsync has the multiplexer submit the WatchEvent
// instead of the caller attaching directly to a channel
// delivery is not guaranteed in this case and the goroutine
// will eventually exit if it deadlocks
func (t *EventMultiplexer) BroadcastAsync(evt WatchEvent) {
go func() {
t.Broadcast <- evt
}()
// TODO: kill goroutine after max wait time
}
// run is the primary goroutine loop for each Multiplexer
// to shut it down, send a channel close to the multiplexer's Broadcast channel
func (t *EventMultiplexer) run(broadcastchan <-chan WatchEvent) {
for msg := range broadcastchan {
func() {
// send our broadcast event to every subscriber
for ch, rec := range t.connections {
if rec == false {
if msg.elem != t.onElement {
continue // this connect only wants notifications about itself
}
}
select {
case ch <- msg:
// sends event to individual multiplexer subscriber
t.Debugf("transmitted event %d to broadcast subscriber", msg.id)
default:
// cannot send message, listener has closed the channel
t.lock.Lock()
t.Debugf("%s removing disconnected subscriber", t.onElement.AbsolutePath().ToPathString())
delete(t.connections, ch)
close(ch)
t.lock.Unlock()
}
}
}()
}
// broadcast channel has been closed at this point
t.lock.Lock()
for ch := range t.connections {
delete(t.connections, ch)
close(ch)
}
t.lock.Unlock()
}
// watchChildren is a PathElement-specific goroutine to handle event channels
// in default mode, this means that goroutine load scales 1-to-1 with total
// number of distinct pathelements
func (p *PathElement) watchChildren() {
go func() {
var e elementChange
for {
if p.prunectx != nil { // need to watch for pruning cancellation
select {
case e = <-p.subevents:
// process events from our children
p.Debugf("%s received change notify %d from child: %s", p.AbsolutePath().ToPathString(), e.id, e.elem.AbsolutePath().ToPathString())
case e = <-p.selfnotify:
// process events from ourself
p.Debugf("%s received change notify on self", p.AbsolutePath().ToPathString())
case <-p.prunectx.Done():
// element is being pruned, we need to shut down this monitoring goroutine
// but first send this final notification upwards
p.Debugf("pruning signal received - shutting down event watch goroutine")
p.parentnotify <- elementChange{
id: randid.Uint64(),
elem: p,
change: ChangePruned,
actor: access.Role{},
}
// then do what we need to do with the event ourselves now
p.logChange(e)
// Broadcast the change out to all subscribers
p.subscriberNotify.Broadcast <- WatchEvent{
id: randid.Uint64(),
elem: p,
TS: time.Now(),
Note: "",
Change: ChangePruned,
Actor: e.actor,
}
return
}
} else { // just watch for path element change events
select {
case e = <-p.subevents:
// process events from our children
p.Debugf("%s received change notify %d from child: %s", p.AbsolutePath().ToPathString(), e.id, e.elem.AbsolutePath().ToPathString())
case e = <-p.selfnotify:
// process events from ourself
p.Debugf("%s received change notify on self", p.AbsolutePath().ToPathString())
}
}
if e.elem == nil {
panic("elementChange event passed with nil PathElement")
}
pe := e // clone our event to send upwards, make the data race analyzer happy
p.parentnotify <- pe
// then do what we need to do with the event ourselves now
p.logChange(e)
// Broadcast the change out to all subscribers
p.subscriberNotify.Broadcast <- WatchEvent{
id: e.id,
elem: e.elem,
TS: time.Now(),
Note: "",
Change: e.change,
Actor: e.actor,
}
// TODO: needs close handler
}
}()
}