From 70b2325659aca6d067944b08cb337f30bf5cb816 Mon Sep 17 00:00:00 2001 From: Adam Date: Tue, 26 Mar 2024 14:05:35 -0700 Subject: [PATCH 1/4] keep_worker_alive feature and test --- Cargo.toml | 1 + src/wasm32/js/web_worker.js | 5 ++++- src/wasm32/js/web_worker_module.js | 9 ++++++--- src/wasm32/mod.rs | 8 ++++++++ tests/wasm.rs | 23 +++++++++++++++++++++++ 5 files changed, 42 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a313b6e..c1e7556 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ readme = "README.md" [features] default = ["es_modules"] es_modules = [] +keep_worker_alive = [] [dependencies] wasm-bindgen = "0.2" diff --git a/src/wasm32/js/web_worker.js b/src/wasm32/js/web_worker.js index 7f02ee4..0bfcbe6 100644 --- a/src/wasm32/js/web_worker.js +++ b/src/wasm32/js/web_worker.js @@ -21,8 +21,11 @@ self.onmessage = event => { // This executes closure defined by work context. wasm.wasm_thread_entry_point(work); - // Once done, terminate web worker + // Once done, check if worker should close + if(!wasm.keep_worker_alive()){ + // terminate web worker close(); + } }); }; \ No newline at end of file diff --git a/src/wasm32/js/web_worker_module.js b/src/wasm32/js/web_worker_module.js index 84945b9..62569e5 100644 --- a/src/wasm32/js/web_worker_module.js +++ b/src/wasm32/js/web_worker_module.js @@ -1,5 +1,5 @@ // synchronously, using the browser, import wasm_bindgen shim JS scripts -import init, {wasm_thread_entry_point} from "WASM_BINDGEN_SHIM_URL"; +import init, {wasm_thread_entry_point, keep_worker_alive} from "WASM_BINDGEN_SHIM_URL"; // Wait for the main thread to send us the shared module/memory and work context. // Once we've got it, initialize it all with the `wasm_bindgen` global we imported via @@ -21,7 +21,10 @@ self.onmessage = event => { // This executes closure defined by work context. wasm_thread_entry_point(work); - // Once done, terminate web worker - close(); + // Once done, check if worker should close + if(!keep_worker_alive()){ + // terminate web worker + close(); + } }); }; \ No newline at end of file diff --git a/src/wasm32/mod.rs b/src/wasm32/mod.rs index a413b20..7121d20 100644 --- a/src/wasm32/mod.rs +++ b/src/wasm32/mod.rs @@ -33,6 +33,14 @@ pub fn wasm_thread_entry_point(ptr: u32) { WorkerMessage::ThreadComplete.post(); } +#[wasm_bindgen] +pub fn keep_worker_alive() -> bool { + #[cfg(feature = "keep_worker_alive")] + return true; + #[cfg(not(feature = "keep_worker_alive"))] + return false; +} + /// Used to relay spawn requests from web workers to main thread struct BuilderRequest { builder: Builder, diff --git a/tests/wasm.rs b/tests/wasm.rs index 9c90ec1..6b8a613 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -121,3 +121,26 @@ async fn thread_async_channel() { let result = main_rx.recv().await.unwrap(); assert_eq!(result, "Hello world!"); } + + +#[wasm_bindgen_test] +async fn keep_worker_alive(){ + let (thread_tx, main_rx) = async_channel::unbounded::(); + + thread::spawn(|| { + wasm_bindgen_futures::spawn_local(async move { + let promise = js_sys::Promise::resolve(&wasm_bindgen::JsValue::from(42)); + // Convert that promise into a future + let x = wasm_bindgen_futures::JsFuture::from(promise).await.unwrap(); + // This should send if "keep_worker_alive" is enabled. If disabled, + // the thread will close before the message can be sent + thread_tx.send("After js future".to_string()).await.unwrap(); + }); + }); + // If "keep_worker_alive" is disabled this will not recieve the message and + // eventually timeout the test + let mut msg = main_rx.recv().await.unwrap(); + + assert_eq!(msg, "After js future"); +} + From b0ba16bfe310bab02f18be699926812d4489299a Mon Sep 17 00:00:00 2001 From: Adam Date: Tue, 26 Mar 2024 20:25:18 -0700 Subject: [PATCH 2/4] spawn_async --- Cargo.toml | 3 ++ src/wasm32/js/web_worker.js | 15 ++++-- src/wasm32/js/web_worker_module.js | 14 +++-- src/wasm32/mod.rs | 86 ++++++++++++++++++++++++++---- src/wasm32/scoped.rs | 6 ++- src/wasm32/utils.rs | 17 ++++++ tests/wasm.rs | 29 ++++++---- 7 files changed, 141 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c1e7556..43e0ae5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ keep_worker_alive = [] [dependencies] wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4.42" web-sys = { version = "0.3", features = [ "Blob", "DedicatedWorkerGlobalScope", @@ -33,6 +34,8 @@ web-sys = { version = "0.3", features = [ ] } js-sys = "0.3" futures = "0.3" +async-std = "1.12.0" +once_cell = "1.8" [dev-dependencies] log = "0.4" diff --git a/src/wasm32/js/web_worker.js b/src/wasm32/js/web_worker.js index 0bfcbe6..b385402 100644 --- a/src/wasm32/js/web_worker.js +++ b/src/wasm32/js/web_worker.js @@ -5,7 +5,7 @@ importScripts('WASM_BINDGEN_SHIM_URL'); // Once we've got it, initialize it all with the `wasm_bindgen` global we imported via // `importScripts`. self.onmessage = event => { - let [ module, memory, work ] = event.data; + let [ module, memory, work, thread_key ] = event.data; wasm_bindgen(module, memory).catch(err => { console.log(err); @@ -20,12 +20,17 @@ self.onmessage = event => { // Enter rust code by calling entry point defined in `lib.rs`. // This executes closure defined by work context. wasm.wasm_thread_entry_point(work); + close(); - // Once done, check if worker should close + // Once done, check if worker should close based on the "keep_worker_alive" feature if(!wasm.keep_worker_alive()){ - // terminate web worker - close(); - } + // Periodically check if the thread can close + setInterval(()=>{ + if(wasm.check_can_close(thread_key)){ + close(); + } + }, 200); + } }); }; \ No newline at end of file diff --git a/src/wasm32/js/web_worker_module.js b/src/wasm32/js/web_worker_module.js index 62569e5..79515c4 100644 --- a/src/wasm32/js/web_worker_module.js +++ b/src/wasm32/js/web_worker_module.js @@ -1,11 +1,11 @@ // synchronously, using the browser, import wasm_bindgen shim JS scripts -import init, {wasm_thread_entry_point, keep_worker_alive} from "WASM_BINDGEN_SHIM_URL"; +import init, {wasm_thread_entry_point, keep_worker_alive, check_can_close} from "WASM_BINDGEN_SHIM_URL"; // Wait for the main thread to send us the shared module/memory and work context. // Once we've got it, initialize it all with the `wasm_bindgen` global we imported via // `importScripts`. self.onmessage = event => { - let [ module, memory, work ] = event.data; + let [ module, memory, work, thread_key ] = event.data; init(module, memory).catch(err => { console.log(err); @@ -21,10 +21,14 @@ self.onmessage = event => { // This executes closure defined by work context. wasm_thread_entry_point(work); - // Once done, check if worker should close + // Once done, check if worker should close based on the "keep_worker_alive" feature if(!keep_worker_alive()){ - // terminate web worker - close(); + // Periodically check if the thread can close + setInterval(()=>{ + if(check_can_close(thread_key)){ + close(); + } + }, 200); } }); }; \ No newline at end of file diff --git a/src/wasm32/mod.rs b/src/wasm32/mod.rs index 7121d20..8dba13d 100644 --- a/src/wasm32/mod.rs +++ b/src/wasm32/mod.rs @@ -16,11 +16,20 @@ use utils::SpinLockMutex; pub use utils::{available_parallelism, get_wasm_bindgen_shim_script_path, get_worker_script, is_web_worker_thread}; use wasm_bindgen::prelude::*; use web_sys::{DedicatedWorkerGlobalScope, Worker, WorkerOptions, WorkerType}; +use std::future::Future; +use std::pin::Pin; +use once_cell::sync::Lazy; +use std::collections::HashMap; mod scoped; mod signal; mod utils; +// Use a thread safe static hashmap to keep track of whether each thread can be closed +static CAN_CLOSE_MAP: Lazy>> = Lazy::new(|| { + Mutex::new(HashMap::new()) +}); + struct WebWorkerContext { func: Box, } @@ -33,6 +42,16 @@ pub fn wasm_thread_entry_point(ptr: u32) { WorkerMessage::ThreadComplete.post(); } +#[wasm_bindgen] +pub fn check_can_close(key: u32) -> bool { + let mut map = CAN_CLOSE_MAP.lock().unwrap(); + let can_close = *map.get(&key).unwrap_or(&true); + if(can_close){ + map.remove(&key); + } + can_close +} + #[wasm_bindgen] pub fn keep_worker_alive() -> bool { #[cfg(feature = "keep_worker_alive")] @@ -45,11 +64,12 @@ pub fn keep_worker_alive() -> bool { struct BuilderRequest { builder: Builder, context: WebWorkerContext, + key: u32, } impl BuilderRequest { pub unsafe fn spawn(self) { - self.builder.spawn_for_context(self.context); + self.builder.spawn_for_context(self.context, self.key); } } @@ -156,13 +176,13 @@ impl Builder { /// Spawns a new thread by taking ownership of the `Builder`, and returns an /// [std::io::Result] to its [`JoinHandle`]. - pub fn spawn(self, f: F) -> std::io::Result> + pub fn spawn(self, f: F, key: u32) -> std::io::Result> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, { - unsafe { self.spawn_unchecked(f) } + unsafe { self.spawn_unchecked(f, key) } } /// Spawns a new thread without any lifetime restrictions by taking ownership @@ -179,19 +199,20 @@ impl Builder { /// - use only types with `'static` lifetime bounds, i.e., those with no or only /// `'static` references (both [`Builder::spawn`] /// and [`spawn`] enforce this property statically) - pub unsafe fn spawn_unchecked<'a, F, T>(self, f: F) -> std::io::Result> + pub unsafe fn spawn_unchecked<'a, F, T>(self, f: F, key: u32) -> std::io::Result> where F: FnOnce() -> T, F: Send + 'a, T: Send + 'a, { - Ok(JoinHandle(unsafe { self.spawn_unchecked_(f, None) }?)) + Ok(JoinHandle(unsafe { self.spawn_unchecked_(f, None, key) }?)) } pub(crate) unsafe fn spawn_unchecked_<'a, 'scope, F, T>( self, f: F, scope_data: Option>, + key: u32, ) -> std::io::Result> where F: FnOnce() -> T, @@ -260,9 +281,9 @@ impl Builder { }; if is_web_worker_thread() { - WorkerMessage::SpawnThread(BuilderRequest { builder: self, context }).post(); + WorkerMessage::SpawnThread(BuilderRequest { builder: self, context, key }).post(); } else { - self.spawn_for_context(context); + self.spawn_for_context(context, key); } if let Some(scope) = &my_packet.scope { @@ -275,7 +296,7 @@ impl Builder { }) } - unsafe fn spawn_for_context(self, ctx: WebWorkerContext) { + unsafe fn spawn_for_context(self, ctx: WebWorkerContext, key: u32) { let Builder { name, prefix, @@ -345,6 +366,7 @@ impl Builder { init.push(&wasm_bindgen::module()); init.push(&wasm_bindgen::memory()); init.push(&JsValue::from(ctx_ptr as u32)); + init.push(&JsValue::from(key)); // Send initialization message match worker.post_message(&init) { @@ -461,5 +483,51 @@ where F: Send + 'static, T: Send + 'static, { - Builder::new().spawn(f).expect("failed to spawn thread") + let thread_key = utils::create_available_thread_key(); + let mut map = CAN_CLOSE_MAP.lock().unwrap(); + //thread can close immediately + map.insert(thread_key, true); + + spawn_with_key(f, thread_key).expect("failed to spawn thread") +} + +trait AsyncClosure { + fn call_once(self) -> Pin>>; +} + +impl AsyncClosure for F +where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + 'static, +{ + fn call_once(self) -> Pin>> { + Box::pin(self()) + } +} + +// JoinHandle is of type () because the future immediately returns. +pub fn spawn_async(f: F) -> JoinHandle<()> +where + F: AsyncClosure, + F: Send + 'static, +{ + let thread_key = utils::create_available_thread_key(); + + spawn_with_key(move || { + wasm_bindgen_futures::spawn_local(async move { + f.call_once().await; + let mut map = CAN_CLOSE_MAP.lock().unwrap(); + //thread closes after awaiting the future + map.insert(thread_key, true); + }); + }, thread_key).expect("failed to spawn thread") +} + +pub fn spawn_with_key(f: F, key: u32) -> std::io::Result> +where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, +{ + Builder::new().spawn(f, key) } diff --git a/src/wasm32/scoped.rs b/src/wasm32/scoped.rs index cd18532..80a60b6 100644 --- a/src/wasm32/scoped.rs +++ b/src/wasm32/scoped.rs @@ -9,6 +9,7 @@ use std::{ use super::{signal::Signal, utils::is_web_worker_thread, Builder, JoinInner}; + /// A scope to spawn scoped threads in. /// /// See [`scope`] for details. @@ -171,8 +172,11 @@ impl Builder { F: FnOnce() -> T + Send + 'scope, T: Send + 'scope, { + let thread_key = crate::wasm32::utils::create_available_thread_key(); + let mut map = crate::wasm32::CAN_CLOSE_MAP.lock().unwrap(); + map.insert(thread_key, true); Ok(ScopedJoinHandle(unsafe { - self.spawn_unchecked_(f, Some(scope.data.clone())) + self.spawn_unchecked_(f, Some(scope.data.clone()), thread_key) }?)) } } diff --git a/src/wasm32/utils.rs b/src/wasm32/utils.rs index 11da5e2..d3b3aff 100644 --- a/src/wasm32/utils.rs +++ b/src/wasm32/utils.rs @@ -6,6 +6,7 @@ use std::{ use wasm_bindgen::prelude::*; use web_sys::{Blob, Url, WorkerGlobalScope}; +use crate::wasm32::CAN_CLOSE_MAP; pub fn available_parallelism() -> io::Result { if let Some(window) = web_sys::window() { @@ -103,3 +104,19 @@ impl SpinLockMutex for Mutex { } } } + +pub fn create_available_thread_key() -> u32 { + let mut thread_key: Option = None; + + let mut map = CAN_CLOSE_MAP.lock().unwrap(); + + for key in 0..=std::u32::MAX { + if !map.contains_key(&key) { + map.insert(key, false); + thread_key = Some(key); + break; + } + } + + thread_key.expect("Unable to generate unique thread key") +} diff --git a/tests/wasm.rs b/tests/wasm.rs index 6b8a613..14b2618 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -122,23 +122,34 @@ async fn thread_async_channel() { assert_eq!(result, "Hello world!"); } - +// This test should fail if "keep_worker_alive" enabled #[wasm_bindgen_test] async fn keep_worker_alive(){ - let (thread_tx, main_rx) = async_channel::unbounded::(); - thread::spawn(|| { wasm_bindgen_futures::spawn_local(async move { let promise = js_sys::Promise::resolve(&wasm_bindgen::JsValue::from(42)); - // Convert that promise into a future let x = wasm_bindgen_futures::JsFuture::from(promise).await.unwrap(); - // This should send if "keep_worker_alive" is enabled. If disabled, - // the thread will close before the message can be sent - thread_tx.send("After js future".to_string()).await.unwrap(); + //additional wait to simulate a js future that takes more time + async_std::task::sleep(std::time::Duration::from_secs(1)).await; + // This should only run if "keep_worker_alive" is enabled. If disabled, + // the thread will close before it can run + assert_eq!(1, 2); }); }); - // If "keep_worker_alive" is disabled this will not recieve the message and - // eventually timeout the test +} + +#[wasm_bindgen_test] +async fn spawn_async(){ + let (thread_tx, main_rx) = async_channel::unbounded::(); + //since spawn_async closes the thread once the provided closure is complete, + //"keep_worker_alive" is not necessary + thread::spawn_async(|| async move{ + let promise = js_sys::Promise::resolve(&wasm_bindgen::JsValue::from(42)); + let x = wasm_bindgen_futures::JsFuture::from(promise).await.unwrap(); + //additional wait to simulate a js future that takes more time + async_std::task::sleep(std::time::Duration::from_secs(1)).await; + thread_tx.send("After js future".to_string()).await.unwrap(); + }); let mut msg = main_rx.recv().await.unwrap(); assert_eq!(msg, "After js future"); From 2ee2621974e1a1909c2e0c1d6a09ebc08761f5e4 Mon Sep 17 00:00:00 2001 From: adamgerhant <116332429+adamgerhant@users.noreply.github.com> Date: Sat, 20 Apr 2024 16:25:00 -0700 Subject: [PATCH 3/4] Remove default close() Co-authored-by: Vladimir Nachbaur --- src/wasm32/js/web_worker.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/wasm32/js/web_worker.js b/src/wasm32/js/web_worker.js index b385402..dc759d6 100644 --- a/src/wasm32/js/web_worker.js +++ b/src/wasm32/js/web_worker.js @@ -20,7 +20,6 @@ self.onmessage = event => { // Enter rust code by calling entry point defined in `lib.rs`. // This executes closure defined by work context. wasm.wasm_thread_entry_point(work); - close(); // Once done, check if worker should close based on the "keep_worker_alive" feature if(!wasm.keep_worker_alive()){ From 999a94b24cca580f78816391f61af9996f45181b Mon Sep 17 00:00:00 2001 From: Adam Date: Mon, 6 May 2024 00:12:36 -0700 Subject: [PATCH 4/4] Add JoinHandle for spawn_async and close thread from Rust --- src/wasm32/js/web_worker.js | 10 -- src/wasm32/js/web_worker_module.js | 12 +- src/wasm32/mod.rs | 230 +++++++++++++++++------------ src/wasm32/scoped.rs | 5 +- src/wasm32/utils.rs | 16 -- tests/wasm.rs | 56 ++++++- 6 files changed, 191 insertions(+), 138 deletions(-) diff --git a/src/wasm32/js/web_worker.js b/src/wasm32/js/web_worker.js index dc759d6..e5a0269 100644 --- a/src/wasm32/js/web_worker.js +++ b/src/wasm32/js/web_worker.js @@ -20,16 +20,6 @@ self.onmessage = event => { // Enter rust code by calling entry point defined in `lib.rs`. // This executes closure defined by work context. wasm.wasm_thread_entry_point(work); - - // Once done, check if worker should close based on the "keep_worker_alive" feature - if(!wasm.keep_worker_alive()){ - // Periodically check if the thread can close - setInterval(()=>{ - if(wasm.check_can_close(thread_key)){ - close(); - } - }, 200); - } }); }; \ No newline at end of file diff --git a/src/wasm32/js/web_worker_module.js b/src/wasm32/js/web_worker_module.js index 79515c4..375a5d4 100644 --- a/src/wasm32/js/web_worker_module.js +++ b/src/wasm32/js/web_worker_module.js @@ -1,5 +1,5 @@ // synchronously, using the browser, import wasm_bindgen shim JS scripts -import init, {wasm_thread_entry_point, keep_worker_alive, check_can_close} from "WASM_BINDGEN_SHIM_URL"; +import init, {wasm_thread_entry_point} from "WASM_BINDGEN_SHIM_URL"; // Wait for the main thread to send us the shared module/memory and work context. // Once we've got it, initialize it all with the `wasm_bindgen` global we imported via @@ -20,15 +20,5 @@ self.onmessage = event => { // Enter rust code by calling entry point defined in `lib.rs`. // This executes closure defined by work context. wasm_thread_entry_point(work); - - // Once done, check if worker should close based on the "keep_worker_alive" feature - if(!keep_worker_alive()){ - // Periodically check if the thread can close - setInterval(()=>{ - if(check_can_close(thread_key)){ - close(); - } - }, 200); - } }); }; \ No newline at end of file diff --git a/src/wasm32/mod.rs b/src/wasm32/mod.rs index 8dba13d..a9c5a85 100644 --- a/src/wasm32/mod.rs +++ b/src/wasm32/mod.rs @@ -1,4 +1,4 @@ -pub use std::thread::{current, sleep, Result, Thread, ThreadId}; +pub use std::thread::{Result, Thread}; use std::{ cell::UnsafeCell, fmt, @@ -18,17 +18,14 @@ use wasm_bindgen::prelude::*; use web_sys::{DedicatedWorkerGlobalScope, Worker, WorkerOptions, WorkerType}; use std::future::Future; use std::pin::Pin; -use once_cell::sync::Lazy; -use std::collections::HashMap; + mod scoped; mod signal; mod utils; // Use a thread safe static hashmap to keep track of whether each thread can be closed -static CAN_CLOSE_MAP: Lazy>> = Lazy::new(|| { - Mutex::new(HashMap::new()) -}); + struct WebWorkerContext { func: Box, @@ -42,34 +39,15 @@ pub fn wasm_thread_entry_point(ptr: u32) { WorkerMessage::ThreadComplete.post(); } -#[wasm_bindgen] -pub fn check_can_close(key: u32) -> bool { - let mut map = CAN_CLOSE_MAP.lock().unwrap(); - let can_close = *map.get(&key).unwrap_or(&true); - if(can_close){ - map.remove(&key); - } - can_close -} - -#[wasm_bindgen] -pub fn keep_worker_alive() -> bool { - #[cfg(feature = "keep_worker_alive")] - return true; - #[cfg(not(feature = "keep_worker_alive"))] - return false; -} - /// Used to relay spawn requests from web workers to main thread struct BuilderRequest { builder: Builder, context: WebWorkerContext, - key: u32, } impl BuilderRequest { pub unsafe fn spawn(self) { - self.builder.spawn_for_context(self.context, self.key); + self.builder.spawn_for_context(self.context); } } @@ -94,6 +72,30 @@ impl WorkerMessage { } } +// Pass `f` in `MaybeUninit` because actually that closure might *run longer than the lifetime of `F`*. +// See for more details. +// To prevent leaks we use a wrapper that drops its contents. +#[repr(transparent)] +struct MaybeDangling(mem::MaybeUninit); +impl MaybeDangling { + fn new(x: T) -> Self { + MaybeDangling(mem::MaybeUninit::new(x)) + } + fn into_inner(self) -> T { + // SAFETY: we are always initiailized. + let ret = unsafe { self.0.assume_init_read() }; + // Make sure we don't drop. + mem::forget(self); + ret + } +} +impl Drop for MaybeDangling { + fn drop(&mut self) { + // SAFETY: we are always initiailized. + unsafe { self.0.assume_init_drop() }; + } +} + static DEFAULT_BUILDER: Mutex> = Mutex::new(None); /// Thread factory, which can be used in order to configure the properties of a new thread. @@ -176,13 +178,22 @@ impl Builder { /// Spawns a new thread by taking ownership of the `Builder`, and returns an /// [std::io::Result] to its [`JoinHandle`]. - pub fn spawn(self, f: F, key: u32) -> std::io::Result> + + pub fn spawn(self, f: F) -> std::io::Result> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, { - unsafe { self.spawn_unchecked(f, key) } + unsafe { self.spawn_unchecked(f) } + } + + pub fn spawn_async(self, f: F) -> std::io::Result> + where + F: AsyncClosure + Send + 'static, + T: Send + 'static, + { + unsafe { self.spawn_unchecked_async(f) } } /// Spawns a new thread without any lifetime restrictions by taking ownership @@ -199,20 +210,27 @@ impl Builder { /// - use only types with `'static` lifetime bounds, i.e., those with no or only /// `'static` references (both [`Builder::spawn`] /// and [`spawn`] enforce this property statically) - pub unsafe fn spawn_unchecked<'a, F, T>(self, f: F, key: u32) -> std::io::Result> + pub unsafe fn spawn_unchecked<'a, F, T>(self, f: F) -> std::io::Result> where F: FnOnce() -> T, F: Send + 'a, T: Send + 'a, { - Ok(JoinHandle(unsafe { self.spawn_unchecked_(f, None, key) }?)) + Ok(JoinHandle(unsafe { self.spawn_unchecked_(f, None) }?)) } + pub unsafe fn spawn_unchecked_async<'a, F, T>(self, f: F) -> std::io::Result> + where + F: AsyncClosure + Send + 'static, + T: Send + 'static, + { + Ok(JoinHandle(unsafe { self.spawn_unchecked_async_(f, None) }?)) + } + pub(crate) unsafe fn spawn_unchecked_<'a, 'scope, F, T>( self, f: F, scope_data: Option>, - key: u32, ) -> std::io::Result> where F: FnOnce() -> T, @@ -230,36 +248,81 @@ impl Builder { }); let their_packet = my_packet.clone(); - // Pass `f` in `MaybeUninit` because actually that closure might *run longer than the lifetime of `F`*. - // See for more details. - // To prevent leaks we use a wrapper that drops its contents. - #[repr(transparent)] - struct MaybeDangling(mem::MaybeUninit); - impl MaybeDangling { - fn new(x: T) -> Self { - MaybeDangling(mem::MaybeUninit::new(x)) - } - fn into_inner(self) -> T { - // SAFETY: we are always initiailized. - let ret = unsafe { self.0.assume_init_read() }; - // Make sure we don't drop. - mem::forget(self); - ret - } + let f = MaybeDangling::new(f); + let main = Box::new(move || { + // SAFETY: we constructed `f` initialized. + let f = f.into_inner(); + // Execute the closure and catch any panics + let try_result = catch_unwind(AssertUnwindSafe(|| f())); + // SAFETY: `their_packet` as been built just above and moved by the + // closure (it is an Arc<...>) and `my_packet` will be stored in the + // same `JoinInner` as this closure meaning the mutation will be + // safe (not modify it and affect a value far away). + unsafe { *their_packet.result.get() = Some(try_result) }; + // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet that + // will call `decrement_num_running_threads` and therefore signal that this thread is + // done. + drop(their_packet); + // Notify waiting handles + their_signal.signal(); + // Here, the lifetime `'a` and even `'scope` can end, so the thread can be closed. `main` keeps running for a bit + // after that before returning itself. + #[cfg(not(feature = "keep_worker_alive"))] + js_sys::eval("self") + .unwrap() + .dyn_into::() + .unwrap() + .close(); + }); + + // Erase lifetime + let context = WebWorkerContext { + func: mem::transmute::, Box>(main), + }; + + if is_web_worker_thread() { + WorkerMessage::SpawnThread(BuilderRequest { builder: self, context }).post(); + } else { + self.spawn_for_context(context); } - impl Drop for MaybeDangling { - fn drop(&mut self) { - // SAFETY: we are always initiailized. - unsafe { self.0.assume_init_drop() }; - } + + if let Some(scope) = &my_packet.scope { + scope.increment_num_running_threads(); } + Ok(JoinInner { + signal: my_signal, + packet: my_packet, + }) + } + + pub(crate) unsafe fn spawn_unchecked_async_<'a, F, T>( + self, + f: F, + scope_data: Option>, + ) -> std::io::Result> + where + F: AsyncClosure + 'static, + T: Send + 'static, + { + let my_signal = Arc::new(Signal::new()); + let their_signal = my_signal.clone(); + + let my_packet: Arc> = Arc::new(Packet { + scope: scope_data, + result: UnsafeCell::new(None), + _marker: PhantomData, + }); + let their_packet = my_packet.clone(); + let f = MaybeDangling::new(f); - let main = Box::new(move || { + let spawn_local = wasm_bindgen_futures::spawn_local(async move { // SAFETY: we constructed `f` initialized. let f = f.into_inner(); // Execute the closure and catch any panics - let try_result = catch_unwind(AssertUnwindSafe(|| f())); + + let try_result = Ok(f.call_once().await); + // SAFETY: `their_packet` as been built just above and moved by the // closure (it is an Arc<...>) and `my_packet` will be stored in the // same `JoinInner` as this closure meaning the mutation will be @@ -271,9 +334,16 @@ impl Builder { drop(their_packet); // Notify waiting handles their_signal.signal(); - // Here, the lifetime `'a` and even `'scope` can end. `main` keeps running for a bit + // Here, the lifetime `'a` and even `'scope` can end, so the thread can be closed. `main` keeps running for a bit // after that before returning itself. + #[cfg(not(feature = "keep_worker_alive"))] + js_sys::eval("self") + .unwrap() + .dyn_into::() + .unwrap() + .close(); }); + let main = Box::new(move || spawn_local ); // Erase lifetime let context = WebWorkerContext { @@ -281,9 +351,9 @@ impl Builder { }; if is_web_worker_thread() { - WorkerMessage::SpawnThread(BuilderRequest { builder: self, context, key }).post(); + WorkerMessage::SpawnThread(BuilderRequest { builder: self, context }).post(); } else { - self.spawn_for_context(context, key); + self.spawn_for_context(context); } if let Some(scope) = &my_packet.scope { @@ -296,7 +366,7 @@ impl Builder { }) } - unsafe fn spawn_for_context(self, ctx: WebWorkerContext, key: u32) { + unsafe fn spawn_for_context(self, ctx: WebWorkerContext) { let Builder { name, prefix, @@ -366,7 +436,6 @@ impl Builder { init.push(&wasm_bindgen::module()); init.push(&wasm_bindgen::memory()); init.push(&JsValue::from(ctx_ptr as u32)); - init.push(&JsValue::from(key)); // Send initialization message match worker.post_message(&init) { @@ -483,51 +552,30 @@ where F: Send + 'static, T: Send + 'static, { - let thread_key = utils::create_available_thread_key(); - let mut map = CAN_CLOSE_MAP.lock().unwrap(); - //thread can close immediately - map.insert(thread_key, true); - - spawn_with_key(f, thread_key).expect("failed to spawn thread") + Builder::new().spawn(f).expect("Failed to spawn thread") } -trait AsyncClosure { - fn call_once(self) -> Pin>>; +pub trait AsyncClosure { + fn call_once(self) -> Pin>>; } -impl AsyncClosure for F +impl AsyncClosure for F where F: FnOnce() -> Fut + Send + 'static, - Fut: Future + 'static, + Fut: Future + 'static, + T: Send + 'static, { - fn call_once(self) -> Pin>> { + fn call_once(self) -> Pin>> { Box::pin(self()) } } // JoinHandle is of type () because the future immediately returns. -pub fn spawn_async(f: F) -> JoinHandle<()> +pub fn spawn_async(f: F) -> JoinHandle where - F: AsyncClosure, - F: Send + 'static, -{ - let thread_key = utils::create_available_thread_key(); - - spawn_with_key(move || { - wasm_bindgen_futures::spawn_local(async move { - f.call_once().await; - let mut map = CAN_CLOSE_MAP.lock().unwrap(); - //thread closes after awaiting the future - map.insert(thread_key, true); - }); - }, thread_key).expect("failed to spawn thread") -} - -pub fn spawn_with_key(f: F, key: u32) -> std::io::Result> -where - F: FnOnce() -> T, + F: AsyncClosure, F: Send + 'static, T: Send + 'static, { - Builder::new().spawn(f, key) -} + Builder::new().spawn_async(f).expect("failed to spawn thread") +} \ No newline at end of file diff --git a/src/wasm32/scoped.rs b/src/wasm32/scoped.rs index 80a60b6..fdf1891 100644 --- a/src/wasm32/scoped.rs +++ b/src/wasm32/scoped.rs @@ -172,11 +172,8 @@ impl Builder { F: FnOnce() -> T + Send + 'scope, T: Send + 'scope, { - let thread_key = crate::wasm32::utils::create_available_thread_key(); - let mut map = crate::wasm32::CAN_CLOSE_MAP.lock().unwrap(); - map.insert(thread_key, true); Ok(ScopedJoinHandle(unsafe { - self.spawn_unchecked_(f, Some(scope.data.clone()), thread_key) + self.spawn_unchecked_(f, Some(scope.data.clone())) }?)) } } diff --git a/src/wasm32/utils.rs b/src/wasm32/utils.rs index d3b3aff..f42590f 100644 --- a/src/wasm32/utils.rs +++ b/src/wasm32/utils.rs @@ -6,7 +6,6 @@ use std::{ use wasm_bindgen::prelude::*; use web_sys::{Blob, Url, WorkerGlobalScope}; -use crate::wasm32::CAN_CLOSE_MAP; pub fn available_parallelism() -> io::Result { if let Some(window) = web_sys::window() { @@ -105,18 +104,3 @@ impl SpinLockMutex for Mutex { } } -pub fn create_available_thread_key() -> u32 { - let mut thread_key: Option = None; - - let mut map = CAN_CLOSE_MAP.lock().unwrap(); - - for key in 0..=std::u32::MAX { - if !map.contains_key(&key) { - map.insert(key, false); - thread_key = Some(key); - break; - } - } - - thread_key.expect("Unable to generate unique thread key") -} diff --git a/tests/wasm.rs b/tests/wasm.rs index 14b2618..90c7276 100644 --- a/tests/wasm.rs +++ b/tests/wasm.rs @@ -1,8 +1,12 @@ #![cfg(target_arch = "wasm32")] use core::{ - sync::atomic::{AtomicBool, Ordering}, - time::Duration, + sync::atomic::{AtomicBool, Ordering}, time::Duration +}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, }; use wasm_bindgen_test::*; @@ -122,13 +126,14 @@ async fn thread_async_channel() { assert_eq!(result, "Hello world!"); } +//TODO: doesn't fail when keep_worker_alive is enabled. Can threads be closed from wasm? // This test should fail if "keep_worker_alive" enabled #[wasm_bindgen_test] async fn keep_worker_alive(){ thread::spawn(|| { wasm_bindgen_futures::spawn_local(async move { let promise = js_sys::Promise::resolve(&wasm_bindgen::JsValue::from(42)); - let x = wasm_bindgen_futures::JsFuture::from(promise).await.unwrap(); + wasm_bindgen_futures::JsFuture::from(promise).await.unwrap(); //additional wait to simulate a js future that takes more time async_std::task::sleep(std::time::Duration::from_secs(1)).await; // This should only run if "keep_worker_alive" is enabled. If disabled, @@ -144,14 +149,53 @@ async fn spawn_async(){ //since spawn_async closes the thread once the provided closure is complete, //"keep_worker_alive" is not necessary thread::spawn_async(|| async move{ + let promise = js_sys::Promise::resolve(&wasm_bindgen::JsValue::from(42)); - let x = wasm_bindgen_futures::JsFuture::from(promise).await.unwrap(); - //additional wait to simulate a js future that takes more time + wasm_bindgen_futures::JsFuture::from(promise).await.unwrap(); + // //additional wait to simulate a js future that takes more time async_std::task::sleep(std::time::Duration::from_secs(1)).await; thread_tx.send("After js future".to_string()).await.unwrap(); }); - let mut msg = main_rx.recv().await.unwrap(); + let msg = main_rx.recv().await.unwrap(); assert_eq!(msg, "After js future"); } +struct DelayedValue { + start_time: f64, + delay_time: f64, +} + +impl Future for DelayedValue { + type Output = u32; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Check if the delay has elapsed + let performance_now = js_sys::Date::now(); + if self.start_time+self.delay_time < performance_now { + Poll::Ready(1234) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + + } +} + +impl DelayedValue { + pub fn new(duration: f64) -> Self { + let performance_now = js_sys::Date::now(); + DelayedValue {start_time: performance_now, delay_time: duration} + } +} + +#[wasm_bindgen_test] +async fn async_thread_join_async() { + + let handle = thread::spawn_async(|| async move { + DelayedValue::new(1000.0).await + }); + + assert_eq!(handle.join_async().await.unwrap(), 1234); +} +