Skip to content

Commit be90205

Browse files
committed
rt: add support for non-send closures for thread (un)parking
Add support for non `Send`+`Sync` closures for thread parking and unparking callbacks when using a `LocalRuntime`. Since a `LocalRuntime` will always run its tasks on the same thread, its safe to accept a non `Send`+`Sync` closure. Signed-off-by: Sanskar Jaiswal <[email protected]>
1 parent 3636fd0 commit be90205

File tree

3 files changed

+169
-14
lines changed

3 files changed

+169
-14
lines changed

tokio/src/runtime/builder.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ use std::io;
1313
use std::thread::ThreadId;
1414
use std::time::Duration;
1515

16+
// use super::LocalOptions;
17+
1618
/// Builds Tokio Runtime with custom configuration values.
1719
///
1820
/// Methods can be chained in order to set the configuration values. The
@@ -923,7 +925,7 @@ impl Builder {
923925
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
924926
pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
925927
match &self.kind {
926-
Kind::CurrentThread => self.build_current_thread_local_runtime(),
928+
Kind::CurrentThread => self.build_current_thread_local_runtime(options),
927929
#[cfg(feature = "rt-multi-thread")]
928930
Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
929931
}
@@ -1435,11 +1437,16 @@ impl Builder {
14351437
}
14361438

14371439
#[cfg(tokio_unstable)]
1438-
fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1440+
fn build_current_thread_local_runtime(
1441+
&mut self,
1442+
opts: LocalOptions,
1443+
) -> io::Result<LocalRuntime> {
14391444
use crate::runtime::local_runtime::LocalRuntimeScheduler;
14401445

14411446
let tid = std::thread::current().id();
14421447

1448+
self.before_park = opts.before_park;
1449+
self.after_unpark = opts.after_unpark;
14431450
let (scheduler, handle, blocking_pool) =
14441451
self.build_current_thread_runtime_components(Some(tid))?;
14451452

Lines changed: 127 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,141 @@
11
use std::marker::PhantomData;
22

3+
use crate::runtime::Callback;
4+
35
/// [`LocalRuntime`]-only config options
46
///
5-
/// Currently, there are no such options, but in the future, things like `!Send + !Sync` hooks may
6-
/// be added.
7-
///
87
/// Use `LocalOptions::default()` to create the default set of options. This type is used with
98
/// [`Builder::build_local`].
109
///
10+
/// When using [`Builder::build_local`], this overrides any preconfigured options set on the
11+
/// [`Builder`].
12+
///
1113
/// [`Builder::build_local`]: crate::runtime::Builder::build_local
1214
/// [`LocalRuntime`]: crate::runtime::LocalRuntime
13-
#[derive(Default, Debug)]
15+
#[derive(Default)]
1416
#[non_exhaustive]
17+
#[allow(missing_debug_implementations)]
1518
pub struct LocalOptions {
1619
/// Marker used to make this !Send and !Sync.
1720
_phantom: PhantomData<*mut u8>,
21+
22+
/// Callback for a worker parking itself
23+
pub(crate) before_park: Option<Callback>,
24+
25+
/// Callback for a worker unparking itself
26+
pub(crate) after_unpark: Option<Callback>,
27+
}
28+
29+
impl LocalOptions {
30+
/// Executes function `f` just before a thread is parked (goes idle).
31+
/// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
32+
/// can be called, and may result in this thread being unparked immediately.
33+
///
34+
/// This can be used to start work only when the executor is idle, or for bookkeeping
35+
/// and monitoring purposes.
36+
///
37+
/// This differs from the `Builder::on_thread_park` method in that it accepts a non Send + Sync
38+
/// closure.
39+
///
40+
/// Note: There can only be one park callback for a runtime; calling this function
41+
/// more than once replaces the last callback defined, rather than adding to it.
42+
///
43+
/// # Examples
44+
///
45+
/// ```
46+
/// # use tokio::runtime::{Builder, LocalOptions};
47+
/// # pub fn main() {
48+
/// let (tx, rx) = std::sync::mpsc::channel();
49+
/// let mut opts = LocalOptions::default();
50+
/// opts.on_thread_park(move || match rx.recv() {
51+
/// Ok(x) => println!("Received from channel: {}", x),
52+
/// Err(e) => println!("Error receiving from channel: {}", e),
53+
/// });
54+
///
55+
/// let runtime = Builder::new_current_thread()
56+
/// .enable_time()
57+
/// .build_local(opts)
58+
/// .unwrap();
59+
///
60+
/// runtime.block_on(async {
61+
/// tokio::task::spawn_local(async move {
62+
/// tx.send(42).unwrap();
63+
/// });
64+
/// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
65+
/// })
66+
/// }
67+
/// ```
68+
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
69+
where
70+
F: Fn() + 'static,
71+
{
72+
self.before_park = Some(std::sync::Arc::new(to_send_sync(f)));
73+
self
74+
}
75+
76+
/// Executes function `f` just after a thread unparks (starts executing tasks).
77+
///
78+
/// This is intended for bookkeeping and monitoring use cases; note that work
79+
/// in this callback will increase latencies when the application has allowed one or
80+
/// more runtime threads to go idle.
81+
///
82+
/// This differs from the `Builder::on_thread_unpark` method in that it accepts a non Send + Sync
83+
/// closure.
84+
///
85+
/// Note: There can only be one unpark callback for a runtime; calling this function
86+
/// more than once replaces the last callback defined, rather than adding to it.
87+
///
88+
/// # Examples
89+
///
90+
/// ```
91+
/// # use tokio::runtime::{Builder, LocalOptions};
92+
/// # pub fn main() {
93+
/// let (tx, rx) = std::sync::mpsc::channel();
94+
/// let mut opts = LocalOptions::default();
95+
/// opts.on_thread_unpark(move || match rx.recv() {
96+
/// Ok(x) => println!("Received from channel: {}", x),
97+
/// Err(e) => println!("Error receiving from channel: {}", e),
98+
/// });
99+
///
100+
/// let runtime = Builder::new_current_thread()
101+
/// .enable_time()
102+
/// .build_local(opts)
103+
/// .unwrap();
104+
///
105+
/// runtime.block_on(async {
106+
/// tokio::task::spawn_local(async move {
107+
/// tx.send(42).unwrap();
108+
/// });
109+
/// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
110+
/// })
111+
/// }
112+
/// ```
113+
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
114+
where
115+
F: Fn() + 'static,
116+
{
117+
self.after_unpark = Some(std::sync::Arc::new(to_send_sync(f)));
118+
self
119+
}
120+
}
121+
122+
// A wrapper type to allow non-Send + Sync closures to be used in a Send + Sync context.
123+
// This is specifically used for executing callbacks when using a `LocalRuntime`, since the
124+
// callbacks will never be sent across threads and thus do not need to be Send or Sync.
125+
struct UnsafeSendSync<T>(T);
126+
unsafe impl<T> Send for UnsafeSendSync<T> {}
127+
unsafe impl<T> Sync for UnsafeSendSync<T> {}
128+
129+
impl<T: Fn()> UnsafeSendSync<T> {
130+
fn call(&self) {
131+
(self.0)()
132+
}
133+
}
134+
135+
fn to_send_sync<F>(f: F) -> impl Fn() + Send + Sync
136+
where
137+
F: Fn(),
138+
{
139+
let f = UnsafeSendSync(f);
140+
move || f.call()
18141
}

tokio/tests/rt_local.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::task::spawn_local;
66

77
#[test]
88
fn test_spawn_local_in_runtime() {
9-
let rt = rt();
9+
let rt = rt(LocalOptions::default());
1010

1111
let res = rt.block_on(async move {
1212
let (tx, rx) = tokio::sync::oneshot::channel();
@@ -22,9 +22,34 @@ fn test_spawn_local_in_runtime() {
2222
assert_eq!(res, 5);
2323
}
2424

25+
#[test]
26+
fn test_on_thread_park_in_runtime() {
27+
let mut opts = LocalOptions::default();
28+
let called = std::rc::Rc::new(std::cell::RefCell::new(false));
29+
let cc = called.clone();
30+
opts.on_thread_park(move || {
31+
*cc.borrow_mut() = true;
32+
});
33+
let rt = rt(opts);
34+
35+
rt.block_on(async move {
36+
let (tx, rx) = tokio::sync::oneshot::channel();
37+
38+
spawn_local(async {
39+
tokio::task::yield_now().await;
40+
tx.send(5).unwrap();
41+
});
42+
43+
// this is not really required execpt to ensure on_thread_park is called
44+
rx.await.unwrap()
45+
});
46+
47+
assert!(*called.borrow());
48+
}
49+
2550
#[test]
2651
fn test_spawn_from_handle() {
27-
let rt = rt();
52+
let rt = rt(LocalOptions::default());
2853

2954
let (tx, rx) = tokio::sync::oneshot::channel();
3055

@@ -40,7 +65,7 @@ fn test_spawn_from_handle() {
4065

4166
#[test]
4267
fn test_spawn_local_on_runtime_object() {
43-
let rt = rt();
68+
let rt = rt(LocalOptions::default());
4469

4570
let (tx, rx) = tokio::sync::oneshot::channel();
4671

@@ -56,7 +81,7 @@ fn test_spawn_local_on_runtime_object() {
5681

5782
#[test]
5883
fn test_spawn_local_from_guard() {
59-
let rt = rt();
84+
let rt = rt(LocalOptions::default());
6085

6186
let (tx, rx) = tokio::sync::oneshot::channel();
6287

@@ -78,7 +103,7 @@ fn test_spawn_from_guard_other_thread() {
78103
let (tx, rx) = std::sync::mpsc::channel();
79104

80105
std::thread::spawn(move || {
81-
let rt = rt();
106+
let rt = rt(LocalOptions::default());
82107
let handle = rt.handle().clone();
83108

84109
tx.send(handle).unwrap();
@@ -98,7 +123,7 @@ fn test_spawn_local_from_guard_other_thread() {
98123
let (tx, rx) = std::sync::mpsc::channel();
99124

100125
std::thread::spawn(move || {
101-
let rt = rt();
126+
let rt = rt(LocalOptions::default());
102127
let handle = rt.handle().clone();
103128

104129
tx.send(handle).unwrap();
@@ -111,9 +136,9 @@ fn test_spawn_local_from_guard_other_thread() {
111136
spawn_local(async {});
112137
}
113138

114-
fn rt() -> tokio::runtime::LocalRuntime {
139+
fn rt(opts: LocalOptions) -> tokio::runtime::LocalRuntime {
115140
tokio::runtime::Builder::new_current_thread()
116141
.enable_all()
117-
.build_local(LocalOptions::default())
142+
.build_local(opts)
118143
.unwrap()
119144
}

0 commit comments

Comments
 (0)