Skip to content

Commit ff45df6

Browse files
committed
nsqd: improve gossip test time/robustness
1 parent 103f4fc commit ff45df6

File tree

3 files changed

+41
-17
lines changed

3 files changed

+41
-17
lines changed

nsqd/gossip.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,16 @@ func initSerf(opts *Options,
9595
serfConfig.MemberlistConfig.BindPort = gossipAddr.Port
9696
serfConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
9797
serfConfig.MemberlistConfig.GossipNodes = 5
98+
serfConfig.MemberlistConfig.ProbeInterval = opts.GossipProbeInterval
99+
serfConfig.MemberlistConfig.SuspicionMult = opts.GossipSuspicionMult
98100
serfConfig.MemberlistConfig.LogOutput = logWriter{opts.Logger, []byte("memberlist:")}
99101
if len(key) != 0 {
100102
serfConfig.MemberlistConfig.SecretKey = key
101103
}
102104
serfConfig.EventCh = serfEventChan
103105
serfConfig.EventBuffer = 1024
104-
serfConfig.ReconnectTimeout = time.Hour
106+
serfConfig.ReapInterval = opts.GossipReapInterval
107+
serfConfig.ReconnectTimeout = opts.GossipReconnectTimeout
105108
serfConfig.LogOutput = logWriter{opts.Logger, []byte("serf:")}
106109

107110
return serf.Create(serfConfig)

nsqd/gossip_test.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,15 @@ func TestGossip(t *testing.T) {
5757
sort.Ints(tcpPorts)
5858

5959
// wait for convergence
60-
converge(5*time.Second, nsqds, convergenceTester.c, func() bool {
60+
converged := converge(5*time.Second, nsqds, convergenceTester.c, func() bool {
6161
for _, nsqd := range nsqds {
6262
if len(nsqd.rdb.FindProducers("client", "", "")) != num {
6363
return false
6464
}
6565
}
6666
return true
6767
})
68+
equal(t, converged, true)
6869

6970
// all nodes in the cluster should have registrations
7071
for _, nsqd := range nsqds {
@@ -85,7 +86,7 @@ func TestGossip(t *testing.T) {
8586
topic.GetChannel("ch")
8687
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port
8788

88-
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
89+
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
8990
for _, nsqd := range nsqds {
9091
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
9192
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
@@ -94,6 +95,7 @@ func TestGossip(t *testing.T) {
9495
}
9596
return true
9697
})
98+
equal(t, converged, true)
9799

98100
for _, nsqd := range nsqds {
99101
producers := nsqd.rdb.FindProducers("topic", topicName, "")
@@ -127,6 +129,10 @@ func TestGossipResync(t *testing.T) {
127129
opts.Logger = newTestLogger(t)
128130
opts.GossipAddress = addr.String()
129131
opts.BroadcastAddress = "127.0.0.1"
132+
opts.GossipReapInterval = 200 * time.Millisecond
133+
opts.GossipReconnectTimeout = 100 * time.Millisecond
134+
opts.GossipSuspicionMult = 1
135+
opts.GossipProbeInterval = 100 * time.Millisecond
130136
opts.gossipDelegate = convergenceTester
131137
if seedNode != nil {
132138
opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress}
@@ -150,7 +156,7 @@ func TestGossipResync(t *testing.T) {
150156
topic.GetChannel("ch")
151157
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port
152158

153-
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
159+
converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
154160
for _, nsqd := range nsqds {
155161
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
156162
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
@@ -159,6 +165,7 @@ func TestGossipResync(t *testing.T) {
159165
}
160166
return true
161167
})
168+
equal(t, converged, true)
162169

163170
for _, nsqd := range nsqds {
164171
producers := nsqd.rdb.FindProducers("topic", topicName, "")
@@ -175,32 +182,34 @@ func TestGossipResync(t *testing.T) {
175182
stillAlive := nsqds[:num-1]
176183

177184
// check that other nodes see it as closed
178-
converge(10*time.Second, stillAlive, convergenceTester.c, func() bool {
185+
converged = converge(10*time.Second, stillAlive, convergenceTester.c, func() bool {
179186
for _, nsqd := range stillAlive {
180187
if len(nsqd.serf.Members()) != len(stillAlive) {
181188
return false
182189
}
183190
}
184191
return true
185192
})
193+
equal(t, converged, true)
186194

187195
// restart stopped node
188196
_, _, nsqd := mustStartNSQD(nsqds[num-1].getOpts())
189197
defer nsqd.Exit()
190198
nsqds[num-1] = nsqd
191199

192200
// check that other nodes see it as back open
193-
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
201+
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
194202
for _, nsqd := range nsqds {
195203
if len(nsqd.serf.Members()) != len(nsqds) {
196204
return false
197205
}
198206
}
199207
return true
200208
})
209+
equal(t, converged, true)
201210

202211
// check that all nodes see the restarted first node
203-
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
212+
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
204213
for _, nsqd := range nsqds {
205214
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
206215
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
@@ -209,6 +218,7 @@ func TestGossipResync(t *testing.T) {
209218
}
210219
return true
211220
})
221+
equal(t, converged, true)
212222

213223
// we should have producers for the topic/channel back now
214224
for _, nsqd := range nsqds {
@@ -266,7 +276,7 @@ func TestRegossip(t *testing.T) {
266276
topic.GetChannel("ch")
267277
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port
268278

269-
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
279+
converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
270280
for _, nsqd := range nsqds {
271281
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
272282
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
@@ -275,6 +285,7 @@ func TestRegossip(t *testing.T) {
275285
}
276286
return true
277287
})
288+
equal(t, converged, true)
278289

279290
for _, nsqd := range nsqds {
280291
producers := nsqd.rdb.FindProducers("topic", topicName, "")
@@ -294,7 +305,7 @@ func TestRegossip(t *testing.T) {
294305
}
295306

296307
// wait for regossip
297-
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
308+
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
298309
for _, nsqd := range nsqds {
299310
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
300311
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
@@ -303,6 +314,7 @@ func TestRegossip(t *testing.T) {
303314
}
304315
return true
305316
})
317+
equal(t, converged, true)
306318

307319
// we should have producers for the topic/channel back now on all nodes
308320
for _, nsqd := range nsqds {
@@ -316,16 +328,17 @@ func TestRegossip(t *testing.T) {
316328
}
317329
}
318330

319-
func converge(timeout time.Duration, nsqds []*NSQD, notifyChan chan struct{}, isConverged func() bool) {
320-
// wait for convergence
321-
converged := false
322-
t := time.NewTimer(timeout)
323-
for !converged {
331+
func converge(timeout time.Duration, nsqds []*NSQD, notifyChan chan struct{}, isConverged func() bool) bool {
332+
for {
324333
select {
325-
case <-t.C:
326-
converged = true
334+
case <-time.After(timeout):
335+
return false
327336
case <-notifyChan:
328-
converged = isConverged()
337+
if isConverged() {
338+
goto exit
339+
}
329340
}
330341
}
342+
exit:
343+
return true
331344
}

nsqd/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ type Options struct {
2424
GossipAddress string `flag:"gossip-address"`
2525
GossipSeedAddresses []string `flag:"gossip-seed-address"`
2626
GossipRegossipInterval time.Duration `flag:"gossip-regossip-interval"`
27+
GossipProbeInterval time.Duration
28+
GossipSuspicionMult int
29+
GossipReapInterval time.Duration
30+
GossipReconnectTimeout time.Duration
2731

2832
// diskqueue options
2933
DataPath string `flag:"data-path"`
@@ -148,5 +152,9 @@ func NewOptions() *Options {
148152

149153
gossipDelegate: nilGossipDelegate{},
150154
GossipRegossipInterval: 60 * time.Second,
155+
GossipProbeInterval: 1 * time.Second,
156+
GossipSuspicionMult: 5,
157+
GossipReapInterval: 15 * time.Second,
158+
GossipReconnectTimeout: 1 * time.Hour,
151159
}
152160
}

0 commit comments

Comments
 (0)