Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
20a1daf
add macro `cfg_rt_and_time` and `cfg_rt_or_time`
ADD-SP Jul 19, 2025
3e65b07
impl `wheel::Entry`
ADD-SP Jul 13, 2025
7618162
remove global timer wheel
ADD-SP Jul 13, 2025
4b0a866
add local timer wheel to the current_thread scheduler
ADD-SP Jul 13, 2025
78d433f
add local timer wheel to the multi_thread scheduler
ADD-SP Jul 13, 2025
4727a56
remove `tokio/src/runtime/time/entry.rs`
ADD-SP Jul 13, 2025
d7f9c72
impl `tokio/src/runtime/time/timer.rs`
ADD-SP Jul 13, 2025
161ca91
adapt the impl of `tokio::time::sleep`
ADD-SP Jul 13, 2025
09b27cf
adapt the impl of `tokio::time::interval`
ADD-SP Jul 13, 2025
58a13e3
let current_thread scheduler process timers
ADD-SP Jul 13, 2025
8407dd6
let multi_thread scheduler process timers
ADD-SP Jul 13, 2025
89e132f
remove the `InsertError`
ADD-SP Jul 17, 2025
9b336b1
fix `tokio-util/tests/time_delay_queue.rs`
ADD-SP Jul 19, 2025
cf7211a
fix unused import `wake_list::WakeList`
ADD-SP Jul 19, 2025
8ac5933
re-enable loom tests for timers
ADD-SP Aug 2, 2025
ca49b99
fix race conditions while operating on `EntryHandle`
ADD-SP Aug 4, 2025
fb027a6
cross-thread cancellation queue
ADD-SP Aug 10, 2025
9e294d5
drop all items while dropping the cancellation queue
ADD-SP Aug 14, 2025
abffd46
switch back to general intrusive node based MPSC
ADD-SP Aug 15, 2025
ccd0ae8
relax the memory ordering
ADD-SP Aug 16, 2025
f91caf7
switch to `Mutex` version of cancellation queue
ADD-SP Aug 15, 2025
88292d8
avoid `noalias` and `drop_in_place`
ADD-SP Aug 16, 2025
bb932b0
remove legacy works in `spellcheck.dic`
ADD-SP Aug 17, 2025
4b15736
reuse the existing intrusice list implementation
ADD-SP Aug 24, 2025
e89259e
fix docstring issue caused by renaming `Entry::cancel_pointer` to `En…
ADD-SP Aug 24, 2025
5eee490
reuse existing intrusive list impl using a ZST
ADD-SP Aug 27, 2025
6626ad0
chore(spellcheck.dic): add 'ZST'
ADD-SP Aug 27, 2025
65276ac
fix memory leakage of cancellation queue
ADD-SP Aug 28, 2025
e4b1e68
remove useless `unsafe impl Send for Entry`
ADD-SP Aug 28, 2025
009d158
clarify the meaning of `STATE_BUSY_REGISTERING`
ADD-SP Aug 28, 2025
c1e0522
merge: sync changes from the base branch
ADD-SP Sep 1, 2025
0f3c9ec
eliminate atomic state by `Mutex<State>` of `Entry`
ADD-SP Sep 1, 2025
6b8eed5
fixup! eliminate atomic state by `Mutex<State>` of `Entry`
ADD-SP Sep 2, 2025
6bb207d
reduce lock contention while drainning the remote timers
ADD-SP Sep 2, 2025
0e9a7e1
merge: sync changes from the base branch
ADD-SP Sep 3, 2025
93ebb38
merge: sync changes from the base branch
ADD-SP Sep 7, 2025
6a4f048
merge: sync changes from the base branch
ADD-SP Sep 8, 2025
2385b3d
improve the cache locality while checking the shutdown flag
ADD-SP Sep 11, 2025
782bcba
merge: sync changes from the base branch
ADD-SP Sep 11, 2025
2fcfb4b
remove unnecessary `PhantomData` in cancellation queue
ADD-SP Sep 11, 2025
9657e6d
fix rustfmt reports
ADD-SP Sep 11, 2025
1fff994
merge: sync changes from the base branch
ADD-SP Sep 15, 2025
c208c3a
ci: fix spellcheck.dic
ADD-SP Sep 15, 2025
fda223d
merge: sync changes from the base branch
ADD-SP Sep 16, 2025
070ddd6
merge: sync changes from the base branch
ADD-SP Sep 16, 2025
102e2d2
merge: sync changes from the base branch
ADD-SP Sep 17, 2025
5d17c33
merge: sync changes from the base branch
ADD-SP Sep 18, 2025
a17bdde
merge: sync changes from the base branch
ADD-SP Sep 19, 2025
40cc384
improve the ergonomic of `Handle::with_wheel()`
ADD-SP Sep 21, 2025
36e71a0
merge: sync changes from the base branch
ADD-SP Sep 21, 2025
a5c384d
merge: sync changes from the base branch
ADD-SP Sep 25, 2025
ebd1df1
merge: sync changes from the base branch
ADD-SP Sep 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture runtime::time::tests
run: cargo test --lib --release --features full -- --nocapture runtime::time
working-directory: tokio

loom-current-thread:
Expand Down
4 changes: 3 additions & 1 deletion spellcheck.dic
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
307
309
&
+
<
Expand Down Expand Up @@ -163,6 +163,7 @@ Lauck
libc
lifecycle
lifo
LLVM
lookups
macOS
MacOS
Expand Down Expand Up @@ -306,3 +307,4 @@ Wakers
wakeup
wakeups
workstealing
ZST
88 changes: 3 additions & 85 deletions tokio-util/tests/time_delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#![cfg(feature = "full")]

use futures::StreamExt;
use tokio::time::{self, sleep, sleep_until, Duration, Instant};
use tokio::time::{self, sleep, Duration, Instant};
use tokio_test::{assert_pending, assert_ready, task};
use tokio_util::time::DelayQueue;

Expand Down Expand Up @@ -82,8 +82,6 @@ async fn single_short_delay() {

sleep(ms(5)).await;

assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue));
assert_eq!(*entry.get_ref(), "foo");

Expand Down Expand Up @@ -221,7 +219,7 @@ async fn reset_much_later() {

sleep(ms(20)).await;

assert!(queue.is_woken());
assert_ready_some!(poll!(queue));
Copy link
Member Author

@ADD-SP ADD-SP Jul 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note

queue.reset_at resets inner Sleep, however, the new implementation will drop the inner timer and create a new one. So the waker will not be called, we have to poll manually.

}

// Reproduces tokio-rs/tokio#849.
Expand All @@ -248,7 +246,7 @@ async fn reset_twice() {

sleep(ms(20)).await;

assert!(queue.is_woken());
assert_ready_some!(poll!(queue));
Copy link
Member Author

@ADD-SP ADD-SP Jul 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note

queue.reset_at resets inner Sleep, however, the new implementation will drop the inner timer and create a new one. So the waker will not be called, we have to poll manually.

}

/// Regression test: Given an entry inserted with a deadline in the past, so
Expand Down Expand Up @@ -412,8 +410,6 @@ async fn expire_first_key_when_reset_to_expire_earlier() {

sleep(ms(100)).await;

assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "one");
}
Expand All @@ -435,8 +431,6 @@ async fn expire_second_key_when_reset_to_expire_earlier() {

sleep(ms(100)).await;

assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "two");
}
Expand All @@ -457,8 +451,6 @@ async fn reset_first_expiring_item_to_expire_later() {
queue.reset_at(&one, now + ms(300));
sleep(ms(250)).await;

assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "two");
}
Expand Down Expand Up @@ -522,43 +514,6 @@ async fn insert_after_ready_poll() {
assert_eq!("3", res[2]);
}

#[tokio::test]
async fn reset_later_after_slot_starts() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note

This test was removed because it was testing the behavior of tokio::runtime::time::entry::reset, which is removed in this PR.

time::pause();

let mut queue = task::spawn(DelayQueue::new());

let now = Instant::now();

let foo = queue.insert_at("foo", now + ms(100));

assert_pending!(poll!(queue));

sleep_until(now + Duration::from_millis(80)).await;

assert!(!queue.is_woken());

// At this point the queue hasn't been polled, so `elapsed` on the wheel
// for the queue is still at 0 and hence the 1ms resolution slots cover
// [0-64). Resetting the time on the entry to 120 causes it to get put in
// the [64-128) slot. As the queue knows that the first entry is within
// that slot, but doesn't know when, it must wake immediately to advance
// the wheel.
queue.reset_at(&foo, now + ms(120));
assert!(queue.is_woken());

assert_pending!(poll!(queue));

sleep_until(now + Duration::from_millis(119)).await;
assert!(!queue.is_woken());

sleep(ms(1)).await;
assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");
}

#[tokio::test]
async fn reset_inserted_expired() {
time::pause();
Expand All @@ -584,43 +539,6 @@ async fn reset_inserted_expired() {
assert_eq!(queue.len(), 0);
}

#[tokio::test]
async fn reset_earlier_after_slot_starts() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note

This test was removed because it was testing the behavior of tokio::runtime::time::entry::reset, which is removed in this PR.

time::pause();

let mut queue = task::spawn(DelayQueue::new());

let now = Instant::now();

let foo = queue.insert_at("foo", now + ms(200));

assert_pending!(poll!(queue));

sleep_until(now + Duration::from_millis(80)).await;

assert!(!queue.is_woken());

// At this point the queue hasn't been polled, so `elapsed` on the wheel
// for the queue is still at 0 and hence the 1ms resolution slots cover
// [0-64). Resetting the time on the entry to 120 causes it to get put in
// the [64-128) slot. As the queue knows that the first entry is within
// that slot, but doesn't know when, it must wake immediately to advance
// the wheel.
queue.reset_at(&foo, now + ms(120));
assert!(queue.is_woken());

assert_pending!(poll!(queue));

sleep_until(now + Duration::from_millis(119)).await;
assert!(!queue.is_woken());

sleep(ms(1)).await;
assert!(queue.is_woken());

let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");
}

#[tokio::test]
async fn insert_in_past_after_poll_fires_immediately() {
time::pause();
Expand Down
24 changes: 24 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,3 +711,27 @@ macro_rules! cfg_io_uring {
)*
};
}

macro_rules! cfg_rt_and_time{
($($item:item)*) => {
$(
#[cfg(all(
feature = "rt",
feature = "time",
))]
$item
)*
};
}

macro_rules! cfg_rt_or_time{
($($item:item)*) => {
$(
#[cfg(any(
feature = "rt",
feature = "time",
))]
$item
)*
};
}
7 changes: 7 additions & 0 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ impl Handle {
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}

pub(crate) fn with_time<F, R>(&self, f: F) -> R
where
F: FnOnce(Option<&crate::runtime::time::Handle>) -> R,
{
f(self.time.as_ref())
}

pub(crate) fn clock(&self) -> &Clock {
&self.clock
}
Expand Down
Loading
Loading