Skip to content

Commit e17e081

Browse files
committed
mpsc: do not do atomic decrement in unbounded receiver
Instead have a local non-atomic `num_received` variable and considers queue exhausted when the number of sent messages is equal to the number of received messages. Before: ``` test unbounded_uncontended ... bench: 100,656 ns/iter (+/- 4,860) ``` After: ``` test unbounded_uncontended ... bench: 93,251 ns/iter (+/- 3,217) ``` It's hard (or impossible, I tried to and failed) to do the same optimization for the bounded queue, because bounded sender needs to atomically track message queue size. Thus previous commit completely splits bounded and unbounded queue implementations.
1 parent c55d59a commit e17e081

File tree

1 file changed

+33
-32
lines changed
  • futures-channel/src/mpsc

1 file changed

+33
-32
lines changed

futures-channel/src/mpsc/mod.rs

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ pub struct Receiver<T> {
145145
#[derive(Debug)]
146146
pub struct UnboundedReceiver<T> {
147147
inner: Arc<UnboundedInner<T>>,
148+
num_received: usize,
148149
}
149150

150151
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
@@ -300,7 +301,8 @@ struct State {
300301
// `true` when the channel is open
301302
is_open: bool,
302303

303-
// Number of messages in the channel
304+
// Number of messages in the channel for bounded queue
305+
// Number of sent messages modulo MAX_CAPACITY + 1 for unbounded queue
304306
num_messages: usize,
305307
}
306308

@@ -424,6 +426,7 @@ pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
424426

425427
let rx = UnboundedReceiver {
426428
inner,
429+
num_received: 0,
427430
};
428431

429432
(tx, rx)
@@ -625,17 +628,14 @@ impl<T> UnboundedSender<T> {
625628

626629
// Do the send without parking current task.
627630
fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
628-
match self.inner.inc_num_messages() {
629-
Some(_num_messages) => {}
630-
None => {
631-
return Err(TrySendError {
632-
err: SendError {
633-
kind: SendErrorKind::Disconnected,
634-
},
635-
val: msg,
636-
});
637-
},
638-
};
631+
if !self.inner.inc_num_messages() {
632+
return Err(TrySendError {
633+
err: SendError {
634+
kind: SendErrorKind::Disconnected,
635+
},
636+
val: msg,
637+
});
638+
}
639639

640640
self.inner.queue_push_and_signal(msg);
641641

@@ -844,13 +844,12 @@ impl<T> UnboundedReceiver<T> {
844844
match unsafe { self.inner.message_queue.pop_spin() } {
845845
Some(msg) => {
846846
// Decrement number of messages
847-
self.inner.dec_num_messages();
847+
self.num_received = self.num_received.wrapping_add(1);
848848

849849
Poll::Ready(Some(msg))
850850
}
851851
None => {
852-
let state = decode_state(self.inner.state.load(SeqCst));
853-
if state.is_open || state.num_messages != 0 {
852+
if !self.is_end_of_queue_from_state() {
854853
// If queue is open, we need to return Pending
855854
// to be woken up when new messages arrive.
856855
// If queue is closed but num_messages is non-zero,
@@ -868,6 +867,20 @@ impl<T> UnboundedReceiver<T> {
868867
}
869868
}
870869

870+
fn is_end_of_queue_from_state(&self) -> bool {
871+
let state = decode_state(self.inner.state.load(SeqCst));
872+
if state.is_open {
873+
false
874+
} else {
875+
// The condition is true when either:
876+
// * queue is empty
877+
// * N * (MAX_CAPACITY + 1) messages are in the queue
878+
// or being added to the queue at this moment
879+
// The latter is very improbable because receiver has checked the queue recently.
880+
state.num_messages == (self.num_received & !OPEN_MASK)
881+
}
882+
}
883+
871884
/// Tries to receive the next message without notifying a context if empty.
872885
///
873886
/// It is not recommended to call this function from inside of a future,
@@ -996,29 +1009,24 @@ impl<T> UnboundedInner<T> {
9961009
}
9971010

9981011
// Increment the number of queued messages. Returns new `num_messages`.
999-
fn inc_num_messages(&self) -> Option<usize> {
1012+
fn inc_num_messages(&self) -> bool {
10001013
let mut curr = self.state.load(SeqCst);
10011014

10021015
loop {
10031016
let mut state = decode_state(curr);
10041017

10051018
// The receiver end closed the channel.
10061019
if !state.is_open {
1007-
return None;
1020+
return false;
10081021
}
10091022

1010-
// This probably is never hit? Odds are the process will run out of
1011-
// memory first. It may be worth to return something else in this
1012-
// case?
1013-
assert!(state.num_messages < MAX_CAPACITY, "buffer space \
1014-
exhausted; sending this messages would overflow the state");
1015-
1016-
state.num_messages += 1;
1023+
// Won't overflow
1024+
state.num_messages = (state.num_messages + 1) & !OPEN_MASK;
10171025

10181026
let next = encode_state(&state);
10191027
match self.state.compare_exchange(curr, next, SeqCst, SeqCst) {
10201028
Ok(_) => {
1021-
return Some(state.num_messages)
1029+
return true;
10221030
}
10231031
Err(actual) => curr = actual,
10241032
}
@@ -1034,13 +1042,6 @@ impl<T> UnboundedInner<T> {
10341042

10351043
self.state.fetch_and(!OPEN_MASK, SeqCst);
10361044
}
1037-
1038-
fn dec_num_messages(&self) {
1039-
// OPEN_MASK is highest bit, so it's unaffected by subtraction
1040-
// unless there's underflow, and we know there's no underflow
1041-
// because number of messages at this point is always > 0.
1042-
self.state.fetch_sub(1, SeqCst);
1043-
}
10441045
}
10451046

10461047
unsafe impl<T: Send> Send for BoundedInner<T> {}

0 commit comments

Comments
 (0)