Consider adding spsc interface with explicit closure #2371

Open
@stepancheg

Description

There's a mistake I have made many times, and I suspect others have made it too.

It can be illustrated with this example:

fn produce_messages(tx: Sender<my_app::Result<MyMessage>>) -> io::Result<()> {
  // an early return here drops `tx` without telling the consumer anything
  let file = File::open("data.txt")?;
  for line in BufReader::new(file).lines() {
    match line {
      Ok(line) => { tx.send(Ok(parse_data(line))); }
      Err(e) => { tx.send(Err(my_app::Error)); return Err(e); }
    }
  }
  Ok(())
}

async fn render_webpage_from_data(mut rx: Receiver<my_app::Result<MyMessage>>, page_builder: PageBuilder) -> my_app::Result<()> {
  page_builder.render_header();
  // `next()` yields `None` once the sender is gone, whatever the reason
  while let Some(message) = rx.next().await {
    page_builder.render_message(message?);
  }
  page_builder.render_footer();
  Ok(())
}

Can you spot an error here? There are at least two mistakes:

  • if opening the file fails, the sender is quietly dropped, and the consumer assumes the messages have simply ended
  • if any code in the producer panics, that panic might be handled somewhere, but the consumer only sees an end of stream that is indistinguishable from successful completion

This problem can be worked around by explicitly sending a special message such as EndOfStream through the queue, but as I said, I have forgotten to do that several times, and other people can forget it too.
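For illustration, here is a minimal sketch of that workaround. It uses the blocking std::sync::mpsc channel rather than the async one so that it runs without extra crates, and the names Message, produce, consume and run are hypothetical, not part of any existing API.

```rust
use std::sync::mpsc;

// Hypothetical message type: `EndOfStream` is the explicit "finished cleanly" marker.
#[derive(Debug)]
enum Message {
    Data(String),
    EndOfStream,
}

// Sends every line, then the sentinel. With `fail_early` set, the function
// returns before sending the sentinel, modelling an early `?` return or a panic.
fn produce(tx: mpsc::Sender<Message>, lines: &[&str], fail_early: bool) {
    for line in lines {
        // A send error only means the consumer has gone away; ignore it here.
        let _ = tx.send(Message::Data(line.to_string()));
    }
    if fail_early {
        return; // `tx` is dropped here without the sentinel
    }
    let _ = tx.send(Message::EndOfStream);
}

// Treats the end of the channel as an error unless the sentinel was seen first.
fn consume(rx: mpsc::Receiver<Message>) -> Result<Vec<String>, String> {
    let mut seen = Vec::new();
    for message in rx {
        match message {
            Message::Data(line) => seen.push(line),
            Message::EndOfStream => return Ok(seen),
        }
    }
    Err(format!(
        "producer stopped after {} messages without EndOfStream",
        seen.len()
    ))
}

// Runs producer and consumer back to back; the channel is unbounded,
// so the producer never blocks.
fn run(lines: &[&str], fail_early: bool) -> Result<Vec<String>, String> {
    let (tx, rx) = mpsc::channel();
    produce(tx, lines, fail_early);
    consume(rx)
}

fn main() {
    println!("{:?}", run(&["a", "b"], false));
    println!("{:?}", run(&["a", "b"], true));
}
```

The weak point is exactly the one described above: nothing forces the producer to send the sentinel, and nothing forces the consumer to treat a missing sentinel as an error.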

So my proposal is:

  • An explicit close call should be required on the sender side
  • Patching the existing mpsc interfaces might be hard
  • Explicit closure is more important for spsc queues

API would look like:

impl<T> spsc::UnboundedSender<T> {
  // same operations as in mpsc::Sender, including `close_channel`,
  // except that these operations could take `&mut self`, which allows certain optimizations
}

enum TryRecvError {
  NoMessagesAvailableYet,
  SenderDisconnectedWithoutClose,
}

impl<T> spsc::UnboundedReceiver<T> {
  fn try_next(&mut self) -> Result<Option<T>, TryRecvError> { ... }
}

enum NextError {
  SenderDisconnectedWithoutClose,
}

impl<T> Stream for spsc::UnboundedReceiver<T> {
  // the associated type of `Stream` is called `Item`
  type Item = Result<T, NextError>;

  fn poll_next(...) { ... }
}

The producer would then need to be modified like this:

fn produce_messages(tx: Sender<my_app::Result<MyMessage>>) -> io::Result<()> {
  ...
  tx.close_channel(); // notify we finished successfully
  Ok(())
}
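To make the intended semantics concrete, here is a rough sketch that emulates them on top of the blocking std::sync::mpsc channel. This is not the proposed implementation: ExplicitSender, ExplicitReceiver, Frame, channel and drain are hypothetical names, the receiver is blocking rather than a Stream, and close_channel consumes the sender here (an assumption made so that nothing can be sent after closing).

```rust
use std::sync::mpsc;

// Internal wire format: a payload or the explicit close marker.
enum Frame<T> {
    Item(T),
    Closed,
}

#[derive(Debug, PartialEq)]
pub enum NextError {
    SenderDisconnectedWithoutClose,
}

pub struct ExplicitSender<T> {
    inner: mpsc::Sender<Frame<T>>,
}

pub struct ExplicitReceiver<T> {
    inner: mpsc::Receiver<Frame<T>>,
    finished: bool,
}

pub fn channel<T>() -> (ExplicitSender<T>, ExplicitReceiver<T>) {
    let (tx, rx) = mpsc::channel();
    (
        ExplicitSender { inner: tx },
        ExplicitReceiver { inner: rx, finished: false },
    )
}

impl<T> ExplicitSender<T> {
    // Returns the value back if the receiver is already gone.
    pub fn send(&mut self, value: T) -> Result<(), T> {
        self.inner.send(Frame::Item(value)).map_err(|e| match e.0 {
            Frame::Item(v) => v,
            Frame::Closed => unreachable!("only Item frames are sent here"),
        })
    }

    // Marks successful completion; consuming `self` prevents later sends.
    pub fn close_channel(self) {
        let _ = self.inner.send(Frame::Closed);
    }
}

impl<T> ExplicitReceiver<T> {
    // Ok(Some(v)): a message. Ok(None): the sender closed explicitly.
    // Err(..): the sender was dropped without calling `close_channel`.
    pub fn recv(&mut self) -> Result<Option<T>, NextError> {
        if self.finished {
            return Ok(None);
        }
        match self.inner.recv() {
            Ok(Frame::Item(v)) => Ok(Some(v)),
            Ok(Frame::Closed) => {
                self.finished = true;
                Ok(None)
            }
            Err(_) => Err(NextError::SenderDisconnectedWithoutClose),
        }
    }
}

// Collects all messages, failing if the stream did not end with an explicit close.
pub fn drain(rx: &mut ExplicitReceiver<u32>) -> Result<Vec<u32>, NextError> {
    let mut out = Vec::new();
    while let Some(v) = rx.recv()? {
        out.push(v);
    }
    Ok(out)
}

fn main() {
    // Clean completion: the producer calls `close_channel`.
    let (mut tx, mut rx) = channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.close_channel();
    println!("{:?}", drain(&mut rx));

    // Abnormal completion: the sender is dropped, as after an early return or a panic.
    let (mut tx, mut rx) = channel();
    tx.send(1).unwrap();
    drop(tx);
    println!("{:?}", drain(&mut rx));
}
```

With this shape, forgetting to call close_channel surfaces as an error on the consumer side instead of looking like a normal end of stream.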

Alternatively, the mpsc interface could be modified:

  • by changing the semantics of most operations, including the Stream::Item type
  • by introducing additional or alternative recv and Stream operations that require explicit closure
