Skip to content

Commit 67c992b

Browse files
committed
Fix the client connection is closing issue in robustness test
Signed-off-by: Chun-Hung Tseng <[email protected]>
1 parent 985e056 commit 67c992b

File tree

2 files changed

+107
-5
lines changed

2 files changed

+107
-5
lines changed

tests/robustness/client/client.go

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ import (
3030
"go.etcd.io/etcd/tests/v3/robustness/report"
3131
)
3232

33+
var (
34+
errClientIsClosed = errors.New("client is closed")
35+
)
36+
3337
// RecordingClient provides a semi-etcd client (different interface than
3438
// clientv3.Client) that records all the requests and responses made. Doesn't
3539
// allow for concurrent requests to conform to model.AppendableHistory requirements.
@@ -45,6 +49,8 @@ type RecordingClient struct {
4549
// mux ensures order of request appending.
4650
kvMux sync.Mutex
4751
kvOperations *model.AppendableHistory
52+
53+
isClosed bool
4854
}
4955

5056
var _ clientv3.KV = (*RecordingClient)(nil)
@@ -69,11 +75,20 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time
6975
client: *cc,
7076
kvOperations: model.NewAppendableHistory(ids),
7177
baseTime: baseTime,
78+
isClosed: false,
7279
}, nil
7380
}
7481

7582
func (c *RecordingClient) Close() error {
76-
return c.client.Close()
83+
c.kvMux.Lock()
84+
defer c.kvMux.Unlock()
85+
if c.isClosed {
86+
return nil
87+
}
88+
89+
err := c.client.Close()
90+
c.isClosed = true // if we set to true only if there is no error, we need to handle the retry in defer
91+
return err
7792
}
7893

7994
func (c *RecordingClient) Report() report.ClientReport {
@@ -94,6 +109,12 @@ func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3.
94109
}
95110

96111
func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
112+
c.kvMux.Lock()
113+
defer c.kvMux.Unlock()
114+
if c.isClosed {
115+
return nil, errClientIsClosed
116+
}
117+
97118
ops := []clientv3.OpOption{}
98119
if end != "" {
99120
ops = append(ops, clientv3.WithRange(end))
@@ -104,8 +125,6 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
104125
if limit != 0 {
105126
ops = append(ops, clientv3.WithLimit(limit))
106127
}
107-
c.kvMux.Lock()
108-
defer c.kvMux.Unlock()
109128
callTime := time.Since(c.baseTime)
110129
resp, err := c.client.Get(ctx, start, ops...)
111130
returnTime := time.Since(c.baseTime)
@@ -116,6 +135,10 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
116135
func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) {
117136
c.kvMux.Lock()
118137
defer c.kvMux.Unlock()
138+
if c.isClosed {
139+
return nil, errClientIsClosed
140+
}
141+
119142
callTime := time.Since(c.baseTime)
120143
resp, err := c.client.Put(ctx, key, value)
121144
returnTime := time.Since(c.baseTime)
@@ -126,6 +149,10 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clien
126149
func (c *RecordingClient) Delete(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
127150
c.kvMux.Lock()
128151
defer c.kvMux.Unlock()
152+
if c.isClosed {
153+
return nil, errClientIsClosed
154+
}
155+
129156
callTime := time.Since(c.baseTime)
130157
resp, err := c.client.Delete(ctx, key)
131158
returnTime := time.Since(c.baseTime)
@@ -172,12 +199,22 @@ func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) {
172199
}
173200

174201
func (c *RecordingClient) Txn(ctx context.Context) clientv3.Txn {
202+
c.kvMux.Lock()
203+
defer c.kvMux.Unlock()
204+
if c.isClosed {
205+
return nil
206+
}
207+
175208
return &wrappedTxn{txn: c.client.Txn(ctx), c: c}
176209
}
177210

178211
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
179212
c.kvMux.Lock()
180213
defer c.kvMux.Unlock()
214+
if c.isClosed {
215+
return nil, errClientIsClosed
216+
}
217+
181218
callTime := time.Since(c.baseTime)
182219
resp, err := c.client.Lease.Grant(ctx, ttl)
183220
returnTime := time.Since(c.baseTime)
@@ -188,6 +225,10 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.
188225
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clientv3.LeaseRevokeResponse, error) {
189226
c.kvMux.Lock()
190227
defer c.kvMux.Unlock()
228+
if c.isClosed {
229+
return nil, errClientIsClosed
230+
}
231+
191232
callTime := time.Since(c.baseTime)
192233
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseID))
193234
returnTime := time.Since(c.baseTime)
@@ -196,9 +237,13 @@ func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clie
196237
}
197238

