Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
nsqd: nsqd,
ephemeral: strings.HasSuffix(channelName, "#ephemeral"),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
// avoid mem-queue if size == 0 for more consistent ordering
if nsqd.getOpts().MemQueueSize > 0 || c.ephemeral {
c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
Expand All @@ -96,8 +97,7 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,

c.initPQ()

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeral = true
if c.ephemeral {
c.backend = newDummyBackendQueue()
} else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
Expand Down
33 changes: 18 additions & 15 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: nil,
memoryMsgChan: make(chan *Message, nsqd.getOpts().MemQueueSize),
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
Expand All @@ -56,10 +56,6 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(nsqd.getOpts().ID),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
Expand Down Expand Up @@ -222,18 +218,25 @@ func (t *Topic) PutMessages(msgs []*Message) error {
}

func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
err := writeMessageToBackend(m, t.backend)
t.nsqd.SetHealth(err)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
// If mem-queue-size == 0, avoid memory chan, for more consistent ordering,
// but try to use memory chan for deferred messages (they lose deferred timer
// in backend queue) or if topic is ephemeral (there is no backend queue).
if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 {
select {
case t.memoryMsgChan <- m:
return nil
default:
break // write to backend
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to wait around for a second in the ephemeral case - the intention is that it's bounded and lossy, why add back pressure when the queue is full?

My proposal would be to simplify this whole if/else block to:

if cap(t.memoryMsgChan) > 0 || m.deferred != 0 {
		select {
		case t.memoryMsgChan <- m:
			return nil
		default:
			break // write to backend
		}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can pare it all the way down and remove the wait. The 1 second is arbitrary ... the expectation/desire is more like a handful of milliseconds. If there's a burst of messages and a bunch of channels, they can all get through the topic to the channels in just a few milliseconds, instead of having deferred time lost or messages kinda sampled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(oh and ephemeral topics don't work at all without the memory chan)

err := writeMessageToBackend(m, t.backend)
t.nsqd.SetHealth(err)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
return nil
}

Expand Down