An adapter for futures, which chunks up elements and flushes them after a timeout — or when the buffer is full. (Formerly known as tokio-batch.)

mre/futures-batch

futures-batch

A stream adaptor that chunks up items with timeout support. Items are flushed when:

  • The buffer reaches capacity or
  • A timeout occurs

Based on the Chunks adaptor from futures-util, with added timeout functionality.
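The two flush conditions above can be illustrated with a small synchronous model. This is only a sketch, not the crate's implementation: `chunk_with_timeout` is a hypothetical helper, items carry made-up arrival timestamps in milliseconds, and it assumes the timer starts when the first item of a chunk is buffered. The real adaptor is timer-driven and flushes as soon as the timeout fires, whereas this model only notices an expired deadline when the next item arrives.

```rust
/// Hypothetical, simplified model of "flush on capacity or timeout".
/// `events` holds `(arrival_ms, item)` pairs sorted by arrival time.
/// Assumption: the timeout is measured from the first item of each chunk.
fn chunk_with_timeout<T>(
    events: Vec<(u64, T)>,
    capacity: usize,
    timeout_ms: u64,
) -> Vec<Vec<T>> {
    assert!(capacity > 0, "capacity must be greater than zero");

    let mut chunks = Vec::new();
    let mut buffer: Vec<T> = Vec::with_capacity(capacity);
    // Only meaningful while `buffer` is non-empty.
    let mut deadline_ms: u64 = 0;

    for (arrival_ms, item) in events {
        // Timeout flush: the deadline passed before this item arrived.
        if !buffer.is_empty() && arrival_ms >= deadline_ms {
            chunks.push(std::mem::take(&mut buffer));
        }

        // The first item of a new chunk starts the timer.
        if buffer.is_empty() {
            deadline_ms = arrival_ms + timeout_ms;
        }
        buffer.push(item);

        // Capacity flush: the buffer is full.
        if buffer.len() >= capacity {
            chunks.push(std::mem::take(&mut buffer));
        }
    }

    // End of input: emit whatever is left.
    if !buffer.is_empty() {
        chunks.push(buffer);
    }
    chunks
}
```

With a capacity of 3 and a 100 ms timeout, items arriving at 0, 10, 150, 160, 170, and 180 ms are grouped into chunks of two items (timeout), three items (capacity), and one item (end of input).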

Note: Originally called tokio-batch, but renamed since it has no dependency on Tokio.

Usage

Add to your Cargo.toml:

[dependencies]
futures-batch = "0.7"

Use as a stream combinator:

use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::ChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let results = stream::iter(0..10)
        .chunks_timeout(5, Duration::from_secs(10))
        .collect::<Vec<_>>()
        .await;

    assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], results);
}

This creates chunks of up to 5 items with a 10-second timeout.

TryChunksTimeout

For streams that yield Result values, use try_chunks_timeout to batch successful values and immediately propagate errors:

use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::TryChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let results = stream::iter((0..10).map(|i| if i == 5 { Err("error") } else { Ok(i) }))
        .try_chunks_timeout(3, Duration::from_secs(10))
        .collect::<Vec<_>>()
        .await;

    // Results in: [Ok([0, 1, 2]), Ok([3, 4]), Err("error"), Ok([6, 7, 8]), Ok([9])]
    println!("{:?}", results);
}

This batches Ok values until the buffer is full or the timeout elapses. When an Err arrives, any Ok values already buffered are emitted first as a partial chunk, and the Err follows immediately.
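The ordering in the example output (a partial chunk, then the error, then fresh chunks) can be reproduced with a small synchronous model. This is a sketch under stated assumptions, not the crate's code: `try_chunk` is a hypothetical helper, it works on a plain iterator rather than a stream, and it models only the capacity and error rules, leaving the timeout out entirely.

```rust
/// Hypothetical, simplified model of the error-handling rule:
/// Ok values are buffered; an Err first flushes the partial buffer,
/// then is passed through on its own. Timeouts are not modelled.
fn try_chunk<T, E, I>(items: I, capacity: usize) -> Vec<Result<Vec<T>, E>>
where
    I: IntoIterator<Item = Result<T, E>>,
{
    assert!(capacity > 0, "capacity must be greater than zero");

    let mut out = Vec::new();
    let mut buffer: Vec<T> = Vec::with_capacity(capacity);

    for item in items {
        match item {
            Ok(value) => {
                buffer.push(value);
                // Capacity flush: emit a full chunk.
                if buffer.len() == capacity {
                    out.push(Ok(std::mem::take(&mut buffer)));
                }
            }
            Err(err) => {
                // Emit buffered values first so ordering is preserved.
                if !buffer.is_empty() {
                    out.push(Ok(std::mem::take(&mut buffer)));
                }
                out.push(Err(err));
            }
        }
    }

    // End of input: emit the remaining partial chunk, if any.
    if !buffer.is_empty() {
        out.push(Ok(buffer));
    }
    out
}
```

Feeding this model the same input as the example above (0..10 with an error in place of 5, capacity 3) yields the same sequence as the comment in that example.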

Features

sink (optional)

Enable Sink support for bidirectional streams:

[dependencies]
futures-batch = { version = "0.7", features = ["sink"] }

When enabled, both ChunksTimeout and TryChunksTimeout implement Sink and forward sink operations to the underlying stream.

Performance

futures-batch adds little overhead, which makes it suitable for high-performance applications. The project's benchmarks show a consistent ~20 ns per operation across different batch sizes.

Credits

Thanks to arielb1, alexcrichton, doyoubi, leshow, spebern, and wngr for their contributions!