198239
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseID int64) (*clientv3.PutResponse, error) {
199-
opts := clientv3.WithLease(clientv3.LeaseID(leaseID))
200240
c.kvMux.Lock()
201241
defer c.kvMux.Unlock()
242+
if c.isClosed {
243+
return nil, errClientIsClosed
244+
}
245+
246+
opts := clientv3.WithLease(clientv3.LeaseID(leaseID))
202247
callTime := time.Since(c.baseTime)
203248
resp, err := c.client.Put(ctx, key, value, opts)
204249
returnTime := time.Since(c.baseTime)
@@ -209,6 +254,10 @@ func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value st
209254
func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentResponse, error) {
210255
c.kvMux.Lock()
211256
defer c.kvMux.Unlock()
257+
if c.isClosed {
258+
return nil, errClientIsClosed
259+
}
260+
212261
callTime := time.Since(c.baseTime)
213262
resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0])
214263
returnTime := time.Since(c.baseTime)
@@ -219,6 +268,10 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
219268
func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
220269
c.kvMux.Lock()
221270
defer c.kvMux.Unlock()
271+
if c.isClosed {
272+
return nil, errClientIsClosed
273+
}
274+
222275
callTime := time.Since(c.baseTime)
223276
resp, err := c.client.Compact(ctx, rev)
224277
returnTime := time.Since(c.baseTime)
@@ -229,48 +282,76 @@ func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.
229282
func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
230283
c.kvMux.Lock()
231284
defer c.kvMux.Unlock()
285+
if c.isClosed {
286+
return nil, errClientIsClosed
287+
}
288+
232289
resp, err := c.client.MemberList(ctx, opts...)
233290
return resp, err
234291
}
235292

236293
func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
237294
c.kvMux.Lock()
238295
defer c.kvMux.Unlock()
296+
if c.isClosed {
297+
return nil, errClientIsClosed
298+
}
299+
239300
resp, err := c.client.MemberAdd(ctx, peerAddrs)
240301
return resp, err
241302
}
242303

243304
func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
244305
c.kvMux.Lock()
245306
defer c.kvMux.Unlock()
307+
if c.isClosed {
308+
return nil, errClientIsClosed
309+
}
310+
246311
resp, err := c.client.MemberAddAsLearner(ctx, peerAddrs)
247312
return resp, err
248313
}
249314

250315
func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
251316
c.kvMux.Lock()
252317
defer c.kvMux.Unlock()
318+
if c.isClosed {
319+
return nil, errClientIsClosed
320+
}
321+
253322
resp, err := c.client.MemberRemove(ctx, id)
254323
return resp, err
255324
}
256325

257326
func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) {
258327
c.kvMux.Lock()
259328
defer c.kvMux.Unlock()
329+
if c.isClosed {
330+
return nil, errClientIsClosed
331+
}
332+
260333
resp, err := c.client.MemberUpdate(ctx, id, peerAddrs)
261334
return resp, err
262335
}
263336

264337
func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
265338
c.kvMux.Lock()
266339
defer c.kvMux.Unlock()
340+
if c.isClosed {
341+
return nil, errClientIsClosed
342+
}
343+
267344
resp, err := c.client.MemberPromote(ctx, id)
268345
return resp, err
269346
}
270347

271348
func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
272349
c.kvMux.Lock()
273350
defer c.kvMux.Unlock()
351+
if c.isClosed {
352+
return nil, errClientIsClosed
353+
}
354+
274355
resp, err := c.client.Status(ctx, endpoint)
275356
return resp, err
276357
}
@@ -280,6 +361,12 @@ func (c *RecordingClient) Endpoints() []string {
280361
}
281362

282363
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
364+
c.kvMux.Lock()
365+
defer c.kvMux.Unlock()
366+
if c.isClosed {
367+
return nil
368+
}
369+
283370
request := model.WatchRequest{
284371
Key: key,
285372
Revision: rev,
@@ -333,6 +420,12 @@ func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest)
333420
}
334421

335422
func (c *RecordingClient) RequestProgress(ctx context.Context) error {
423+
c.kvMux.Lock()
424+
defer c.kvMux.Unlock()
425+
if c.isClosed {
426+
return errClientIsClosed
427+
}
428+
336429
return c.client.RequestProgress(ctx)
337430
}
338431

@@ -434,7 +527,7 @@ func (cs *ClientSet) close() {
434527
return
435528
}
436529
for _, c := range cs.clients {
437-
c.Close()
530+
_ = c.Close()
438531
}
439532
cs.closed = true
440533
}

tests/robustness/client/watch.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ resetWatch:
103103
return nil
104104
}
105105
watch := c.Watch(ctx, "", lastRevision+1, true, true, false)
106+
if watch == nil {
107+
return nil
108+
}
106109
for {
107110
select {
108111
case revision, ok := <-maxRevisionChan:
@@ -161,13 +164,19 @@ func openWatchPeriodically(ctx context.Context, g *errgroup.Group, c *RecordingC
161164
g.Go(func() error {
162165
resp, err := c.Get(ctx, "/key")
163166
if err != nil {
167+
if errors.Is(err, errClientIsClosed) {
168+
return nil
169+
}
164170
return err
165171
}
166172
rev := resp.Header.Revision + backgroundWatchConfig.RevisionOffset
167173

168174
watchCtx, cancel := context.WithCancel(ctx)
169175
defer cancel()
170176
w := c.Watch(watchCtx, "", rev, true, true, true)
177+
if w == nil {
178+
return nil
179+
}
171180
for {
172181
select {
173182
case <-ctx.Done():

0 commit comments

Comments
 (0)