@@ -16,9 +16,11 @@ package cache
16
16
17
17
import (
18
18
"context"
19
+ "errors"
19
20
"sync"
20
21
"time"
21
22
23
+ "go.etcd.io/etcd/api/v3/etcdserverpb"
22
24
clientv3 "go.etcd.io/etcd/client/v3"
23
25
)
24
26
@@ -108,6 +110,9 @@ func (d *demux) Unregister(w *watcher) {
108
110
func (d * demux ) Init (minRev int64 ) {
109
111
d .mu .Lock ()
110
112
defer d .mu .Unlock ()
113
+ if minRev == 0 {
114
+ return
115
+ }
111
116
if d .minRev == 0 {
112
117
// Watch started for empty demux
113
118
d .minRev = minRev
@@ -129,23 +134,35 @@ func (d *demux) Init(minRev int64) {
129
134
}
130
135
131
136
func (d * demux ) Broadcast (resp clientv3.WatchResponse ) error {
132
- events := resp .Events
133
- if len (events ) == 0 {
134
- return nil
135
- }
136
-
137
137
d .mu .Lock ()
138
138
defer d .mu .Unlock ()
139
- err := validateRevisions (events , d .maxRev )
139
+ if d .minRev == 0 {
140
+ return errors .New ("demux: not initialized" )
141
+ }
142
+ err := validateRevisions (resp , d .maxRev )
140
143
if err != nil {
141
144
return err
142
145
}
143
- d .updateStoreLocked (events )
144
- d .broadcastLocked (events )
146
+ d .updateStoreLocked (resp )
147
+ d .broadcastLocked (resp )
145
148
return nil
146
149
}
147
150
148
- func (d * demux ) updateStoreLocked (events []* clientv3.Event ) {
151
+ func (d * demux ) LatestRev () int64 {
152
+ d .mu .RLock ()
153
+ defer d .mu .RUnlock ()
154
+ return d .maxRev
155
+ }
156
+
157
+ func (d * demux ) updateStoreLocked (resp clientv3.WatchResponse ) {
158
+ if resp .IsProgressNotify () {
159
+ d .maxRev = resp .Header .Revision
160
+ return
161
+ }
162
+ if len (resp .Events ) == 0 {
163
+ return
164
+ }
165
+ events := resp .Events
149
166
batchStart := 0
150
167
for end := 1 ; end < len (events ); end ++ {
151
168
if events [end ].Kv .ModRevision != events [batchStart ].Kv .ModRevision {
@@ -167,7 +184,33 @@ func (d *demux) updateStoreLocked(events []*clientv3.Event) {
167
184
d .maxRev = events [len (events )- 1 ].Kv .ModRevision
168
185
}
169
186
170
- func (d * demux ) broadcastLocked (events []* clientv3.Event ) {
187
+ func (d * demux ) broadcastLocked (resp clientv3.WatchResponse ) {
188
+ switch {
189
+ case resp .IsProgressNotify ():
190
+ d .broadcastProgressLocked (resp .Header .Revision )
191
+ case len (resp .Events ) != 0 :
192
+ d .broadcastEventsLocked (resp .Events )
193
+ default :
194
+ }
195
+ }
196
+
197
+ func (d * demux ) broadcastProgressLocked (progressRev int64 ) {
198
+ for w , nextRev := range d .activeWatchers {
199
+ if nextRev >= progressRev {
200
+ continue
201
+ }
202
+ resp := clientv3.WatchResponse {
203
+ Header : etcdserverpb.ResponseHeader {
204
+ Revision : progressRev ,
205
+ },
206
+ }
207
+ if w .enqueueResponse (resp ) {
208
+ d .activeWatchers [w ] = progressRev + 1
209
+ }
210
+ }
211
+ }
212
+
213
+ func (d * demux ) broadcastEventsLocked (events []* clientv3.Event ) {
171
214
firstRev := events [0 ].Kv .ModRevision
172
215
lastRev := events [len (events )- 1 ].Kv .ModRevision
173
216
@@ -177,18 +220,22 @@ func (d *demux) broadcastLocked(events []*clientv3.Event) {
177
220
delete (d .activeWatchers , w )
178
221
continue
179
222
}
223
+
180
224
sendStart := len (events )
181
225
for i , ev := range events {
182
226
if ev .Kv .ModRevision >= nextRev {
183
227
sendStart = i
184
228
break
185
229
}
186
230
}
231
+
187
232
if sendStart == len (events ) {
188
233
continue
189
234
}
190
235
191
- if ! w .enqueueEvent (events [sendStart :]) { // overflow → lagging
236
+ if ! w .enqueueResponse (clientv3.WatchResponse {
237
+ Events : events [sendStart :],
238
+ }) { // overflow → lagging
192
239
d .laggingWatchers [w ] = nextRev
193
240
delete (d .activeWatchers , w )
194
241
} else {
@@ -241,17 +288,26 @@ func (d *demux) resyncLaggingWatchers() {
241
288
continue
242
289
}
243
290
// TODO: re-enable key‐predicate in Filter when non‐zero startRev or performance tuning is needed
244
- enqueueFailed := false
291
+ resyncSuccess := true
245
292
d .history .AscendGreaterOrEqual (nextRev , func (rev int64 , eventBatch []* clientv3.Event ) bool {
246
- if ! w .enqueueEvent (eventBatch ) { // buffer overflow: watcher still lagging
247
- enqueueFailed = true
293
+ resp := clientv3.WatchResponse {
294
+ Events : eventBatch ,
295
+ }
296
+ if ! w .enqueueResponse (resp ) { // buffer overflow: watcher still lagging
297
+ resyncSuccess = false
248
298
return false
249
299
}
250
300
nextRev = rev + 1
251
301
return true
252
302
})
253
-
254
- if ! enqueueFailed {
303
+ // Send progress to just resync.
304
+ if resyncSuccess {
305
+ resp := clientv3.WatchResponse {
306
+ Header : etcdserverpb.ResponseHeader {Revision : d .maxRev },
307
+ }
308
+ if d .maxRev > nextRev && w .enqueueResponse (resp ) {
309
+ nextRev = d .maxRev + 1
310
+ }
255
311
delete (d .laggingWatchers , w )
256
312
d .activeWatchers [w ] = nextRev
257
313
} else {
0 commit comments