mpsc: split bounded and unbounded implementations #1326
Conversation
```rust
// Max buffer size of the channel. If `None` then the channel is unbounded.
buffer: Option<usize>,

struct UnboundedInner<T> {
    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,
```
This can be just an `AtomicBool` `is_open` now, right?
Unfortunately, it can't without changing the contract provided by the queue. Consider this scenario:

1. The sender checks the `is_open` flag; it is open.
2. The receiver calls `set_closed`.
3. The receiver polls the queue, finds no messages, and (with no message counter) assumes the end of the stream.
4. The sender enqueues a message and returns `Ok`, reporting that the message was successfully sent.

So the sender assumes the message was sent, but the receiver has already observed the end of the stream: the message is lost. The `message_count` field is needed to guarantee that no message is lost while the queue is being closed. But it creates overhead.
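A minimal sketch of the idea (my own illustration, not necessarily the patch's actual bit layout): pack the open flag and the message count into one atomic word, so the receiver only treats the stream as terminated when the channel is closed *and* the count is zero, which closes the race above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical encoding: the high bit marks "open", the remaining bits
// count the messages currently in the channel.
const OPEN_MASK: usize = 1 << (usize::BITS - 1);

struct State(AtomicUsize);

impl State {
    fn new() -> Self {
        State(AtomicUsize::new(OPEN_MASK)) // open, zero messages
    }

    // Sender: atomically bump the count and observe the open flag in the
    // same RMW operation; fail if the channel was already closed.
    fn try_send(&self) -> bool {
        let prev = self.0.fetch_add(1, Ordering::SeqCst);
        if prev & OPEN_MASK == 0 {
            // Closed: undo the speculative increment.
            self.0.fetch_sub(1, Ordering::SeqCst);
            return false;
        }
        true
    }

    // Receiver: clear the open flag; no new sends succeed afterwards.
    fn set_closed(&self) {
        self.0.fetch_and(!OPEN_MASK, Ordering::SeqCst);
    }

    // Receiver: the stream ends only when closed AND the count is zero,
    // so a message enqueued concurrently with closing is still drained.
    fn is_terminated(&self) -> bool {
        self.0.load(Ordering::SeqCst) == 0
    }

    // Receiver: account for one dequeued message.
    fn pop_one(&self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let s = State::new();
    assert!(s.try_send()); // one message in flight
    s.set_closed();
    assert!(!s.is_terminated()); // counter keeps the pending message visible
    s.pop_one();
    assert!(s.is_terminated());
    assert!(!s.try_send()); // closed now
    println!("ok");
}
```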
Force-pushed from 5c28825 to 5cf1e8f.
Rebased and fixed clippy.
Force-pushed from 5cf1e8f to f2deb46.
futures-channel/src/mpsc/mod.rs (outdated)
```rust
unbounded: UnboundedInner<T>,

// Atomic, FIFO queue used to send parked task handles to the receiver.
parked_queue: Queue<Arc<Mutex<SenderTask>>>,
```
Just some further thoughts: this could potentially use either a similar approach or directly the `ManualResetEvent` from my CR to wake up interested `Sender`s. Those would poll the future for waiting on new capacity and then do a `try_send()`. The advantage would be that the dynamically allocated `Queue` and the `SenderTask` allocations would go away.

However, after thinking about it again, it doesn't really fit the current API contract of the channel: e.g. `poll_ready` doesn't return a future that can be used to wait directly until capacity is available, but is an instance method on the `Sender`, which isn't suitable for storing the intrusive waiter.

Nevertheless, it might be an interesting thing to try and benchmark at some point.
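For contrast, a rough sketch of what the current scheme allocates per parked sender. The `parked_queue` type comes from the diff above; the exact fields of `SenderTask` are an assumption for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::task::Waker;

// Assumed shape of the per-sender handle: the waker to notify plus a flag
// recording whether the sender is currently parked.
struct SenderTask {
    task: Option<Waker>,
    is_parked: bool,
}

// Each park costs a fresh Arc + Mutex allocation pushed onto `parked_queue`.
// An intrusive waiter embedded in a dedicated "wait for capacity" future
// would avoid this allocation, at the cost of the API change discussed above
// (`poll_ready` is a method on `Sender`, not a future that can own the node).
fn park() -> Arc<Mutex<SenderTask>> {
    Arc::new(Mutex::new(SenderTask { task: None, is_parked: true }))
}

fn main() {
    let handle = park();
    assert!(handle.lock().unwrap().is_parked);
    println!("ok");
}
```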
Force-pushed from f2deb46 to fb04272.
Copy-paste implementations to unlock certain optimizations of unbounded channels.
Force-pushed from fb04272 to f22b339.
Rebased on master. This version is just the copy-paste split: it won't speed anything up significantly by itself, but it unlocks unbounded-queue-specific optimizations.
Seems like a reasonable start to me, thanks!
The major downside of code sharing is performance overhead in an unbounded queue (I've noticed it in a profiler): `clone`/`drop` of `UnboundedSender` perform an unnecessary atomic increment/decrement of the `num_senders` field.

The downside of this patch is the increase in LOC because of the copy-paste of the `next_message` function.

This split unlocks certain unbounded-queue-specific optimizations; for example, tracking the number of messages in the unbounded queue might not be needed (and it is expensive, because of an atomic increment/decrement per message in the current implementation (#1337)).
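An illustrative sketch (hypothetical shape, not the crate's actual code) of the `clone`/`drop` overhead described above: when the sender type is shared between bounded and unbounded flavors, every clone and drop pays an atomic RMW on `num_senders` on top of the `Arc`'s own refcount traffic.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical shared channel state; only `num_senders` matters here.
struct Inner {
    num_senders: AtomicUsize,
}

struct UnboundedSender(Arc<Inner>);

impl Clone for UnboundedSender {
    fn clone(&self) -> Self {
        // Extra atomic increment on top of the Arc refcount bump; the
        // bounded flavor needs this count, the unbounded one may not.
        self.0.num_senders.fetch_add(1, Ordering::SeqCst);
        UnboundedSender(self.0.clone())
    }
}

impl Drop for UnboundedSender {
    fn drop(&mut self) {
        // Matching atomic decrement on every drop.
        self.0.num_senders.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let tx = UnboundedSender(Arc::new(Inner {
        num_senders: AtomicUsize::new(1),
    }));
    let tx2 = tx.clone();
    assert_eq!(tx.0.num_senders.load(Ordering::SeqCst), 2);
    drop(tx2);
    assert_eq!(tx.0.num_senders.load(Ordering::SeqCst), 1);
    println!("ok");
}
```

Splitting the implementations lets the unbounded sender drop this bookkeeping entirely, leaving only the `Arc` refcount.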