diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index 5efa0aca74b..3cbcfaa2515 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -52,7 +52,7 @@ jobs: toolchain: ${{ env.rust_stable }} - uses: Swatinem/rust-cache@v2 - name: run tests - run: cargo test --lib --release --features full -- --nocapture runtime::time::tests + run: cargo test --lib --release --features full -- --nocapture runtime::time working-directory: tokio loom-current-thread: diff --git a/spellcheck.dic b/spellcheck.dic index 3182d65ad44..90cbfa96f8e 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -307 +309 & + < @@ -163,6 +163,7 @@ Lauck libc lifecycle lifo +LLVM lookups macOS MacOS @@ -306,3 +307,4 @@ Wakers wakeup wakeups workstealing +ZST diff --git a/tokio-util/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs index fdd0844c8c3..dcbf2d0e7c9 100644 --- a/tokio-util/tests/time_delay_queue.rs +++ b/tokio-util/tests/time_delay_queue.rs @@ -3,7 +3,7 @@ #![cfg(feature = "full")] use futures::StreamExt; -use tokio::time::{self, sleep, sleep_until, Duration, Instant}; +use tokio::time::{self, sleep, Duration, Instant}; use tokio_test::{assert_pending, assert_ready, task}; use tokio_util::time::DelayQueue; @@ -82,8 +82,6 @@ async fn single_short_delay() { sleep(ms(5)).await; - assert!(queue.is_woken()); - let entry = assert_ready_some!(poll!(queue)); assert_eq!(*entry.get_ref(), "foo"); @@ -221,7 +219,7 @@ async fn reset_much_later() { sleep(ms(20)).await; - assert!(queue.is_woken()); + assert_ready_some!(poll!(queue)); } // Reproduces tokio-rs/tokio#849. @@ -248,7 +246,7 @@ async fn reset_twice() { sleep(ms(20)).await; - assert!(queue.is_woken()); + assert_ready_some!(poll!(queue)); } /// Regression test: Given an entry inserted with a deadline in the past, so @@ -412,8 +410,6 @@ async fn expire_first_key_when_reset_to_expire_earlier() { sleep(ms(100)).await; - assert!(queue.is_woken()); - let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "one"); } @@ -435,8 +431,6 @@ async fn expire_second_key_when_reset_to_expire_earlier() { sleep(ms(100)).await; - assert!(queue.is_woken()); - let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "two"); } @@ -457,8 +451,6 @@ async fn reset_first_expiring_item_to_expire_later() { queue.reset_at(&one, now + ms(300)); sleep(ms(250)).await; - assert!(queue.is_woken()); - let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "two"); } @@ -522,43 +514,6 @@ async fn insert_after_ready_poll() { assert_eq!("3", res[2]); } -#[tokio::test] -async fn reset_later_after_slot_starts() { - time::pause(); - - let mut queue = task::spawn(DelayQueue::new()); - - let now = Instant::now(); - - let foo = queue.insert_at("foo", now + ms(100)); - - assert_pending!(poll!(queue)); - - sleep_until(now + Duration::from_millis(80)).await; - - assert!(!queue.is_woken()); - - // At this point the queue hasn't been polled, so `elapsed` on the wheel - // for the queue is still at 0 and hence the 1ms resolution slots cover - // [0-64). Resetting the time on the entry to 120 causes it to get put in - // the [64-128) slot. As the queue knows that the first entry is within - // that slot, but doesn't know when, it must wake immediately to advance - // the wheel. - queue.reset_at(&foo, now + ms(120)); - assert!(queue.is_woken()); - - assert_pending!(poll!(queue)); - - sleep_until(now + Duration::from_millis(119)).await; - assert!(!queue.is_woken()); - - sleep(ms(1)).await; - assert!(queue.is_woken()); - - let entry = assert_ready_some!(poll!(queue)).into_inner(); - assert_eq!(entry, "foo"); -} - #[tokio::test] async fn reset_inserted_expired() { time::pause(); @@ -584,43 +539,6 @@ async fn reset_inserted_expired() { assert_eq!(queue.len(), 0); } -#[tokio::test] -async fn reset_earlier_after_slot_starts() { - time::pause(); - - let mut queue = task::spawn(DelayQueue::new()); - - let now = Instant::now(); - - let foo = queue.insert_at("foo", now + ms(200)); - - assert_pending!(poll!(queue)); - - sleep_until(now + Duration::from_millis(80)).await; - - assert!(!queue.is_woken()); - - // At this point the queue hasn't been polled, so `elapsed` on the wheel - // for the queue is still at 0 and hence the 1ms resolution slots cover - // [0-64). Resetting the time on the entry to 120 causes it to get put in - // the [64-128) slot. As the queue knows that the first entry is within - // that slot, but doesn't know when, it must wake immediately to advance - // the wheel. - queue.reset_at(&foo, now + ms(120)); - assert!(queue.is_woken()); - - assert_pending!(poll!(queue)); - - sleep_until(now + Duration::from_millis(119)).await; - assert!(!queue.is_woken()); - - sleep(ms(1)).await; - assert!(queue.is_woken()); - - let entry = assert_ready_some!(poll!(queue)).into_inner(); - assert_eq!(entry, "foo"); -} - #[tokio::test] async fn insert_in_past_after_poll_fires_immediately() { time::pause(); diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index f5dfc3814e6..3615cb79948 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -711,3 +711,27 @@ macro_rules! cfg_io_uring { )* }; } + +macro_rules! cfg_rt_and_time{ + ($($item:item)*) => { + $( + #[cfg(all( + feature = "rt", + feature = "time", + ))] + $item + )* + }; +} + +macro_rules! cfg_rt_or_time{ + ($($item:item)*) => { + $( + #[cfg(any( + feature = "rt", + feature = "time", + ))] + $item + )* + }; +} diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index a1a6df8e007..a01e7341785 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -113,6 +113,13 @@ impl Handle { .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") } + pub(crate) fn with_time(&self, f: F) -> R + where + F: FnOnce(Option<&crate::runtime::time::Handle>) -> R, + { + f(self.time.as_ref()) + } + pub(crate) fn clock(&self) -> &Clock { &self.clock } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 2097d34606a..0bb8e513a2d 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -1,4 +1,4 @@ -use crate::loom::sync::atomic::AtomicBool; +use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::Arc; use crate::runtime::driver::{self, Driver}; use crate::runtime::scheduler::{self, Defer, Inject}; @@ -22,6 +22,12 @@ use std::thread::ThreadId; use std::time::Duration; use std::{fmt, thread}; +cfg_time! { + use crate::runtime::scheduler::util; + use crate::runtime::time::{EntryHandle, Wheel, cancellation_queue}; + use crate::loom::sync::Mutex; +} + /// Executes tasks on the current thread pub(crate) struct CurrentThread { /// Core scheduler data is acquired by a thread entering `block_on`. @@ -62,6 +68,18 @@ struct Core { /// Current tick tick: u32, + #[cfg(feature = "time")] + /// Worker local timer wheel + wheel: Wheel, + + #[cfg(feature = "time")] + /// Channel for sending timers that need to be cancelled + timer_cancel_tx: cancellation_queue::Sender, + + #[cfg(feature = "time")] + /// Channel for receiving timers that need to be cancelled + timer_cancel_rx: cancellation_queue::Receiver, + /// Runtime driver /// /// The driver is removed before starting to park the thread @@ -83,6 +101,12 @@ struct Shared { /// Remote run queue inject: Inject>, + #[cfg(feature = "time")] + /// Timers pending to be registered. + /// This is used to register a timer but the [`Core`] + /// is not available in the current thread. + inject_timers: Mutex>, + /// Collection of all active tasks spawned onto this executor. owned: OwnedTasks>, @@ -97,6 +121,9 @@ struct Shared { /// This scheduler only has one worker. worker_metrics: WorkerMetrics, + + /// Indicates that the runtime is shutting down. + is_shutdown: AtomicBool, } /// Thread-local context. @@ -152,11 +179,14 @@ impl CurrentThread { }, shared: Shared { inject: Inject::new(), + #[cfg(feature = "time")] + inject_timers: Mutex::new(Vec::new()), owned: OwnedTasks::new(1), woken: AtomicBool::new(false), config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics, + is_shutdown: AtomicBool::new(false), }, driver: driver_handle, blocking_spawner, @@ -164,9 +194,17 @@ impl CurrentThread { local_tid, }); + #[cfg(feature = "time")] + let (timer_cancel_tx, timer_cancel_rx) = cancellation_queue::new(); let core = AtomicCell::new(Some(Box::new(Core { tasks: VecDeque::with_capacity(INITIAL_CAPACITY), tick: 0, + #[cfg(feature = "time")] + wheel: Wheel::new(), + #[cfg(feature = "time")] + timer_cancel_tx, + #[cfg(feature = "time")] + timer_cancel_rx, driver: Some(driver), metrics: MetricsBatch::new(&handle.shared.worker_metrics), global_queue_interval, @@ -266,6 +304,8 @@ impl CurrentThread { let core = shutdown2(core, handle); *context.core.borrow_mut() = Some(core); } + + handle.shared.is_shutdown.store(true, Ordering::SeqCst); } } @@ -275,7 +315,16 @@ fn shutdown2(mut core: Box, handle: &Handle) -> Box { // call returns. handle.shared.owned.close_and_shutdown_all(0); - // Drain local queue + #[cfg(feature = "time")] + util::time::shutdown_local_timers( + &mut core.wheel, + &core.timer_cancel_tx, + &mut core.timer_cancel_rx, + handle.take_remote_timers(), + &handle.driver, + ); + + // Drain the local queue // We already shut down every task, so we just need to drop the task. while let Some(task) = core.next_local_task(handle) { drop(task); @@ -387,12 +436,7 @@ impl Context { core.metrics.about_to_park(); core.submit_metrics(handle); - let (c, ()) = self.enter(core, || { - driver.park(&handle.driver); - self.defer.wake(); - }); - - core = c; + core = self.park_internal(core, handle, &mut driver, None); core.metrics.unparked(); core.submit_metrics(handle); @@ -413,15 +457,81 @@ impl Context { core.submit_metrics(handle); - let (mut core, ()) = self.enter(core, || { - driver.park_timeout(&handle.driver, Duration::from_millis(0)); - self.defer.wake(); - }); + core = self.park_internal(core, handle, &mut driver, Some(Duration::from_millis(0))); core.driver = Some(driver); core } + fn park_internal( + &self, + core: Box, + handle: &Handle, + driver: &mut Driver, + duration: Option, + ) -> Box { + debug_assert!(core.driver.is_none()); + + #[cfg(feature = "time")] + let (core, duration, maybe_advance_duration) = { + // declare as mutable to avoid compiler warning, + // otherwise the compiler will complain that the `core` parameter does not need to be mutable + // if the 'time' feature is not enabled. + let mut core = core; + util::time::remove_cancelled_timers(&mut core.wheel, &mut core.timer_cancel_rx); + let should_yield = util::time::insert_inject_timers( + &mut core.wheel, + &core.timer_cancel_tx, + handle.take_remote_timers(), + ); + let next_timer = util::time::next_expiration_time(&core.wheel, &handle.driver); + if should_yield { + (core, Some(Duration::from_millis(0)), None) + } else { + let dur = match (next_timer, duration) { + (Some(next_timer), Some(park_duration)) => Some(next_timer.min(park_duration)), + (Some(next_timer), None) => Some(next_timer), + (None, Some(park_duration)) => Some(park_duration), + (None, None) => None, + }; + if util::time::pre_auto_advance(&handle.driver, dur) { + (core, Some(Duration::ZERO), dur) + } else { + (core, dur, None) + } + } + }; + + let (core, ()) = self.enter(core, || { + if let Some(duration) = duration { + driver.park_timeout(&handle.driver, duration); + } else { + driver.park(&handle.driver); + } + }); + + self.defer.wake(); + + #[cfg(feature = "time")] + let core = { + // declare as mutable to avoid compiler warning + // error: variable does not need to be mutable + // --> tokio/src/runtime/scheduler/current_thread/mod.rs:497:14 + // | + // 497 | let (mut core, ()) = self.enter(core, || { + // | ----^^^^ + // | | + // | help: remove this `mut` + // | + let mut core = core; + util::time::post_auto_advance(&handle.driver, maybe_advance_duration); + util::time::process_expired_timers(&mut core.wheel, &handle.driver); + core + }; + + core + } + fn enter(&self, core: Box, f: impl FnOnce() -> R) -> (Box, R) { // Store the scheduler core in the thread-local context // @@ -439,6 +549,31 @@ impl Context { pub(crate) fn defer(&self, waker: &Waker) { self.defer.defer(waker); } + + cfg_time! { + fn with_core(&self, f: F) -> R + where + F: FnOnce(Option<&mut Core>) -> R, + { + let mut core = self.core.borrow_mut(); + f(core.as_mut().map(|c| c.as_mut())) + } + + pub(crate) fn with_wheel(&self, f: F) -> R + where + F: FnOnce(Option>) -> R, + { + self.with_core(|maybe_core| { + match maybe_core { + Some(core) => f(Some(crate::runtime::time::Context::Running { + wheel: &mut core.wheel, + canc_tx: &core.timer_cancel_tx, + })), + None => f(None), + } + }) + } + } } // ===== impl Handle ===== @@ -584,6 +719,26 @@ impl Handle { assert_eq!(0, worker); &self.shared.worker_metrics } + + cfg_time! { + /// Push a timer handle from the remote thread. + pub(crate) fn push_remote_timer(&self, entry: EntryHandle) { + { + let mut inject_timers = self.shared.inject_timers.lock(); + inject_timers.push(entry); + } + self.driver.unpark(); + } + + pub(crate) fn take_remote_timers(&self) -> Vec { + let mut inject_timers = self.shared.inject_timers.lock(); + std::mem::take(&mut inject_timers) + } + + pub(crate) fn is_shutdown(&self) -> bool { + self.shared.is_shutdown.load(Ordering::SeqCst) + } + } } cfg_unstable_metrics! { @@ -646,10 +801,18 @@ impl Schedule for Arc { Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => { let mut core = cx.core.borrow_mut(); - // If `None`, the runtime is shutting down, so there is no need - // to schedule the task. if let Some(core) = core.as_mut() { core.push_task(self, task); + } else { + // runtime is shutting down + // OR waking up expired timers + + // Track that a task was scheduled from **outside** of the runtime. + self.shared.scheduler_metrics.inc_remote_schedule_count(); + + // Schedule the task + self.shared.inject.push(task); + self.driver.unpark(); } } _ => { diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index ecd56aeee10..d0b36f893d2 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -24,6 +24,8 @@ cfg_rt_multi_thread! { pub(crate) use multi_thread::MultiThread; } +mod util; + use crate::runtime::driver; #[derive(Debug, Clone)] diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 9acfcb270d6..7c74ea007cc 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -1,4 +1,5 @@ use crate::future::Future; +use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::Arc; use crate::runtime::scheduler::multi_thread::worker; use crate::runtime::task::{Notified, Task, TaskHarnessScheduleHooks}; @@ -33,6 +34,9 @@ pub(crate) struct Handle { /// User-supplied hooks to invoke for things pub(crate) task_hooks: TaskHooks, + + /// Indicates that the runtime is shutting down. + pub(crate) is_shutdown: AtomicBool, } impl Handle { @@ -50,7 +54,14 @@ impl Handle { Self::bind_new_task(me, future, id, spawned_at) } + cfg_time! { + pub(crate) fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::SeqCst) + } + } + pub(crate) fn shutdown(&self) { + self.is_shutdown.store(true, Ordering::SeqCst); self.close(); } diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index b00c648e6d3..fab35ce8283 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -71,11 +71,8 @@ impl Parker { } pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { - // Only parking with zero is supported... - assert_eq!(duration, Duration::from_millis(0)); - if let Some(mut driver) = self.inner.shared.driver.try_lock() { - driver.park_timeout(handle, duration); + self.inner.park_driver(&mut driver, handle, Some(duration)); } else { // https://github.com/tokio-rs/tokio/issues/6536 // Hacky, but it's just for loom tests. The counter gets incremented during @@ -124,7 +121,7 @@ impl Inner { } if let Some(mut driver) = self.shared.driver.try_lock() { - self.park_driver(&mut driver, handle); + self.park_driver(&mut driver, handle, None); } else { self.park_condvar(); } @@ -170,7 +167,19 @@ impl Inner { } } - fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) { + fn park_driver( + &self, + driver: &mut Driver, + handle: &driver::Handle, + duration: Option, + ) { + if duration.as_ref().is_some_and(Duration::is_zero) { + // zero duration doesn't actually park the thread, it just + // polls the I/O events, timers, etc. + driver.park_timeout(handle, Duration::ZERO); + return; + } + match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) @@ -191,7 +200,12 @@ impl Inner { Err(actual) => panic!("inconsistent park state; actual = {actual}"), } - driver.park(handle); + if let Some(duration) = duration { + debug_assert_ne!(duration, Duration::ZERO); + driver.park_timeout(handle, duration); + } else { + driver.park(handle); + } match self.state.swap(EMPTY, SeqCst) { NOTIFIED => {} // got a notification, hurray! diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 7ec3f126467..b144621b129 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -56,6 +56,7 @@ //! the inject queue indefinitely. This would be a ref-count cycle and a memory //! leak. +use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::scheduler::multi_thread::{ @@ -74,6 +75,11 @@ use std::task::Waker; use std::thread; use std::time::Duration; +cfg_time! { + use crate::runtime::scheduler::util; + use crate::runtime::time::{EntryHandle, Wheel, cancellation_queue}; +} + mod metrics; cfg_taskdump! { @@ -115,6 +121,18 @@ struct Core { /// The worker-local run queue. run_queue: queue::Local>, + #[cfg(feature = "time")] + /// Worker local timer wheel + wheel: Wheel, + + #[cfg(feature = "time")] + /// Channel for sending timers that need to be cancelled + timer_cancel_tx: cancellation_queue::Sender, + + #[cfg(feature = "time")] + /// Channel for receiving timers that need to be cancelled + timer_cancel_rx: cancellation_queue::Receiver, + /// True if the worker is currently searching for more work. Searching /// involves attempting to steal from other workers. is_searching: bool, @@ -193,6 +211,12 @@ pub(crate) struct Synced { /// Synchronized state for `Inject`. pub(crate) inject: inject::Synced, + + #[cfg(feature = "time")] + /// Timers pending to be registered. + /// This is used to register a timer but the [`Core`] + /// is not available in the current thread. + inject_timers: Vec, } /// Used to communicate with a worker from other threads. @@ -254,12 +278,20 @@ pub(super) fn create( let unpark = park.unpark(); let metrics = WorkerMetrics::from_config(&config); let stats = Stats::new(&metrics); + #[cfg(feature = "time")] + let (timer_cancel_tx, timer_cancel_rx) = cancellation_queue::new(); cores.push(Box::new(Core { tick: 0, lifo_slot: None, lifo_enabled: !config.disable_lifo_slot, run_queue, + #[cfg(feature = "time")] + wheel: Wheel::new(), + #[cfg(feature = "time")] + timer_cancel_tx, + #[cfg(feature = "time")] + timer_cancel_rx, is_searching: false, is_shutdown: false, is_traced: false, @@ -287,6 +319,8 @@ pub(super) fn create( synced: Mutex::new(Synced { idle: idle_synced, inject: inject_synced, + #[cfg(feature = "time")] + inject_timers: vec![], }), shutdown_cores: Mutex::new(vec![]), trace_status: TraceStatus::new(remotes_len), @@ -298,6 +332,7 @@ pub(super) fn create( driver: driver_handle, blocking_spawner, seed_generator, + is_shutdown: AtomicBool::new(false), }); let mut launch = Launch(vec![]); @@ -552,7 +587,7 @@ impl Context { } else { // Wait for work core = if !self.defer.is_empty() { - self.park_timeout(core, Some(Duration::from_millis(0))) + self.park_yield(core) } else { self.park(core) }; @@ -560,6 +595,15 @@ impl Context { } } + #[cfg(feature = "time")] + util::time::shutdown_local_timers( + &mut core.wheel, + &core.timer_cancel_tx, + &mut core.timer_cancel_rx, + self.worker.handle.take_remote_timers(), + &self.worker.handle.driver, + ); + core.pre_shutdown(&self.worker); // Signal shutdown self.worker.handle.shutdown_core(core); @@ -701,7 +745,7 @@ impl Context { // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... // to run without actually putting the thread to sleep. - core = self.park_timeout(core, Some(Duration::from_millis(0))); + core = self.park_yield(core); // Run regularly scheduled maintenance core.maintenance(&self.worker); @@ -734,7 +778,7 @@ impl Context { core.stats .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]); - core = self.park_timeout(core, None); + core = self.park_internal(core, None); core.stats.unparked(); @@ -753,9 +797,41 @@ impl Context { core } - fn park_timeout(&self, mut core: Box, duration: Option) -> Box { + fn park_yield(&self, core: Box) -> Box { + self.park_internal(core, Some(Duration::from_millis(0))) + } + + fn park_internal(&self, mut core: Box, duration: Option) -> Box { self.assert_lifo_enabled_is_correct(&core); + #[cfg(feature = "time")] + let (duration, maybe_advance_duration) = { + let handle = &self.worker.handle; + + util::time::remove_cancelled_timers(&mut core.wheel, &mut core.timer_cancel_rx); + let should_yield = util::time::insert_inject_timers( + &mut core.wheel, + &core.timer_cancel_tx, + handle.take_remote_timers(), + ); + let next_timer = util::time::next_expiration_time(&core.wheel, &handle.driver); + if should_yield { + (Some(Duration::from_millis(0)), None) + } else { + let dur = match (next_timer, duration) { + (Some(next_timer), Some(park_duration)) => Some(next_timer.min(park_duration)), + (Some(next_timer), None) => Some(next_timer), + (None, Some(park_duration)) => Some(park_duration), + (None, None) => None, + }; + if util::time::pre_auto_advance(&handle.driver, dur) { + (Some(Duration::ZERO), dur) + } else { + (dur, None) + } + } + }; + // Take the parker out of core let mut park = core.park.take().expect("park missing"); @@ -774,6 +850,13 @@ impl Context { // Remove `core` from context core = self.core.borrow_mut().take().expect("core missing"); + #[cfg(feature = "time")] + { + let handle = &self.worker.handle; + util::time::post_auto_advance(&handle.driver, maybe_advance_duration); + util::time::process_expired_timers(&mut core.wheel, &handle.driver); + } + // Place `park` back in `core` core.park = Some(park); @@ -793,6 +876,34 @@ impl Context { self.defer.defer(waker); } } + + cfg_time! { + fn with_core(&self, f: F) -> R + where + F: FnOnce(Option<&mut Core>) -> R, + { + match self.core.borrow_mut().as_mut() { + Some(core) => f(Some(core)), + None => f(None), + } + } + + pub(crate) fn with_wheel(&self, f: F) -> R + where + F: FnOnce(Option>) -> R, + { + self.with_core(|maybe_core| { + match maybe_core { + Some(core) if core.is_shutdown => f(Some(crate::runtime::time::Context::Shutdown)), + Some(core) => f(Some(crate::runtime::time::Context::Running { + wheel: &mut core.wheel, + canc_tx: &core.timer_cancel_tx, + })), + None => f(None), + } + }) + } + } } impl Core { @@ -1131,6 +1242,26 @@ impl Handle { } } + cfg_time! { + /// Push a timer handle from the remote thread. + pub(crate) fn push_remote_timer(&self, hdl: EntryHandle) { + { + let mut synced = self.shared.synced.lock(); + synced.inject_timers.push(hdl); + } + self.notify_parked_remote(); + } + + pub(crate) fn take_remote_timers(&self) -> Vec { + // It's ok to lost the race, as another worker is + // draining the inject_timers. + match self.shared.synced.try_lock() { + Some(mut synced) => std::mem::take(&mut synced.inject_timers), + None => Vec::new(), + } + } + } + pub(super) fn close(&self) { if self .shared diff --git a/tokio/src/runtime/scheduler/util.rs b/tokio/src/runtime/scheduler/util.rs new file mode 100644 index 00000000000..54c9de4a9f6 --- /dev/null +++ b/tokio/src/runtime/scheduler/util.rs @@ -0,0 +1,164 @@ +cfg_rt_and_time! { + pub(crate) mod time { + use crate::runtime::{scheduler::driver}; + use crate::runtime::time::{EntryHandle, Wheel, cancellation_queue::{Sender, Receiver}}; + use std::time::Duration; + + pub(crate) fn insert_inject_timers( + wheel: &mut Wheel, + tx: &Sender, + inject: Vec, + ) -> bool { + use crate::runtime::time::Insert; + let mut fired = false; + // process injected timers + for hdl in inject { + match unsafe { wheel.insert(hdl.clone(), tx.clone()) } { + Insert::Success => {} + Insert::Elapsed => { + hdl.wake_unregistered(); + fired = true; + } + Insert::Cancelling => {} + } + } + + fired + } + + pub(crate) fn remove_cancelled_timers( + wheel: &mut Wheel, + rx: &mut Receiver, + ) { + for hdl in rx.recv_all() { + unsafe { + let is_registered = hdl.is_registered(); + let is_pending = hdl.is_pending(); + if is_registered && !is_pending { + wheel.remove(hdl); + } + } + } + } + + pub(crate) fn next_expiration_time( + wheel: &Wheel, + drv_hdl: &driver::Handle, + ) -> Option { + drv_hdl.with_time(|maybe_time_hdl| { + let Some(time_hdl) = maybe_time_hdl else { + // time driver is not enabled, nothing to do. + return None; + }; + + let clock = drv_hdl.clock(); + let time_source = time_hdl.time_source(); + + wheel.next_expiration_time().map(|tick| { + let now = time_source.now(clock); + time_source.tick_to_duration(tick.saturating_sub(now)) + }) + }) + } + + cfg_test_util! { + pub(crate) fn pre_auto_advance( + drv_hdl: &driver::Handle, + duration: Option, + ) -> bool { + drv_hdl.with_time(|maybe_time_hdl| { + if maybe_time_hdl.is_none() { + // time driver is not enabled, nothing to do. + return false; + } + + if duration.is_some() { + let clock = drv_hdl.clock(); + if clock.can_auto_advance() { + return true; + } + + false + } else { + false + } + }) + } + + pub(crate) fn post_auto_advance( + drv_hdl: &driver::Handle, + duration: Option, + ) { + drv_hdl.with_time(|maybe_time_hdl| { + let Some(time_hdl) = maybe_time_hdl else { + // time driver is not enabled, nothing to do. + return; + }; + + if let Some(park_duration) = duration { + let clock = drv_hdl.clock(); + if clock.can_auto_advance() + && !time_hdl.did_wake() { + if let Err(msg) = clock.advance(park_duration) { + panic!("{msg}"); + } + } + } + }) + } + } + + cfg_not_test_util! { + pub(crate) fn pre_auto_advance( + _drv_hdl: &driver::Handle, + _duration: Option, + ) -> bool { + false + } + + pub(crate) fn post_auto_advance( + _drv_hdl: &driver::Handle, + _duration: Option, + ) { + // No-op in non-test util builds + } + } + + pub(crate) fn process_expired_timers( + wheel: &mut Wheel, + drv_hdl: &driver::Handle, + ) { + drv_hdl.with_time(|maybe_time_hdl| { + let Some(time_hdl) = maybe_time_hdl else { + // time driver is not enabled, nothing to do. + return; + }; + + let clock = drv_hdl.clock(); + let time_source = time_hdl.time_source(); + + let now = time_source.now(clock); + time_hdl.process_at_time(wheel, now); + }); + } + + pub(crate) fn shutdown_local_timers( + wheel: &mut Wheel, + tx: &Sender, + rx: &mut Receiver, + inject: Vec, + drv_hdl: &driver::Handle, + ) { + drv_hdl.with_time(|maybe_time_hdl| { + let Some(time_hdl) = maybe_time_hdl else { + // time driver is not enabled, nothing to do. + return; + }; + + remove_cancelled_timers(wheel, rx); + insert_inject_timers(wheel, tx, inject); + time_hdl.shutdown(wheel); + }); + } + } +} diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs deleted file mode 100644 index 627fcbc5ec3..00000000000 --- a/tokio/src/runtime/time/entry.rs +++ /dev/null @@ -1,687 +0,0 @@ -//! Timer state structures. -//! -//! This module contains the heart of the intrusive timer implementation, and as -//! such the structures inside are full of tricky concurrency and unsafe code. -//! -//! # Ground rules -//! -//! The heart of the timer implementation here is the [`TimerShared`] structure, -//! shared between the [`TimerEntry`] and the driver. Generally, we permit access -//! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or -//! 2) a held driver lock. -//! -//! It follows from this that any changes made while holding BOTH 1 and 2 will -//! be reliably visible, regardless of ordering. This is because of the `acq/rel` -//! fences on the driver lock ensuring ordering with 2, and rust mutable -//! reference rules for 1 (a mutable reference to an object can't be passed -//! between threads without an `acq/rel` barrier, and same-thread we have local -//! happens-before ordering). -//! -//! # State field -//! -//! Each timer has a state field associated with it. This field contains either -//! the current scheduled time, or a special flag value indicating its state. -//! This state can either indicate that the timer is on the 'pending' queue (and -//! thus will be fired with an `Ok(())` result soon) or that it has already been -//! fired/deregistered. -//! -//! This single state field allows for code that is firing the timer to -//! synchronize with any racing `reset` calls reliably. -//! -//! # Registered vs true timeouts -//! -//! To allow for the use case of a timeout that is periodically reset before -//! expiration to be as lightweight as possible, we support optimistically -//! lock-free timer resets, in the case where a timer is rescheduled to a later -//! point than it was originally scheduled for. -//! -//! This is accomplished by lazily rescheduling timers. That is, we update the -//! state field with the true expiration of the timer from the holder of -//! the [`TimerEntry`]. When the driver services timers (ie, whenever it's -//! walking lists of timers), it checks this "true when" value, and reschedules -//! based on it. -//! -//! We do, however, also need to track what the expiration time was when we -//! originally registered the timer; this is used to locate the right linked -//! list when the timer is being cancelled. -//! This is referred to as the `registered_when` internally. -//! -//! There is of course a race condition between timer reset and timer -//! expiration. If the driver fails to observe the updated expiration time, it -//! could trigger expiration of the timer too early. However, because -//! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and -//! refuse to mark the timer as pending. -//! -//! [mark_pending]: TimerHandle::mark_pending - -use crate::loom::cell::UnsafeCell; -use crate::loom::sync::atomic::AtomicU64; -use crate::loom::sync::atomic::Ordering; - -use crate::runtime::scheduler; -use crate::sync::AtomicWaker; -use crate::time::Instant; -use crate::util::linked_list; - -use pin_project_lite::pin_project; -use std::task::{Context, Poll, Waker}; -use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull}; - -type TimerResult = Result<(), crate::time::error::Error>; - -pub(in crate::runtime::time) const STATE_DEREGISTERED: u64 = u64::MAX; -const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; -const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; -/// The largest safe integer to use for ticks. -/// -/// This value should be updated if any other signal values are added above. -pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = STATE_MIN_VALUE - 1; - -/// This structure holds the current shared state of the timer - its scheduled -/// time (if registered), or otherwise the result of the timer completing, as -/// well as the registered waker. -/// -/// Generally, the `StateCell` is only permitted to be accessed from two contexts: -/// Either a thread holding the corresponding `&mut TimerEntry`, or a thread -/// holding the timer driver lock. The write actions on the `StateCell` amount to -/// passing "ownership" of the `StateCell` between these contexts; moving a timer -/// from the `TimerEntry` to the driver requires _both_ holding the `&mut -/// TimerEntry` and the driver lock, while moving it back (firing the timer) -/// requires only the driver lock. -pub(super) struct StateCell { - /// Holds either the scheduled expiration time for this timer, or (if the - /// timer has been fired and is unregistered), `u64::MAX`. - state: AtomicU64, - /// If the timer is fired (an Acquire order read on state shows - /// `u64::MAX`), holds the result that should be returned from - /// polling the timer. Otherwise, the contents are unspecified and reading - /// without holding the driver lock is undefined behavior. - result: UnsafeCell, - /// The currently-registered waker - waker: AtomicWaker, -} - -impl Default for StateCell { - fn default() -> Self { - Self::new() - } -} - -impl std::fmt::Debug for StateCell { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "StateCell({:?})", self.read_state()) - } -} - -impl StateCell { - fn new() -> Self { - Self { - state: AtomicU64::new(STATE_DEREGISTERED), - result: UnsafeCell::new(Ok(())), - waker: AtomicWaker::new(), - } - } - - fn is_pending(&self) -> bool { - self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE - } - - /// Returns the current expiration time, or None if not currently scheduled. - fn when(&self) -> Option { - let cur_state = self.state.load(Ordering::Relaxed); - - if cur_state == STATE_DEREGISTERED { - None - } else { - Some(cur_state) - } - } - - /// If the timer is completed, returns the result of the timer. Otherwise, - /// returns None and registers the waker. - fn poll(&self, waker: &Waker) -> Poll { - // We must register first. This ensures that either `fire` will - // observe the new waker, or we will observe a racing fire to have set - // the state, or both. - self.waker.register_by_ref(waker); - - self.read_state() - } - - fn read_state(&self) -> Poll { - let cur_state = self.state.load(Ordering::Acquire); - - if cur_state == STATE_DEREGISTERED { - // SAFETY: The driver has fired this timer; this involves writing - // the result, and then writing (with release ordering) the state - // field. - Poll::Ready(unsafe { self.result.with(|p| *p) }) - } else { - Poll::Pending - } - } - - /// Marks this timer as being moved to the pending list, if its scheduled - /// time is not after `not_after`. - /// - /// If the timer is scheduled for a time after `not_after`, returns an Err - /// containing the current scheduled time. - /// - /// SAFETY: Must hold the driver lock. - unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { - // Quick initial debug check to see if the timer is already fired. Since - // firing the timer can only happen with the driver lock held, we know - // we shouldn't be able to "miss" a transition to a fired state, even - // with relaxed ordering. - let mut cur_state = self.state.load(Ordering::Relaxed); - - loop { - // improve the error message for things like - // https://github.com/tokio-rs/tokio/issues/3675 - assert!( - cur_state < STATE_MIN_VALUE, - "mark_pending called when the timer entry is in an invalid state" - ); - - if cur_state > not_after { - break Err(cur_state); - } - - match self.state.compare_exchange_weak( - cur_state, - STATE_PENDING_FIRE, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break Ok(()), - Err(actual_state) => cur_state = actual_state, - } - } - } - - /// Fires the timer, setting the result to the provided result. - /// - /// Returns: - /// * `Some(waker)` - if fired and a waker needs to be invoked once the - /// driver lock is released - /// * `None` - if fired and a waker does not need to be invoked, or if - /// already fired - /// - /// SAFETY: The driver lock must be held. - unsafe fn fire(&self, result: TimerResult) -> Option { - // Quick initial check to see if the timer is already fired. Since - // firing the timer can only happen with the driver lock held, we know - // we shouldn't be able to "miss" a transition to a fired state, even - // with relaxed ordering. - let cur_state = self.state.load(Ordering::Relaxed); - if cur_state == STATE_DEREGISTERED { - return None; - } - - // SAFETY: We assume the driver lock is held and the timer is not - // fired, so only the driver is accessing this field. - // - // We perform a release-ordered store to state below, to ensure this - // write is visible before the state update is visible. - unsafe { self.result.with_mut(|p| *p = result) }; - - self.state.store(STATE_DEREGISTERED, Ordering::Release); - - self.waker.take_waker() - } - - /// Marks the timer as registered (poll will return None) and sets the - /// expiration time. - /// - /// While this function is memory-safe, it should only be called from a - /// context holding both `&mut TimerEntry` and the driver lock. - fn set_expiration(&self, timestamp: u64) { - debug_assert!(timestamp < STATE_MIN_VALUE); - - // We can use relaxed ordering because we hold the driver lock and will - // fence when we release the lock. - self.state.store(timestamp, Ordering::Relaxed); - } - - /// Attempts to adjust the timer to a new timestamp. - /// - /// If the timer has already been fired, is pending firing, or the new - /// timestamp is earlier than the old timestamp, (or occasionally - /// spuriously) returns Err without changing the timer's state. In this - /// case, the timer must be deregistered and re-registered. - fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> { - let mut prior = self.state.load(Ordering::Relaxed); - loop { - if new_timestamp < prior || prior >= STATE_MIN_VALUE { - return Err(()); - } - - match self.state.compare_exchange_weak( - prior, - new_timestamp, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => return Ok(()), - Err(true_prior) => prior = true_prior, - } - } - } - - /// Returns true if the state of this timer indicates that the timer might - /// be registered with the driver. This check is performed with relaxed - /// ordering, but is conservative - if it returns false, the timer is - /// definitely _not_ registered. - pub(super) fn might_be_registered(&self) -> bool { - self.state.load(Ordering::Relaxed) != STATE_DEREGISTERED - } -} - -pin_project! { - // A timer entry. - // - // This is the handle to a timer that is controlled by the requester of the - // timer. As this participates in intrusive data structures, it must be pinned - // before polling. - #[derive(Debug)] - pub(crate) struct TimerEntry { - // Arc reference to the runtime handle. We can only free the driver after - // deregistering everything from their respective timer wheels. - driver: scheduler::Handle, - // Shared inner structure; this is part of an intrusive linked list, and - // therefore other references can exist to it while mutable references to - // Entry exist. - // - // This is manipulated only under the inner mutex. - #[pin] - inner: Option, - // Deadline for the timer. This is used to register on the first - // poll, as we can't register prior to being pinned. - deadline: Instant, - // Whether the deadline has been registered. - registered: bool, - } - - impl PinnedDrop for TimerEntry { - fn drop(this: Pin<&mut Self>) { - this.cancel(); - } - } -} - -unsafe impl Send for TimerEntry {} -unsafe impl Sync for TimerEntry {} - -/// An `TimerHandle` is the (non-enforced) "unique" pointer from the driver to the -/// timer entry. Generally, at most one `TimerHandle` exists for a timer at a time -/// (enforced by the timer state machine). -/// -/// SAFETY: An `TimerHandle` is essentially a raw pointer, and the usual caveats -/// of pointer safety apply. In particular, `TimerHandle` does not itself enforce -/// that the timer does still exist; however, normally an `TimerHandle` is created -/// immediately before registering the timer, and is consumed when firing the -/// timer, to help minimize mistakes. Still, because `TimerHandle` cannot enforce -/// memory safety, all operations are unsafe. -#[derive(Debug)] -pub(crate) struct TimerHandle { - inner: NonNull, -} - -pub(super) type EntryList = crate::util::linked_list::LinkedList; - -/// The shared state structure of a timer. This structure is shared between the -/// frontend (`Entry`) and driver backend. -/// -/// Note that this structure is located inside the `TimerEntry` structure. -pub(crate) struct TimerShared { - /// A link within the doubly-linked list of timers on a particular level and - /// slot. Valid only if state is equal to Registered. - /// - /// Only accessed under the entry lock. - pointers: linked_list::Pointers, - - /// The time when the [`TimerEntry`] was registered into the Wheel, - /// [`STATE_DEREGISTERED`] means it is not registered. - /// - /// Generally owned by the driver, but is accessed by the entry when not - /// registered. - /// - /// We use relaxed ordering for both loading and storing since this value - /// is only accessed either when holding the driver lock or through mutable - /// references to [`TimerEntry`]. - registered_when: AtomicU64, - - /// Current state. This records whether the timer entry is currently under - /// the ownership of the driver, and if not, its current state (not - /// complete, fired, error, etc). - state: StateCell, - - _p: PhantomPinned, -} - -unsafe impl Send for TimerShared {} -unsafe impl Sync for TimerShared {} - -impl std::fmt::Debug for TimerShared { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TimerShared") - .field( - "registered_when", - &self.registered_when.load(Ordering::Relaxed), - ) - .field("state", &self.state) - .finish() - } -} - -generate_addr_of_methods! { - impl<> TimerShared { - unsafe fn addr_of_pointers(self: NonNull) -> NonNull> { - &self.pointers - } - } -} - -impl TimerShared { - pub(super) fn new() -> Self { - Self { - registered_when: AtomicU64::new(0), - pointers: linked_list::Pointers::new(), - state: StateCell::default(), - _p: PhantomPinned, - } - } - - /// Gets the cached time-of-expiration value. - pub(super) fn registered_when(&self) -> u64 { - // Cached-when is only accessed under the driver lock, so we can use relaxed - self.registered_when.load(Ordering::Relaxed) - } - - /// Gets the true time-of-expiration value, and copies it into the cached - /// time-of-expiration value. - /// - /// SAFETY: Must be called with the driver lock held, and when this entry is - /// not in any timer wheel lists. - pub(super) unsafe fn sync_when(&self) -> u64 { - let true_when = self.true_when(); - - self.registered_when.store(true_when, Ordering::Relaxed); - - true_when - } - - /// Sets the cached time-of-expiration value. - /// - /// SAFETY: Must be called with the driver lock held, and when this entry is - /// not in any timer wheel lists. - unsafe fn set_registered_when(&self, when: u64) { - self.registered_when.store(when, Ordering::Relaxed); - } - - /// Returns the true time-of-expiration value, with relaxed memory ordering. - pub(super) fn true_when(&self) -> u64 { - self.state.when().expect("Timer already fired") - } - - /// Sets the true time-of-expiration value, even if it is less than the - /// current expiration or the timer is deregistered. - /// - /// SAFETY: Must only be called with the driver lock held and the entry not - /// in the timer wheel. - pub(super) unsafe fn set_expiration(&self, t: u64) { - self.state.set_expiration(t); - self.registered_when.store(t, Ordering::Relaxed); - } - - /// Sets the true time-of-expiration only if it is after the current. - pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> { - self.state.extend_expiration(t) - } - - /// Returns a `TimerHandle` for this timer. - pub(super) fn handle(&self) -> TimerHandle { - TimerHandle { - inner: NonNull::from(self), - } - } - - /// Returns true if the state of this timer indicates that the timer might - /// be registered with the driver. This check is performed with relaxed - /// ordering, but is conservative - if it returns false, the timer is - /// definitely _not_ registered. - pub(super) fn might_be_registered(&self) -> bool { - self.state.might_be_registered() - } -} - -unsafe impl linked_list::Link for TimerShared { - type Handle = TimerHandle; - - type Target = TimerShared; - - fn as_raw(handle: &Self::Handle) -> NonNull { - handle.inner - } - - unsafe fn from_raw(ptr: NonNull) -> Self::Handle { - TimerHandle { inner: ptr } - } - - unsafe fn pointers( - target: NonNull, - ) -> NonNull> { - TimerShared::addr_of_pointers(target) - } -} - -// ===== impl Entry ===== - -impl TimerEntry { - #[track_caller] - pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self { - // Panic if the time driver is not enabled - let _ = handle.driver().time(); - - Self { - driver: handle, - inner: None, - deadline, - registered: false, - } - } - - fn inner(&self) -> Option<&TimerShared> { - self.inner.as_ref() - } - - fn init_inner(self: Pin<&mut Self>) { - match self.inner { - Some(_) => {} - None => self.project().inner.set(Some(TimerShared::new())), - } - } - - pub(crate) fn deadline(&self) -> Instant { - self.deadline - } - - pub(crate) fn is_elapsed(&self) -> bool { - let Some(inner) = self.inner() else { - return false; - }; - - // Is this timer still in the timer wheel? - let deregistered = !inner.might_be_registered(); - - // Once the timer has expired, - // it will be taken out of the wheel and be fired. - // - // So if we have already registered the timer into the wheel, - // but now it is not in the wheel, it means that it has been - // fired. - // - // +--------------+-----------------+----------+ - // | deregistered | self.registered | output | - // +--------------+-----------------+----------+ - // | true | false | false | <- never been registered - // +--------------+-----------------+----------+ - // | false | false | false | <- never been registered - // +--------------+-----------------+----------+ - // | true | true | true | <- registered into the wheel, - // | | | | and then taken out of the wheel. - // +--------------+-----------------+----------+ - // | false | true | false | <- still registered in the wheel - // +--------------+-----------------+----------+ - deregistered && self.registered - } - - /// Cancels and deregisters the timer. This operation is irreversible. - pub(crate) fn cancel(self: Pin<&mut Self>) { - // Avoid calling the `clear_entry` method, because it has not been initialized yet. - let Some(inner) = self.inner() else { - return; - }; - - // We need to perform an acq/rel fence with the driver thread, and the - // simplest way to do so is to grab the driver lock. - // - // Why is this necessary? We're about to release this timer's memory for - // some other non-timer use. However, we've been doing a bunch of - // relaxed (or even non-atomic) writes from the driver thread, and we'll - // be doing more from _this thread_ (as this memory is interpreted as - // something else). - // - // It is critical to ensure that, from the point of view of the driver, - // those future non-timer writes happen-after the timer is fully fired, - // and from the purpose of this thread, the driver's writes all - // happen-before we drop the timer. This in turn requires us to perform - // an acquire-release barrier in _both_ directions between the driver - // and dropping thread. - // - // The lock acquisition in clear_entry serves this purpose. All of the - // driver manipulations happen with the lock held, so we can just take - // the lock and be sure that this drop happens-after everything the - // driver did so far and happens-before everything the driver does in - // the future. While we have the lock held, we also go ahead and - // deregister the entry if necessary. - unsafe { self.driver().clear_entry(NonNull::from(inner)) }; - } - - pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { - let this = self.as_mut().project(); - *this.deadline = new_time; - *this.registered = reregister; - - let tick = self.driver().time_source().deadline_to_tick(new_time); - let inner = match self.inner() { - Some(inner) => inner, - None => { - self.as_mut().init_inner(); - self.inner() - .expect("inner should already be initialized by `this.init_inner()`") - } - }; - - if inner.extend_expiration(tick).is_ok() { - return; - } - - if reregister { - unsafe { - self.driver() - .reregister(&self.driver.driver().io, tick, inner.into()); - } - } - } - - pub(crate) fn poll_elapsed( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - assert!( - !self.driver().is_shutdown(), - "{}", - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR - ); - - if !self.registered { - let deadline = self.deadline; - self.as_mut().reset(deadline, true); - } - - let inner = self - .inner() - .expect("inner should already be initialized by `self.reset()`"); - inner.state.poll(cx.waker()) - } - - pub(crate) fn driver(&self) -> &super::Handle { - self.driver.driver().time() - } - - #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(crate) fn clock(&self) -> &super::Clock { - self.driver.driver().clock() - } -} - -impl TimerHandle { - pub(super) unsafe fn registered_when(&self) -> u64 { - unsafe { self.inner.as_ref().registered_when() } - } - - pub(super) unsafe fn sync_when(&self) -> u64 { - unsafe { self.inner.as_ref().sync_when() } - } - - pub(super) unsafe fn is_pending(&self) -> bool { - unsafe { self.inner.as_ref().state.is_pending() } - } - - /// Forcibly sets the true and cached expiration times to the given tick. - /// - /// SAFETY: The caller must ensure that the handle remains valid, the driver - /// lock is held, and that the timer is not in any wheel linked lists. - pub(super) unsafe fn set_expiration(&self, tick: u64) { - self.inner.as_ref().set_expiration(tick); - } - - /// Attempts to mark this entry as pending. If the expiration time is after - /// `not_after`, however, returns an Err with the current expiration time. - /// - /// If an `Err` is returned, the `registered_when` value will be updated to this - /// new expiration time. - /// - /// SAFETY: The caller must ensure that the handle remains valid, the driver - /// lock is held, and that the timer is not in any wheel linked lists. - /// After returning Ok, the entry must be added to the pending list. - pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { - match self.inner.as_ref().state.mark_pending(not_after) { - Ok(()) => { - // mark this as being on the pending queue in registered_when - self.inner.as_ref().set_registered_when(STATE_DEREGISTERED); - Ok(()) - } - Err(tick) => { - self.inner.as_ref().set_registered_when(tick); - Err(tick) - } - } - } - - /// Attempts to transition to a terminal state. If the state is already a - /// terminal state, does nothing. - /// - /// Because the entry might be dropped after the state is moved to a - /// terminal state, this function consumes the handle to ensure we don't - /// access the entry afterwards. - /// - /// Returns the last-registered waker, if any. - /// - /// SAFETY: The driver lock must be held while invoking this function, and - /// the entry must not be in any wheel linked lists. - pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option { - self.inner.as_ref().state.fire(completed_state) - } -} diff --git a/tokio/src/runtime/time/handle.rs b/tokio/src/runtime/time/handle.rs index fce791d998c..ef7bc742530 100644 --- a/tokio/src/runtime/time/handle.rs +++ b/tokio/src/runtime/time/handle.rs @@ -1,29 +1,67 @@ -use crate::runtime::time::TimeSource; +use crate::runtime::time::{TimeSource, Wheel}; use std::fmt; +cfg_test_util! { + use crate::loom::sync::Arc; + use crate::loom::sync::atomic::{AtomicBool, Ordering}; +} + /// Handle to time driver instance. pub(crate) struct Handle { pub(super) time_source: TimeSource, - pub(super) inner: super::Inner, + + // When `true`, a call to `park_timeout` should immediately return and time + // should not advance. One reason for this to be `true` is if the task + // passed to `Runtime::block_on` called `task::yield_now()`. + // + // While it may look racy, it only has any effect when the clock is paused + // and pausing the clock is restricted to a single-threaded runtime. + #[cfg(feature = "test-util")] + pub(super) did_wake: Arc, } impl Handle { + pub(crate) fn process_at_time(&self, wheel: &mut Wheel, mut now: u64) { + if now < wheel.elapsed() { + // Time went backwards! This normally shouldn't happen as the Rust language + // guarantees that an Instant is monotonic, but can happen when running + // Linux in a VM on a Windows host due to std incorrectly trusting the + // hardware clock to be monotonic. + // + // See for more information. + now = wheel.elapsed(); + } + + while let Some(hdl) = wheel.poll(now) { + unsafe { + hdl.wake(); + } + } + } + + pub(crate) fn shutdown(&self, wheel: &mut Wheel) { + // self.is_shutdown.store(true, Ordering::SeqCst); + // Advance time forward to the end of time. + // This will ensure that all timers are fired. + let max_tick = u64::MAX; + self.process_at_time(wheel, max_tick); + } + /// Returns the time source associated with this handle. pub(crate) fn time_source(&self) -> &TimeSource { &self.time_source } - /// Checks whether the driver has been shutdown. - pub(super) fn is_shutdown(&self) -> bool { - self.inner.is_shutdown() - } - /// Track that the driver is being unparked pub(crate) fn unpark(&self) { #[cfg(feature = "test-util")] - self.inner - .did_wake - .store(true, std::sync::atomic::Ordering::SeqCst); + self.did_wake.store(true, Ordering::SeqCst); + } + + cfg_test_util! { + pub(crate) fn did_wake(&self) -> bool { + self.did_wake.swap(false, Ordering::SeqCst) + } } } diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 3250dce97f6..4aef399391c 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -6,9 +6,8 @@ //! Time driver. -mod entry; -pub(crate) use entry::TimerEntry; -use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION}; +mod timer; +pub(crate) use timer::Timer; mod handle; pub(crate) use self::handle::Handle; @@ -17,16 +16,21 @@ mod source; pub(crate) use source::TimeSource; mod wheel; +cfg_rt_and_time! { + pub(crate) use wheel::{Insert, EntryHandle}; +} +cfg_rt_or_time! { + pub(crate) use wheel::cancellation_queue; + pub(crate) use wheel::Wheel; +} + +cfg_test_util! { + use crate::loom::sync::Arc; +} use crate::loom::sync::atomic::{AtomicBool, Ordering}; -use crate::loom::sync::Mutex; -use crate::runtime::driver::{self, IoHandle, IoStack}; -use crate::time::error::Error; +use crate::runtime::driver::{self, IoStack}; use crate::time::{Clock, Duration}; -use crate::util::WakeList; - -use std::fmt; -use std::{num::NonZeroU64, ptr::NonNull}; /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. /// @@ -87,33 +91,8 @@ use std::{num::NonZeroU64, ptr::NonNull}; pub(crate) struct Driver { /// Parker to delegate to. park: IoStack, -} -/// Timer state shared between `Driver`, `Handle`, and `Registration`. -struct Inner { - // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex - state: Mutex, - - /// True if the driver is being shutdown. is_shutdown: AtomicBool, - - // When `true`, a call to `park_timeout` should immediately return and time - // should not advance. One reason for this to be `true` is if the task - // passed to `Runtime::block_on` called `task::yield_now()`. - // - // While it may look racy, it only has any effect when the clock is paused - // and pausing the clock is restricted to a single-threaded runtime. - #[cfg(feature = "test-util")] - did_wake: AtomicBool, -} - -/// Time state shared which must be protected by a `Mutex` -struct InnerState { - /// The earliest time at which we promise to wake up without unparking. - next_wake: Option, - - /// Timer wheel. - wheel: wheel::Wheel, } // ===== impl Driver ===== @@ -128,280 +107,49 @@ impl Driver { let handle = Handle { time_source, - inner: Inner { - state: Mutex::new(InnerState { - next_wake: None, - wheel: wheel::Wheel::new(), - }), - is_shutdown: AtomicBool::new(false), - - #[cfg(feature = "test-util")] - did_wake: AtomicBool::new(false), - }, + #[cfg(feature = "test-util")] + did_wake: Arc::new(AtomicBool::new(false)), }; - let driver = Driver { park }; + let driver = Driver { + park, + is_shutdown: AtomicBool::new(false), + }; (driver, handle) } pub(crate) fn park(&mut self, handle: &driver::Handle) { - self.park_internal(handle, None); + self.park.park(handle); } pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { - self.park_internal(handle, Some(duration)); + self.park.park_timeout(handle, duration); } pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { - let handle = rt_handle.time(); - - if handle.is_shutdown() { + if self.is_shutdown.load(Ordering::SeqCst) { return; } - handle.inner.is_shutdown.store(true, Ordering::SeqCst); - - // Advance time forward to the end of time. - - handle.process_at_time(u64::MAX); - + self.is_shutdown.store(true, Ordering::SeqCst); self.park.shutdown(rt_handle); } - - fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option) { - let handle = rt_handle.time(); - let mut lock = handle.inner.lock(); - - assert!(!handle.is_shutdown()); - - let next_wake = lock.wheel.next_expiration_time(); - lock.next_wake = - next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); - - drop(lock); - - match next_wake { - Some(when) => { - let now = handle.time_source.now(rt_handle.clock()); - // Note that we effectively round up to 1ms here - this avoids - // very short-duration microsecond-resolution sleeps that the OS - // might treat as zero-length. - let mut duration = handle - .time_source - .tick_to_duration(when.saturating_sub(now)); - - if duration > Duration::from_millis(0) { - if let Some(limit) = limit { - duration = std::cmp::min(limit, duration); - } - - self.park_thread_timeout(rt_handle, duration); - } else { - self.park.park_timeout(rt_handle, Duration::from_secs(0)); - } - } - None => { - if let Some(duration) = limit { - self.park_thread_timeout(rt_handle, duration); - } else { - self.park.park(rt_handle); - } - } - } - - // Process pending timers after waking up - handle.process(rt_handle.clock()); - } - - cfg_test_util! { - fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { - let handle = rt_handle.time(); - let clock = rt_handle.clock(); - - if clock.can_auto_advance() { - self.park.park_timeout(rt_handle, Duration::from_secs(0)); - - // If the time driver was woken, then the park completed - // before the "duration" elapsed (usually caused by a - // yield in `Runtime::block_on`). In this case, we don't - // advance the clock. - if !handle.did_wake() { - // Simulate advancing time - if let Err(msg) = clock.advance(duration) { - panic!("{}", msg); - } - } - } else { - self.park.park_timeout(rt_handle, duration); - } - } - } - - cfg_not_test_util! { - fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { - self.park.park_timeout(rt_handle, duration); - } - } -} - -impl Handle { - pub(self) fn process(&self, clock: &Clock) { - let now = self.time_source().now(clock); - - self.process_at_time(now); - } - - pub(self) fn process_at_time(&self, mut now: u64) { - let mut waker_list = WakeList::new(); - - let mut lock = self.inner.lock(); - - if now < lock.wheel.elapsed() { - // Time went backwards! This normally shouldn't happen as the Rust language - // guarantees that an Instant is monotonic, but can happen when running - // Linux in a VM on a Windows host due to std incorrectly trusting the - // hardware clock to be monotonic. - // - // See for more information. - now = lock.wheel.elapsed(); - } - - while let Some(entry) = lock.wheel.poll(now) { - debug_assert!(unsafe { entry.is_pending() }); - - // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. - if let Some(waker) = unsafe { entry.fire(Ok(())) } { - waker_list.push(waker); - - if !waker_list.can_push() { - // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. - drop(lock); - - waker_list.wake_all(); - - lock = self.inner.lock(); - } - } - } - - lock.next_wake = lock - .wheel - .poll_at() - .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); - - drop(lock); - - waker_list.wake_all(); - } - - /// Removes a registered timer from the driver. - /// - /// The timer will be moved to the cancelled state. Wakers will _not_ be - /// invoked. If the timer is already completed, this function is a no-op. - /// - /// This function always acquires the driver lock, even if the entry does - /// not appear to be registered. - /// - /// SAFETY: The timer must not be registered with some other driver, and - /// `add_entry` must not be called concurrently. - pub(self) unsafe fn clear_entry(&self, entry: NonNull) { - unsafe { - let mut lock = self.inner.lock(); - - if entry.as_ref().might_be_registered() { - lock.wheel.remove(entry); - } - - entry.as_ref().handle().fire(Ok(())); - } - } - - /// Removes and re-adds an entry to the driver. - /// - /// SAFETY: The timer must be either unregistered, or registered with this - /// driver. No other threads are allowed to concurrently manipulate the - /// timer at all (the current thread should hold an exclusive reference to - /// the `TimerEntry`) - pub(self) unsafe fn reregister( - &self, - unpark: &IoHandle, - new_tick: u64, - entry: NonNull, - ) { - let waker = unsafe { - let mut lock = self.inner.lock(); - - // We may have raced with a firing/deregistration, so check before - // deregistering. - if unsafe { entry.as_ref().might_be_registered() } { - lock.wheel.remove(entry); - } - - // Now that we have exclusive control of this entry, mint a handle to reinsert it. - let entry = entry.as_ref().handle(); - - if self.is_shutdown() { - unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } - } else { - entry.set_expiration(new_tick); - - // Note: We don't have to worry about racing with some other resetting - // thread, because add_entry and reregister require exclusive control of - // the timer entry. - match unsafe { lock.wheel.insert(entry) } { - Ok(when) => { - if lock - .next_wake - .map(|next_wake| when < next_wake.get()) - .unwrap_or(true) - { - unpark.unpark(); - } - - None - } - Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe { - entry.fire(Ok(())) - }, - } - } - - // Must release lock before invoking waker to avoid the risk of deadlock. - }; - - // The timer was fired synchronously as a result of the reregistration. - // Wake the waker; this is needed because we might reset _after_ a poll, - // and otherwise the task won't be awoken to poll again. - if let Some(waker) = waker { - waker.wake(); - } - } - - cfg_test_util! { - fn did_wake(&self) -> bool { - self.inner.did_wake.swap(false, Ordering::SeqCst) - } - } -} - -// ===== impl Inner ===== - -impl Inner { - /// Locks the driver's inner structure - pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { - self.state.lock() - } - - // Check whether the driver has been shutdown - pub(super) fn is_shutdown(&self) -> bool { - self.is_shutdown.load(Ordering::SeqCst) - } } -impl fmt::Debug for Inner { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Inner").finish() +cfg_rt_or_time! { + /// Local context for the time driver. + pub(crate) enum Context<'a> { + /// The runtime is running, we can access it. + Running { + /// the local time wheel + wheel: &'a mut Wheel, + /// channel to push timers that are pending cancellation + canc_tx: &'a cancellation_queue::Sender, + }, + #[cfg(feature = "rt-multi-thread")] + /// The runtime is shutting down, no timers can be registered. + Shutdown, } } diff --git a/tokio/src/runtime/time/source.rs b/tokio/src/runtime/time/source.rs index e3ba8d790c0..8f53c63d2d9 100644 --- a/tokio/src/runtime/time/source.rs +++ b/tokio/src/runtime/time/source.rs @@ -1,4 +1,3 @@ -use super::MAX_SAFE_MILLIS_DURATION; use crate::time::{Clock, Duration, Instant}; /// A structure which handles conversion from Instants to `u64` timestamps. @@ -22,11 +21,7 @@ impl TimeSource { pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 { // round up let dur: Duration = t.saturating_duration_since(self.start_time); - let ms = dur - .as_millis() - .try_into() - .unwrap_or(MAX_SAFE_MILLIS_DURATION); - ms.min(MAX_SAFE_MILLIS_DURATION) + dur.as_millis().try_into().unwrap_or(u64::MAX) } pub(crate) fn tick_to_duration(&self, t: u64) -> Duration { diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 33c4a5366d1..9d851faca60 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -1,15 +1,18 @@ #![cfg(not(target_os = "wasi"))] +use std::future::poll_fn; use std::{task::Context, time::Duration}; #[cfg(not(loom))] use futures::task::noop_waker_ref; -use crate::loom::sync::atomic::{AtomicBool, Ordering}; -use crate::loom::sync::Arc; use crate::loom::thread; +use crate::runtime::time::timer::with_current_wheel; +use crate::runtime::time::Context as TimeContext; +use crate::runtime::Handle; +use crate::sync::oneshot; -use super::TimerEntry; +use super::Timer; fn block_on(f: impl std::future::Future) -> T { #[cfg(loom)] @@ -21,6 +24,7 @@ fn block_on(f: impl std::future::Future) -> T { .build() .unwrap(); rt.block_on(f) + // futures::executor::block_on(f) } } @@ -32,9 +36,50 @@ fn model(f: impl Fn() + Send + Sync + 'static) { f(); } +async fn fire_all_timers(handle: &Handle, exit_rx: oneshot::Receiver<()>) { + loop { + // Keep the worker thread busy, so that it can process injected + // timers. + crate::task::yield_now().await; + if !exit_rx.is_empty() { + // break the loop if the thread is exiting + break; + } + + // In the `block_on` context, we can get the current wheel + // fire all timers. + with_current_wheel(&handle.inner, |maybe_wheel| match maybe_wheel { + Some(TimeContext::Running { wheel, .. }) => { + let time = handle.inner.driver().time(); + time.process_at_time(wheel, u64::MAX); + } + #[cfg(feature = "rt-multi-thread")] + Some(TimeContext::Shutdown) => panic!("runtime is shutting down"), + None => panic!("no current wheel"), + }); + + thread::yield_now(); + } +} + +// This function must be called inside the `rt.block_on`. +fn process_at_time(handle: &Handle, at: u64) { + let handle = &handle.inner; + with_current_wheel(handle, |maybe_wheel| match maybe_wheel { + Some(TimeContext::Running { wheel, .. }) => { + let time = handle.driver().time(); + time.process_at_time(wheel, at); + } + #[cfg(feature = "rt-multi-thread")] + Some(TimeContext::Shutdown) => panic!("runtime is shutting down"), + None => panic!("no current wheel"), + }); +} + fn rt(start_paused: bool) -> crate::runtime::Runtime { crate::runtime::Builder::new_current_thread() .enable_time() + .event_interval(1) .start_paused(start_paused) .build() .unwrap() @@ -45,25 +90,23 @@ fn single_timer() { model(|| { let rt = rt(false); let handle = rt.handle(); + let (exit_tx, exit_rx) = oneshot::channel(); let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = TimerEntry::new( + let entry = Timer::new( handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); - block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); + block_on(poll_fn(|cx| entry.as_mut().poll_elapsed(cx))); + exit_tx.send(()).unwrap(); }); - thread::yield_now(); - - let time = handle.inner.driver().time(); - let clock = handle.inner.driver().clock(); - - // advance 2s - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); + rt.block_on(async move { + fire_all_timers(handle, exit_rx).await; + }); jh.join().unwrap(); }) @@ -74,10 +117,11 @@ fn drop_timer() { model(|| { let rt = rt(false); let handle = rt.handle(); + let (exit_tx, exit_rx) = oneshot::channel(); let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = TimerEntry::new( + let entry = Timer::new( handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); @@ -89,15 +133,12 @@ fn drop_timer() { let _ = entry .as_mut() .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); + exit_tx.send(()).unwrap(); }); - thread::yield_now(); - - let time = handle.inner.driver().time(); - let clock = handle.inner.driver().clock(); - - // advance 2s in the future. - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); + rt.block_on(async move { + fire_all_timers(handle, exit_rx).await; + }); jh.join().unwrap(); }) @@ -108,10 +149,12 @@ fn change_waker() { model(|| { let rt = rt(false); let handle = rt.handle(); + let (exit_tx, exit_rx) = oneshot::channel(); + let (change_waker_tx, change_waker_rx) = oneshot::channel(); let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = TimerEntry::new( + let entry = Timer::new( handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); @@ -121,70 +164,33 @@ fn change_waker() { .as_mut() .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); + // At this point, we cannot let worker thread to wake up + // the timer because the waker is a noop. + // Let's say the timer has been woken up at this point, + // the following poll is basically polling a future that has completed + // (already returned `Ready`),which is not encouraged. + + let mut maybe_change_waker_tx = Some(change_waker_tx); + block_on(poll_fn(|cx| { + let p = entry.as_mut().poll_elapsed(cx); + if let Some(tx) = maybe_change_waker_tx.take() { + // notify the worker thread that the waker is useable now + tx.send(()).unwrap(); + } + p + })); + + // notify the worker thread to exit + exit_tx.send(()).unwrap(); }); - thread::yield_now(); - - let time = handle.inner.driver().time(); - let clock = handle.inner.driver().clock(); - - // advance 2s - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); - - jh.join().unwrap(); - }) -} - -#[test] -fn reset_future() { - model(|| { - let finished_early = Arc::new(AtomicBool::new(false)); + change_waker_rx.blocking_recv().unwrap(); - let rt = rt(false); - let handle = rt.handle(); - - let handle_ = handle.clone(); - let finished_early_ = finished_early.clone(); - let start = handle.inner.driver().clock().now(); - - let jh = thread::spawn(move || { - let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1)); - pin!(entry); - - let _ = entry - .as_mut() - .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); - - entry.as_mut().reset(start + Duration::from_secs(2), true); - - // shouldn't complete before 2s - block_on(std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx))).unwrap(); - - finished_early_.store(true, Ordering::Relaxed); + rt.block_on(async move { + fire_all_timers(handle, exit_rx).await; }); - thread::yield_now(); - - let handle = handle.inner.driver().time(); - - handle.process_at_time( - handle - .time_source() - .instant_to_tick(start + Duration::from_millis(1500)), - ); - - assert!(!finished_early.load(Ordering::Relaxed)); - - handle.process_at_time( - handle - .time_source() - .instant_to_tick(start + Duration::from_millis(2500)), - ); - jh.join().unwrap(); - - assert!(finished_early.load(Ordering::Relaxed)); }) } @@ -205,31 +211,33 @@ fn poll_process_levels() { let mut entries = vec![]; - for i in 0..normal_or_miri(1024, 64) { - let mut entry = Box::pin(TimerEntry::new( - handle.inner.clone(), - handle.inner.driver().clock().now() + Duration::from_millis(i), - )); + rt.block_on(async { + for i in 0..normal_or_miri(1024, 64) { + let mut entry = Box::pin(Timer::new( + handle.inner.clone(), + handle.inner.driver().clock().now() + Duration::from_millis(i), + )); - let _ = entry - .as_mut() - .poll_elapsed(&mut Context::from_waker(noop_waker_ref())); + let _ = entry + .as_mut() + .poll_elapsed(&mut Context::from_waker(noop_waker_ref())); - entries.push(entry); - } + entries.push(entry); + } - for t in 1..normal_or_miri(1024, 64) { - handle.inner.driver().time().process_at_time(t as u64); + for t in 1..normal_or_miri(1024, 64) { + process_at_time(handle, t); - for (deadline, future) in entries.iter_mut().enumerate() { - let mut context = Context::from_waker(noop_waker_ref()); - if deadline <= t { - assert!(future.as_mut().poll_elapsed(&mut context).is_ready()); - } else { - assert!(future.as_mut().poll_elapsed(&mut context).is_pending()); + for (deadline, future) in entries.iter_mut().enumerate() { + let mut context = Context::from_waker(noop_waker_ref()); + if deadline <= t as usize { + assert!(future.as_mut().poll_elapsed(&mut context).is_ready()); + } else { + assert!(future.as_mut().poll_elapsed(&mut context).is_pending()); + } } } - } + }); } #[test] @@ -240,30 +248,16 @@ fn poll_process_levels_targeted() { let rt = rt(true); let handle = rt.handle(); - let e1 = TimerEntry::new( - handle.inner.clone(), - handle.inner.driver().clock().now() + Duration::from_millis(193), - ); - pin!(e1); - - let handle = handle.inner.driver().time(); - - handle.process_at_time(62); - assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); - handle.process_at_time(192); - handle.process_at_time(192); -} - -#[test] -#[cfg(not(loom))] -fn instant_to_tick_max() { - use crate::runtime::time::entry::MAX_SAFE_MILLIS_DURATION; - - let rt = rt(true); - let handle = rt.handle().inner.driver().time(); - - let start_time = handle.time_source.start_time(); - let long_future = start_time + std::time::Duration::from_millis(MAX_SAFE_MILLIS_DURATION + 1); + rt.block_on(async { + let e1 = Timer::new( + handle.inner.clone(), + handle.inner.driver().clock().now() + Duration::from_millis(193), + ); + pin!(e1); - assert!(handle.time_source.instant_to_tick(long_future) <= MAX_SAFE_MILLIS_DURATION); + process_at_time(handle, 62); + assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); + process_at_time(handle, 192); + process_at_time(handle, 192); + }) } diff --git a/tokio/src/runtime/time/timer.rs b/tokio/src/runtime/time/timer.rs new file mode 100644 index 00000000000..6fd5114d371 --- /dev/null +++ b/tokio/src/runtime/time/timer.rs @@ -0,0 +1,188 @@ +use super::wheel::EntryHandle; +use crate::runtime::scheduler::Handle as SchedulerHandle; +use crate::runtime::time::wheel::Insert; +use crate::runtime::time::Context as TimeContext; +use crate::time::Instant; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[cfg(any(feature = "rt", feature = "rt-multi-thread"))] +use crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR; + +pub(crate) struct Timer { + sched_handle: SchedulerHandle, + + /// The entry in the timing wheel. + /// + /// This is `None` if the timer has been deregistered. + entry: Option, + + /// The deadline for the timer. + deadline: Instant, +} + +impl std::fmt::Debug for Timer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Timer") + .field("deadline", &self.deadline) + .finish() + } +} + +impl Drop for Timer { + fn drop(&mut self) { + if let Some(entry) = self.entry.take() { + entry.transition_to_cancelling(); + } + } +} + +impl Timer { + #[track_caller] + pub(crate) fn new(sched_hdl: SchedulerHandle, deadline: Instant) -> Self { + // Panic if the time driver is not enabled + let _ = sched_hdl.driver().time(); + Timer { + sched_handle: sched_hdl, + entry: None, + deadline, + } + } + + pub(crate) fn deadline(&self) -> Instant { + self.deadline + } + + pub(crate) fn is_elapsed(&self) -> bool { + self.entry.as_ref().is_some_and(|entry| entry.is_woken_up()) + } + + fn register(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let this = self.get_mut(); + + with_current_wheel(&this.sched_handle, |maybe_time_cx| { + let deadline = deadline_to_tick(&this.sched_handle, this.deadline); + let hdl = EntryHandle::new(deadline, cx.waker()); + + match maybe_time_cx { + Some(TimeContext::Running { wheel, canc_tx }) => { + // Safety: the entry is not registered yet + match unsafe { wheel.insert(hdl.clone(), canc_tx.clone()) } { + Insert::Success => { + this.entry = Some(hdl); + Poll::Pending + } + Insert::Elapsed => Poll::Ready(()), + Insert::Cancelling => Poll::Pending, + } + } + #[cfg(feature = "rt-multi-thread")] + Some(TimeContext::Shutdown) => panic!("{RUNTIME_SHUTTING_DOWN_ERROR}"), + None => { + this.entry = Some(hdl.clone()); + push_from_remote(&this.sched_handle, hdl); + Poll::Pending + } + } + }) + } + + pub(crate) fn poll_elapsed(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + match self.entry.as_ref() { + Some(entry) if entry.is_woken_up() => Poll::Ready(()), + Some(entry) => { + entry.register_waker(cx.waker()); + Poll::Pending + } + None => self.register(cx), + } + } +} + +pub(super) fn with_current_wheel(hdl: &SchedulerHandle, f: F) -> R +where + F: FnOnce(Option>) -> R, +{ + #[cfg(not(feature = "rt"))] + { + let (_, _) = (hdl, f); + panic!("Tokio runtime is not enabled, cannot access the current wheel"); + } + + #[cfg(feature = "rt")] + { + use crate::loom::sync::Arc; + use crate::runtime::context; + use crate::runtime::scheduler::Context; + use crate::runtime::scheduler::Handle::CurrentThread; + #[cfg(feature = "rt-multi-thread")] + use crate::runtime::scheduler::Handle::MultiThread; + + let is_same_rt = context::with_current(|cur_hdl| match (cur_hdl, hdl) { + (CurrentThread(cur_hdl), CurrentThread(hdl)) => Arc::ptr_eq(cur_hdl, hdl), + #[cfg(feature = "rt-multi-thread")] + (MultiThread(cur_hdl), MultiThread(hdl)) => Arc::ptr_eq(cur_hdl, hdl), + #[cfg(feature = "rt-multi-thread")] + // this above cfg is needed to avoid the compiler warning reported by: + // cargo check -Zbuild-std --target target-specs/i686-unknown-linux-gnu.json \ + // --manifest-path tokio/Cargo.toml --no-default-features \ + // --features test-util` + // error: unreachable pattern + // --> tokio/src/runtime/time/timer.rs:118:13 + // | + // 115 | (CurrentThread(cur_hdl), CurrentThread(hdl)) => Arc::ptr_eq(cur_hdl, hdl), + // | -------------------------------------------- matches all the relevant values + // ... + // 118 | _ => false, + // | ^ no value can reach this + _ => false, + }) + .unwrap_or_default(); + + if !is_same_rt { + // We don't want to create the timer in one runtime, + // but register it in a different runtime's timer wheel. + f(None) + } else { + context::with_scheduler(|maybe_cx| match maybe_cx { + Some(Context::CurrentThread(cx)) => cx.with_wheel(f), + #[cfg(feature = "rt-multi-thread")] + Some(Context::MultiThread(cx)) => cx.with_wheel(f), + None => f(None), + }) + } + } +} + +fn push_from_remote(sched_hdl: &SchedulerHandle, entry_hdl: EntryHandle) { + #[cfg(not(feature = "rt"))] + { + let (_, _) = (sched_hdl, entry_hdl); + panic!("Tokio runtime is not enabled, cannot access the current wheel"); + } + + #[cfg(feature = "rt")] + { + use crate::runtime::scheduler::Handle::CurrentThread; + #[cfg(feature = "rt-multi-thread")] + use crate::runtime::scheduler::Handle::MultiThread; + + match sched_hdl { + CurrentThread(hdl) => { + assert!(!hdl.is_shutdown(), "{RUNTIME_SHUTTING_DOWN_ERROR}"); + hdl.push_remote_timer(entry_hdl) + } + #[cfg(feature = "rt-multi-thread")] + MultiThread(hdl) => { + assert!(!hdl.is_shutdown(), "{RUNTIME_SHUTTING_DOWN_ERROR}"); + hdl.push_remote_timer(entry_hdl) + } + } + } +} + +fn deadline_to_tick(sched_hdl: &SchedulerHandle, deadline: Instant) -> u64 { + let time_hdl = sched_hdl.driver().time(); + time_hdl.time_source().deadline_to_tick(deadline) +} diff --git a/tokio/src/runtime/time/wheel/cancellation_queue.rs b/tokio/src/runtime/time/wheel/cancellation_queue.rs new file mode 100644 index 00000000000..9fc08292c8c --- /dev/null +++ b/tokio/src/runtime/time/wheel/cancellation_queue.rs @@ -0,0 +1,113 @@ +use super::{Entry, EntryHandle}; +use crate::loom::sync::{Arc, Mutex}; +use crate::runtime::time::wheel::CancellationQueueEntry; +use crate::util::linked_list; + +type EntryList = linked_list::LinkedList; + +#[derive(Debug)] +struct Inner { + list: EntryList, +} + +/// Safety: [`Inner`] is protected by [`Mutex`]. +unsafe impl Send for Inner {} + +/// Safety: [`Inner`] is protected by [`Mutex`]. +unsafe impl Sync for Inner {} + +impl Drop for Inner { + fn drop(&mut self) { + // consume all entries + let _ = self.iter().count(); + } +} + +impl Inner { + fn new() -> Self { + Self { + list: EntryList::new(), + } + } + + /// # Safety + /// + /// Behavior is undefined if any of the following conditions are violated: + /// + /// - `hdl` must not in any cancellation queue. + unsafe fn push_front(&mut self, hdl: EntryHandle) { + self.list.push_front(hdl); + } + + fn iter(&mut self) -> impl Iterator { + struct Iter { + list: EntryList, + } + + impl Drop for Iter { + fn drop(&mut self) { + while let Some(hdl) = self.list.pop_front() { + drop(hdl); + } + } + } + + impl Iterator for Iter { + type Item = EntryHandle; + + fn next(&mut self) -> Option { + self.list.pop_front() + } + } + + Iter { + list: std::mem::take(&mut self.list), + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct Sender { + inner: Arc>, +} + +/// Safety: [`Inner`] is protected by [`Mutex`]. +unsafe impl Send for Sender {} + +/// Safety: [`Inner`] is protected by [`Mutex`]. +unsafe impl Sync for Sender {} + +impl Sender { + /// # Safety + /// + /// Behavior is undefined if any of the following conditions are violated: + /// + /// - `hdl` must not in any cancellation queue. + pub(crate) unsafe fn send(&self, hdl: EntryHandle) { + self.inner.lock().push_front(hdl); + } +} + +#[derive(Debug)] +pub(crate) struct Receiver { + inner: Arc>, +} + +impl Receiver { + pub(crate) fn recv_all(&mut self) -> impl Iterator { + self.inner.lock().iter() + } +} + +pub(crate) fn new() -> (Sender, Receiver) { + let inner = Arc::new(Mutex::new(Inner::new())); + ( + Sender { + inner: inner.clone(), + }, + Receiver { inner }, + ) +} + +#[cfg(test)] +mod tests; diff --git a/tokio/src/runtime/time/wheel/cancellation_queue/tests.rs b/tokio/src/runtime/time/wheel/cancellation_queue/tests.rs new file mode 100644 index 00000000000..17b426e23de --- /dev/null +++ b/tokio/src/runtime/time/wheel/cancellation_queue/tests.rs @@ -0,0 +1,99 @@ +use super::*; + +use futures::task::noop_waker; + +#[cfg(loom)] +const NUM_ITEMS: usize = 16; + +#[cfg(not(loom))] +const NUM_ITEMS: usize = 64; + +fn new_handle() -> EntryHandle { + EntryHandle::new(0, &noop_waker()) +} + +fn model(f: F) { + #[cfg(loom)] + loom::model(f); + + #[cfg(not(loom))] + f(); +} + +#[test] +fn single_thread() { + model(|| { + for i in 0..NUM_ITEMS { + let (tx, mut rx) = new(); + + for _ in 0..i { + unsafe { tx.send(new_handle()) }; + } + + assert_eq!(rx.recv_all().count(), i); + } + }); +} + +#[test] +#[cfg(not(target_os = "wasi"))] // No thread on wasi. +fn multi_thread() { + use crate::loom::sync::atomic::{AtomicUsize, Ordering::SeqCst}; + use crate::loom::sync::Arc; + use crate::loom::thread; + + #[cfg(loom)] + const NUM_THREADS: usize = 3; + #[cfg(not(loom))] + const NUM_THREADS: usize = 8; + + model(|| { + let (tx, mut rx) = new(); + let mut jhs = Vec::new(); + let sent = Arc::new(AtomicUsize::new(0)); + + for _ in 0..NUM_THREADS { + let tx = tx.clone(); + let sent = sent.clone(); + jhs.push(thread::spawn(move || { + for _ in 0..NUM_ITEMS { + unsafe { tx.send(new_handle()) }; + sent.fetch_add(1, SeqCst); + } + })); + } + + let mut count = 0; + loop { + count += rx.recv_all().count(); + if sent.fetch_add(0, SeqCst) == NUM_ITEMS * NUM_THREADS { + jhs.into_iter().for_each(|jh| { + jh.join().unwrap(); + }); + count += rx.recv_all().count(); + break; + } + thread::yield_now(); + } + + assert_eq!(count, NUM_ITEMS * NUM_THREADS); + }) +} + +#[test] +fn drop_iter_should_not_leak_memory() { + model(|| { + let (tx, mut rx) = new(); + + let hdls = (0..NUM_ITEMS).map(|_| new_handle()).collect::>(); + for hdl in hdls.iter() { + unsafe { tx.send(hdl.clone()) }; + } + + drop(rx.recv_all()); + + for hdl in hdls { + assert_eq!(hdl.inner_strong_count(), 1); + } + }); +} diff --git a/tokio/src/runtime/time/wheel/entry.rs b/tokio/src/runtime/time/wheel/entry.rs new file mode 100644 index 00000000000..aa48b2e86d3 --- /dev/null +++ b/tokio/src/runtime/time/wheel/entry.rs @@ -0,0 +1,285 @@ +use super::cancellation_queue::Sender; +use crate::loom::sync::{Arc, Mutex}; +use crate::{sync::AtomicWaker, util::linked_list}; + +use std::marker::PhantomPinned; +use std::ptr::NonNull; +use std::task::Waker; + +pub(crate) type EntryList = linked_list::LinkedList; + +#[derive(Debug)] +enum State { + /// A pure new entry, no any changes to the state. + Unregistered, + + /// The entry is registered to the timer wheel, + /// but not in the pending queue of the timer wheel. + Registered(Sender), + + /// The entry is in the pending queue of the timer wheel, + /// and not in any wheel level, which means that + /// the entry is reached its deadline and waiting to be woken up. + Pending, + + /// The waker has been called, and the entry is no longer in the timer wheel + /// (both each wheel level and the pending queue), which means that + /// the entry is reached its deadline and woken up. + WokenUp, + + /// The [`Handle`] has been sent to the [`Sender`]. + Cancelling, +} + +#[derive(Debug)] +pub(crate) struct Entry { + /// The intrusive pointers used by timer wheel. + wheel_pointers: linked_list::Pointers, + + /// The intrusive pointer used by cancellation queue. + cancel_pointers: linked_list::Pointers, + + /// The tick when this entry is scheduled to expire. + deadline: u64, + + /// The currently registered waker. + waker: AtomicWaker, + + state: Mutex, + + /// Make the type `!Unpin` to prevent LLVM from emitting + /// the `noalias` attribute for mutable references. + /// + /// See . + _pin: PhantomPinned, +} + +// Safety: `Entry` is always in an `Arc`. +unsafe impl linked_list::Link for Entry { + type Handle = Handle; + type Target = Entry; + + fn as_raw(hdl: &Self::Handle) -> NonNull { + unsafe { NonNull::new_unchecked(Arc::as_ptr(&hdl.entry).cast_mut()) } + } + + unsafe fn from_raw(ptr: NonNull) -> Self::Handle { + Handle { + entry: Arc::from_raw(ptr.as_ptr()), + } + } + + unsafe fn pointers( + target: NonNull, + ) -> NonNull> { + let this = target.as_ptr(); + let field = std::ptr::addr_of_mut!((*this).wheel_pointers); + NonNull::new_unchecked(field) + } +} + +/// An ZST to allow [`super::cancellation_queue`] to utilize the [`Entry::cancel_pointers`] +/// by impl [`linked_list::Link`] as we cannot impl it on [`Entry`] +/// directly due to the conflicting implementations used by [`Entry::wheel_pointers`]. +/// +/// This type should never be constructed. +pub(super) struct CancellationQueueEntry; + +// Safety: `Entry` is always in an `Arc`. +unsafe impl linked_list::Link for CancellationQueueEntry { + type Handle = Handle; + type Target = Entry; + + fn as_raw(hdl: &Self::Handle) -> NonNull { + unsafe { NonNull::new_unchecked(Arc::as_ptr(&hdl.entry).cast_mut()) } + } + + unsafe fn from_raw(ptr: NonNull) -> Self::Handle { + Handle { + entry: Arc::from_raw(ptr.as_ptr()), + } + } + + unsafe fn pointers( + target: NonNull, + ) -> NonNull> { + let this = target.as_ptr(); + let field = std::ptr::addr_of_mut!((*this).cancel_pointers); + NonNull::new_unchecked(field) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct Handle { + pub(crate) entry: Arc, +} + +impl From for NonNull { + fn from(handle: Handle) -> NonNull { + let ptr = Arc::as_ptr(&handle.entry); + unsafe { NonNull::new_unchecked(ptr.cast_mut()) } + } +} + +impl Handle { + pub(crate) fn new(deadline: u64, waker: &Waker) -> Self { + let entry = Arc::new(Entry { + wheel_pointers: linked_list::Pointers::new(), + cancel_pointers: linked_list::Pointers::new(), + deadline, + waker: AtomicWaker::new(), + state: Mutex::new(State::Unregistered), + _pin: PhantomPinned, + }); + entry.waker.register_by_ref(waker); + + Handle { entry } + } + + /// Wake the entry if it is already in the pending queue of the timer wheel. + pub(crate) fn wake(&self) { + let mut lock = self.entry.state.lock(); + match &*lock { + // don't unlock — poisoning the `Mutex` stops others from using the bad state. + state @ (State::Unregistered | State::Registered(_)) => { + panic!("corrupted state: {state:#?}") + } + State::Pending => { + *lock = State::WokenUp; + // Since state has been updated, no need to hold the lock. + drop(lock); + self.entry.waker.wake(); + } + // don't unlock — poisoning the `Mutex` stops others from using the bad state. + State::WokenUp => panic!("corrupted state: `State::WokenUp`"), + // no need to wake up cancelling entry + State::Cancelling => (), + } + } + + /// Wake the entry if it has already elapsed before registering to the timer wheel. + pub(crate) fn wake_unregistered(&self) { + let mut lock = self.entry.state.lock(); + match &*lock { + State::Unregistered => { + *lock = State::WokenUp; + // Since state has been updated, no need to hold the lock. + drop(lock); + self.entry.waker.wake(); + } + // don't unlock — poisoning the `Mutex` stops others from using the bad state. + state @ (State::Registered(_) | State::WokenUp) => { + panic!("corrupted state: {state:#?}") + } + // don't unlock — poisoning the `Mutex` stops others from using the bad state. + State::Pending => panic!("corrupted state: State::Pending"), + // don't wake up cancelling entries + State::Cancelling => (), + } + } + + pub(crate) fn register_waker(&self, waker: &Waker) { + self.entry.waker.register_by_ref(waker); + } + + pub(crate) fn transition_to_registered(&self, cancel_tx: Sender) -> TransitionToRegistered { + let mut lock = self.entry.state.lock(); + match &*lock { + State::Unregistered => { + *lock = State::Registered(cancel_tx); + TransitionToRegistered::Success + } + // don't unlock — poisoning the `Mutex` stops others from using the bad state. + state @ (State::Registered(_) | State::Pending | State::WokenUp) => { + panic!("corrupted state: {state:#?}") + } + State::Cancelling => TransitionToRegistered::Cancelling, + } + } + + pub(crate) fn transition_to_pending(&self, not_after: u64) -> TransitionToPending { + if self.entry.deadline > not_after { + return TransitionToPending::NotElapsed(self.entry.deadline); + } + + let mut lock = self.entry.state.lock(); + match &*lock { + // don't unlock — poisoning the `Mutex` stops others from using the bad state. + State::Unregistered => panic!("corrupted state: State::Unregistered"), + State::Registered(_) => { + *lock = State::Pending; + TransitionToPending::Success + } + // don't unlock — poisoning the `Mutex` stops others from using the bad state. + state @ (State::Pending | State::WokenUp) => panic!("corrupted state: {state:#?}"), + State::Cancelling => TransitionToPending::Cancelling, + } + } + + pub(crate) fn transition_to_cancelling(&self) { + let mut lock = self.entry.state.lock(); + + match *lock { + State::Unregistered => *lock = State::Cancelling, + State::Registered(ref tx) => { + // Safety: entry is not in any cancellation queue + unsafe { + tx.send(self.clone()); + } + *lock = State::Cancelling; + } + // no need to cancel a pending or woken up entry + State::Pending | State::WokenUp => *lock = State::Cancelling, + // don't unlock — poisoning the `Mutex` stops others from using the bad state. + State::Cancelling => panic!("should not be called twice"), + } + } + + pub(crate) fn deadline(&self) -> u64 { + self.entry.deadline + } + + pub(crate) fn is_registered(&self) -> bool { + matches!(*self.entry.state.lock(), State::Registered(_)) + } + + pub(crate) fn is_pending(&self) -> bool { + matches!(*self.entry.state.lock(), State::Pending) + } + + pub(crate) fn is_woken_up(&self) -> bool { + matches!(*self.entry.state.lock(), State::WokenUp) + } + + #[cfg(test)] + /// Only used for unit tests. + pub(crate) fn inner_strong_count(&self) -> usize { + Arc::strong_count(&self.entry) + } +} + +/// An error returned when trying to transition +/// an being cancelled entry to the registered state. +pub(crate) enum TransitionToRegistered { + /// The entry is being cancelled, no need to register it. + Success, + + /// The entry is being cancelled, + /// no need to transition it to the registered state. + Cancelling, +} + +/// An result of the `transition_to_pending` method. +pub(crate) enum TransitionToPending { + /// The entry was successfully transitioned + /// to the pending state. + Success, + + /// The entry doesn't reached its deadline yet, + /// and the tick when it should be woken up is returned. + NotElapsed(u64), + + /// The entry is being cancelled, + /// no need to transition it to the pending state. + Cancelling, +} diff --git a/tokio/src/runtime/time/wheel/level.rs b/tokio/src/runtime/time/wheel/level.rs index 754e638bf57..cea17a6fb8b 100644 --- a/tokio/src/runtime/time/wheel/level.rs +++ b/tokio/src/runtime/time/wheel/level.rs @@ -1,6 +1,5 @@ -use crate::runtime::time::{EntryList, TimerHandle, TimerShared}; - -use std::{array, fmt, ptr::NonNull}; +use super::{EntryHandle, EntryList}; +use std::{array, fmt}; /// Wheel for a single level in the timer. This wheel contains 64 slots. pub(crate) struct Level { @@ -119,18 +118,20 @@ impl Level { Some(slot) } - pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) { - let slot = slot_for(item.registered_when(), self.level); + pub(crate) unsafe fn add_entry(&mut self, hdl: EntryHandle) { + // Safety: the associated entry must be valid. + let deadline = hdl.deadline(); + let slot = slot_for(deadline, self.level); - self.slot[slot].push_front(item); + self.slot[slot].push_front(hdl); self.occupied |= occupied_bit(slot); } - pub(crate) unsafe fn remove_entry(&mut self, item: NonNull) { - let slot = slot_for(unsafe { item.as_ref().registered_when() }, self.level); + pub(crate) unsafe fn remove_entry(&mut self, hdl: EntryHandle) { + let slot = slot_for(hdl.deadline(), self.level); - unsafe { self.slot[slot].remove(item) }; + unsafe { self.slot[slot].remove(hdl.into()) }; if self.slot[slot].is_empty() { // The bit is currently set debug_assert!(self.occupied & occupied_bit(slot) != 0); diff --git a/tokio/src/runtime/time/wheel/mod.rs b/tokio/src/runtime/time/wheel/mod.rs index 8d94303544c..3c6a98c61f7 100644 --- a/tokio/src/runtime/time/wheel/mod.rs +++ b/tokio/src/runtime/time/wheel/mod.rs @@ -1,14 +1,17 @@ -use crate::runtime::time::{TimerHandle, TimerShared}; -use crate::time::error::InsertError; - mod level; pub(crate) use self::level::Expiration; use self::level::Level; -use std::{array, ptr::NonNull}; +mod entry; +pub(crate) use entry::Handle as EntryHandle; +use entry::TransitionToPending; +use entry::TransitionToRegistered; +use entry::{CancellationQueueEntry, Entry, EntryList}; + +pub(crate) mod cancellation_queue; +use cancellation_queue::Sender; -use super::entry::STATE_DEREGISTERED; -use super::EntryList; +use std::array; /// Timing wheel implementation. /// @@ -70,80 +73,77 @@ impl Wheel { /// /// # Arguments /// - /// * `item`: The item to insert into the wheel. + /// * `hdl`: The entry handle to insert into the wheel. /// /// # Return /// - /// Returns `Ok` when the item is successfully inserted, `Err` otherwise. - /// - /// `Err(Elapsed)` indicates that `when` represents an instant that has - /// already passed. In this case, the caller should fire the timeout - /// immediately. - /// - /// `Err(Invalid)` indicates an invalid `when` argument as been supplied. + /// * `true`: The entry was successfully inserted. + /// * `false`: the entry has already expired, in this case, + /// the entry is not inserted into the wheel. /// /// # Safety /// - /// This function registers item into an intrusive linked list. The caller - /// must ensure that `item` is pinned and will not be dropped without first - /// being deregistered. - pub(crate) unsafe fn insert( - &mut self, - item: TimerHandle, - ) -> Result { - let when = item.sync_when(); - - if when <= self.elapsed { - return Err((item, InsertError::Elapsed)); - } - - // Get the level at which the entry should be stored - let level = self.level_for(when); + /// The caller must ensure: + /// + /// * The entry is not already registered in ANY wheel. + pub(crate) unsafe fn insert(&mut self, hdl: EntryHandle, cancel_tx: Sender) -> Insert { + let deadline = hdl.deadline(); - unsafe { - self.levels[level].add_entry(item); + if deadline <= self.elapsed { + return Insert::Elapsed; } - debug_assert!({ - self.levels[level] - .next_expiration(self.elapsed) - .map(|e| e.deadline >= self.elapsed) - .unwrap_or(true) - }); + // Get the level at which the entry should be stored + let level = self.level_for(deadline); - Ok(when) - } + match hdl.transition_to_registered(cancel_tx) { + TransitionToRegistered::Success => { + unsafe { + self.levels[level].add_entry(hdl); + } - /// Removes `item` from the timing wheel. - pub(crate) unsafe fn remove(&mut self, item: NonNull) { - unsafe { - let when = item.as_ref().registered_when(); - if when == STATE_DEREGISTERED { - self.pending.remove(item); - } else { - debug_assert!( - self.elapsed <= when, - "elapsed={}; when={}", - self.elapsed, - when - ); + debug_assert!({ + self.levels[level] + .next_expiration(self.elapsed) + .map(|e| e.deadline >= self.elapsed) + .unwrap_or(true) + }); - let level = self.level_for(when); - self.levels[level].remove_entry(item); + Insert::Success } + TransitionToRegistered::Cancelling => Insert::Cancelling, } } - /// Instant at which to poll. - pub(crate) fn poll_at(&self) -> Option { - self.next_expiration().map(|expiration| expiration.deadline) + /// Removes `item` from the timing wheel. + /// + /// # Safety + /// + /// The caller must ensure: + /// + /// * The entry is already registered in THIS wheel. + pub(crate) unsafe fn remove(&mut self, hdl: EntryHandle) { + if hdl.is_pending() { + self.pending.remove(hdl.into()); + } else { + let deadline = hdl.deadline(); + debug_assert!( + self.elapsed <= deadline, + "elapsed={}; deadline={}", + self.elapsed, + deadline + ); + + let level = self.level_for(deadline); + self.levels[level].remove_entry(hdl.clone()); + } } /// Advances the timer up to the instant represented by `now`. - pub(crate) fn poll(&mut self, now: u64) -> Option { + pub(crate) fn poll(&mut self, now: u64) -> Option { loop { - if let Some(handle) = self.pending.pop_back() { - return Some(handle); + if let Some(hdl) = self.pending.pop_back() { + return Some(hdl); } match self.next_expiration() { @@ -193,7 +193,7 @@ impl Wheel { /// Returns the tick at which this timer wheel next needs to perform some /// processing, or None if there are no timers registered. - pub(super) fn next_expiration_time(&self) -> Option { + pub(crate) fn next_expiration_time(&self) -> Option { self.next_expiration().map(|ex| ex.deadline) } @@ -229,24 +229,20 @@ impl Wheel { // those entries again or we'll end up in an infinite loop. let mut entries = self.take_entries(expiration); - while let Some(item) = entries.pop_back() { + while let Some(hdl) = entries.pop_back() { if expiration.level == 0 { - debug_assert_eq!(unsafe { item.registered_when() }, expiration.deadline); + debug_assert_eq!(hdl.deadline(), expiration.deadline); } - // Try to expire the entry; this is cheap (doesn't synchronize) if - // the timer is not expired, and updates registered_when. - match unsafe { item.mark_pending(expiration.deadline) } { - Ok(()) => { - // Item was expired - self.pending.push_front(item); - } - Err(expiration_tick) => { - let level = level_for(expiration.deadline, expiration_tick); + match hdl.transition_to_pending(expiration.deadline) { + TransitionToPending::Success => self.pending.push_front(hdl), + TransitionToPending::NotElapsed(when) => { + let level = level_for(expiration.deadline, when); unsafe { - self.levels[level].add_entry(item); + self.levels[level].add_entry(hdl); } } + TransitionToPending::Cancelling => {} } } } @@ -292,6 +288,18 @@ fn level_for(elapsed: u64, when: u64) -> usize { significant / NUM_LEVELS } +pub(crate) enum Insert { + /// The entry was successfully inserted. + Success, + + /// The entry has already expired, in this case, + /// the entry is not inserted into the wheel. + Elapsed, + + /// The entry is being cancelled, no need to register it. + Cancelling, +} + #[cfg(all(test, not(loom)))] mod test { use super::*; diff --git a/tokio/src/time/error.rs b/tokio/src/time/error.rs index 21920059090..aaf8847b81a 100644 --- a/tokio/src/time/error.rs +++ b/tokio/src/time/error.rs @@ -46,11 +46,6 @@ impl From for Error { #[derive(Debug, PartialEq, Eq)] pub struct Elapsed(()); -#[derive(Debug)] -pub(crate) enum InsertError { - Elapsed, -} - // ===== impl Error ===== impl Error { diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 02cecc6ec1a..42b2973a38a 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -484,10 +484,7 @@ impl Interval { .unwrap_or_else(Instant::far_future) }; - // When we arrive here, the internal delay returned `Poll::Ready`. - // Reset the delay but do not register it. It should be registered with - // the next call to [`poll_tick`]. - self.delay.as_mut().reset_without_reregister(next); + self.delay.as_mut().reset(next); // Return the time when we were scheduled to tick Poll::Ready(timeout) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 87261057bfe..d2720fe82ee 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -1,5 +1,5 @@ -use crate::runtime::time::TimerEntry; -use crate::time::{error::Error, Duration, Instant}; +use crate::runtime::time::Timer; +use crate::time::{Duration, Instant}; use crate::util::trace; use pin_project_lite::pin_project; @@ -227,7 +227,7 @@ pin_project! { // The link between the `Sleep` instance and the timer that drives it. #[pin] - entry: TimerEntry, + entry: Timer, } } @@ -252,14 +252,14 @@ impl Sleep { location: Option<&'static Location<'static>>, ) -> Sleep { use crate::runtime::scheduler; - let handle = scheduler::Handle::current(); - let entry = TimerEntry::new(handle, deadline); + let sched_hdl = scheduler::Handle::current(); + let entry = Timer::new(sched_hdl, deadline); #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = { - let handle = scheduler::Handle::current(); - let clock = handle.driver().clock(); - let handle = &handle.driver().time(); - let time_source = handle.time_source(); + let sched_hdl = scheduler::Handle::current(); + let clock = sched_hdl.driver().clock(); + let time_hdl = sched_hdl.driver().time(); + let time_source = time_hdl.time_source(); let deadline_tick = time_source.deadline_to_tick(deadline); let duration = deadline_tick.saturating_sub(time_source.now(clock)); @@ -349,25 +349,10 @@ impl Sleep { /// /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut pub fn reset(self: Pin<&mut Self>, deadline: Instant) { - self.reset_inner(deadline); - } - - /// Resets the `Sleep` instance to a new deadline without reregistering it - /// to be woken up. - /// - /// Calling this function allows changing the instant at which the `Sleep` - /// future completes without having to create new associated state and - /// without having it registered. This is required in e.g. the - /// [`crate::time::Interval`] where we want to reset the internal [Sleep] - /// without having it wake up the last task that polled it. - pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) { - let mut me = self.project(); - me.entry.as_mut().reset(deadline, false); - } - - fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { + use crate::runtime::scheduler; let mut me = self.project(); - me.entry.as_mut().reset(deadline, true); + me.entry + .set(Timer::new(scheduler::Handle::current(), deadline)); #[cfg(all(tokio_unstable, feature = "tracing"))] { @@ -380,8 +365,12 @@ impl Sleep { tracing::trace_span!("runtime.resource.async_op.poll"); let duration = { - let clock = me.entry.clock(); - let time_source = me.entry.driver().time_source(); + use crate::runtime::scheduler; + + let handle = scheduler::Handle::current(); + let clock = handle.driver().clock(); + let handle = &handle.driver().time(); + let time_source = handle.time_source(); let now = time_source.now(clock); let deadline_tick = time_source.deadline_to_tick(deadline); deadline_tick.saturating_sub(now) @@ -396,7 +385,7 @@ impl Sleep { } } - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> { let me = self.project(); ready!(crate::trace::trace_leaf(cx)); @@ -443,9 +432,6 @@ impl Future for Sleep { let _ao_span = self.inner.ctx.async_op_span.clone().entered(); #[cfg(all(tokio_unstable, feature = "tracing"))] let _ao_poll_span = self.inner.ctx.async_op_poll_span.clone().entered(); - match ready!(self.as_mut().poll_elapsed(cx)) { - Ok(()) => Poll::Ready(()), - Err(e) => panic!("timer error: {e}"), - } + self.as_mut().poll_elapsed(cx) } } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index eeddd0af2e8..3988d03047e 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -30,8 +30,6 @@ pub(crate) use wake::{waker, Wake}; // rt and signal use `Notify`, which requires `WakeList`. feature = "rt", feature = "signal", - // time driver uses `WakeList` in `Handle::process_at_time`. - feature = "time", ))] mod wake_list; #[cfg(any( @@ -41,7 +39,6 @@ mod wake_list; feature = "fs", feature = "rt", feature = "signal", - feature = "time", ))] pub(crate) use wake_list::WakeList;