Skip to content

Commit b3b29b7

Browse files
bitpengploxiln
authored andcommitted
nsqd: set memoryMsgChan as nil when -mem-queue-size=0
do not use unbuffered chan for in-memory queue when size=0 because user intended all messages to go through backend queue (disk)
1 parent 6774510 commit b3b29b7

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

nsqd/channel.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,15 @@ func NewChannel(topicName string, channelName string, ctx *context,
7777
c := &Channel{
7878
topicName: topicName,
7979
name: channelName,
80-
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
80+
memoryMsgChan: nil,
8181
clients: make(map[int64]Consumer),
8282
deleteCallback: deleteCallback,
8383
ctx: ctx,
8484
}
85+
// create mem-queue only if size > 0 (do not use unbuffered chan)
86+
if ctx.nsqd.getOpts().MemQueueSize > 0 {
87+
c.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
88+
}
8589
if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
8690
c.e2eProcessingLatencyStream = quantile.New(
8791
ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,

nsqd/topic.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
4747
t := &Topic{
4848
name: topicName,
4949
channelMap: make(map[string]*Channel),
50-
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
50+
memoryMsgChan: nil,
5151
startChan: make(chan int, 1),
5252
exitChan: make(chan int),
5353
channelUpdateChan: make(chan int),
@@ -57,7 +57,10 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
5757
deleteCallback: deleteCallback,
5858
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
5959
}
60-
60+
// create mem-queue only if size > 0 (do not use unbuffered chan)
61+
if ctx.nsqd.getOpts().MemQueueSize > 0 {
62+
t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
63+
}
6164
if strings.HasSuffix(topicName, "#ephemeral") {
6265
t.ephemeral = true
6366
t.backend = newDummyBackendQueue()

0 commit comments

Comments
 (0)