Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ readme = "README.md"
[features]
default = ["es_modules"]
es_modules = []
keep_worker_alive = []

[dependencies]
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4.42"
web-sys = { version = "0.3", features = [
"Blob",
"DedicatedWorkerGlobalScope",
Expand All @@ -32,6 +34,8 @@ web-sys = { version = "0.3", features = [
] }
js-sys = "0.3"
futures = "0.3"
async-std = "1.12.0"
once_cell = "1.8"

[dev-dependencies]
log = "0.4"
Expand Down
5 changes: 1 addition & 4 deletions src/wasm32/js/web_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ importScripts('WASM_BINDGEN_SHIM_URL');
// Once we've got it, initialize it all with the `wasm_bindgen` global we imported via
// `importScripts`.
self.onmessage = event => {
let [ module, memory, work ] = event.data;
let [ module, memory, work, thread_key ] = event.data;

wasm_bindgen(module, memory).catch(err => {
console.log(err);
Expand All @@ -20,9 +20,6 @@ self.onmessage = event => {
// Enter rust code by calling entry point defined in `lib.rs`.
// This executes closure defined by work context.
wasm.wasm_thread_entry_point(work);

// Once done, terminate web worker
close();
});
};

5 changes: 1 addition & 4 deletions src/wasm32/js/web_worker_module.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import init, {wasm_thread_entry_point} from "WASM_BINDGEN_SHIM_URL";
// Once we've got it, initialize it all with the `wasm_bindgen` global we imported via
// `importScripts`.
self.onmessage = event => {
let [ module, memory, work ] = event.data;
let [ module, memory, work, thread_key ] = event.data;

init(module, memory).catch(err => {
console.log(err);
Expand All @@ -20,8 +20,5 @@ self.onmessage = event => {
// Enter rust code by calling entry point defined in `lib.rs`.
// This executes closure defined by work context.
wasm_thread_entry_point(work);

// Once done, terminate web worker
close();
});
};
176 changes: 150 additions & 26 deletions src/wasm32/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub use std::thread::{current, sleep, Result, Thread, ThreadId};
pub use std::thread::{Result, Thread};
use std::{
cell::UnsafeCell,
fmt,
Expand All @@ -16,11 +16,17 @@ use utils::SpinLockMutex;
pub use utils::{available_parallelism, get_wasm_bindgen_shim_script_path, get_worker_script, is_web_worker_thread};
use wasm_bindgen::prelude::*;
use web_sys::{DedicatedWorkerGlobalScope, Worker, WorkerOptions, WorkerType};
use std::future::Future;
use std::pin::Pin;


mod scoped;
mod signal;
mod utils;

// Use a thread safe static hashmap to keep track of whether each thread can be closed


struct WebWorkerContext {
func: Box<dyn FnOnce() + Send>,
}
Expand Down Expand Up @@ -66,6 +72,30 @@ impl WorkerMessage {
}
}

// Pass `f` in `MaybeUninit` because actually that closure might *run longer than the lifetime of `F`*.
// See <https://github.com/rust-lang/rust/issues/101983> for more details.
// To prevent leaks we use a wrapper that drops its contents.
#[repr(transparent)]
struct MaybeDangling<T>(mem::MaybeUninit<T>);
impl<T> MaybeDangling<T> {
fn new(x: T) -> Self {
MaybeDangling(mem::MaybeUninit::new(x))
}
fn into_inner(self) -> T {
// SAFETY: we are always initiailized.
let ret = unsafe { self.0.assume_init_read() };
// Make sure we don't drop.
mem::forget(self);
ret
}
}
impl<T> Drop for MaybeDangling<T> {
fn drop(&mut self) {
// SAFETY: we are always initiailized.
unsafe { self.0.assume_init_drop() };
}
}

static DEFAULT_BUILDER: Mutex<Option<Builder>> = Mutex::new(None);

/// Thread factory, which can be used in order to configure the properties of a new thread.
Expand Down Expand Up @@ -148,6 +178,7 @@ impl Builder {

/// Spawns a new thread by taking ownership of the `Builder`, and returns an
/// [std::io::Result] to its [`JoinHandle`].

pub fn spawn<F, T>(self, f: F) -> std::io::Result<JoinHandle<T>>
where
F: FnOnce() -> T,
Expand All @@ -157,6 +188,14 @@ impl Builder {
unsafe { self.spawn_unchecked(f) }
}

pub fn spawn_async<F, T>(self, f: F) -> std::io::Result<JoinHandle<T>>
where
F: AsyncClosure<T> + Send + 'static,
T: Send + 'static,
{
unsafe { self.spawn_unchecked_async(f) }
}

/// Spawns a new thread without any lifetime restrictions by taking ownership
/// of the `Builder`, and returns an [std::io::Result] to its [`JoinHandle`].
///
Expand All @@ -180,6 +219,14 @@ impl Builder {
Ok(JoinHandle(unsafe { self.spawn_unchecked_(f, None) }?))
}

pub unsafe fn spawn_unchecked_async<'a, F, T>(self, f: F) -> std::io::Result<JoinHandle<T>>
where
F: AsyncClosure<T> + Send + 'static,
T: Send + 'static,
{
Ok(JoinHandle(unsafe { self.spawn_unchecked_async_(f, None) }?))
}

pub(crate) unsafe fn spawn_unchecked_<'a, 'scope, F, T>(
self,
f: F,
Expand All @@ -201,36 +248,81 @@ impl Builder {
});
let their_packet = my_packet.clone();

// Pass `f` in `MaybeUninit` because actually that closure might *run longer than the lifetime of `F`*.
// See <https://github.com/rust-lang/rust/issues/101983> for more details.
// To prevent leaks we use a wrapper that drops its contents.
#[repr(transparent)]
struct MaybeDangling<T>(mem::MaybeUninit<T>);
impl<T> MaybeDangling<T> {
fn new(x: T) -> Self {
MaybeDangling(mem::MaybeUninit::new(x))
}
fn into_inner(self) -> T {
// SAFETY: we are always initiailized.
let ret = unsafe { self.0.assume_init_read() };
// Make sure we don't drop.
mem::forget(self);
ret
}
let f = MaybeDangling::new(f);
let main = Box::new(move || {
// SAFETY: we constructed `f` initialized.
let f = f.into_inner();
// Execute the closure and catch any panics
let try_result = catch_unwind(AssertUnwindSafe(|| f()));
// SAFETY: `their_packet` as been built just above and moved by the
// closure (it is an Arc<...>) and `my_packet` will be stored in the
// same `JoinInner` as this closure meaning the mutation will be
// safe (not modify it and affect a value far away).
unsafe { *their_packet.result.get() = Some(try_result) };
// Here `their_packet` gets dropped, and if this is the last `Arc` for that packet that
// will call `decrement_num_running_threads` and therefore signal that this thread is
// done.
drop(their_packet);
// Notify waiting handles
their_signal.signal();
// Here, the lifetime `'a` and even `'scope` can end, so the thread can be closed. `main` keeps running for a bit
// after that before returning itself.
#[cfg(not(feature = "keep_worker_alive"))]
js_sys::eval("self")
.unwrap()
.dyn_into::<DedicatedWorkerGlobalScope>()
.unwrap()
.close();
});

// Erase lifetime
let context = WebWorkerContext {
func: mem::transmute::<Box<dyn FnOnce() + Send + 'a>, Box<dyn FnOnce() + Send + 'static>>(main),
};

if is_web_worker_thread() {
WorkerMessage::SpawnThread(BuilderRequest { builder: self, context }).post();
} else {
self.spawn_for_context(context);
}
impl<T> Drop for MaybeDangling<T> {
fn drop(&mut self) {
// SAFETY: we are always initiailized.
unsafe { self.0.assume_init_drop() };
}

if let Some(scope) = &my_packet.scope {
scope.increment_num_running_threads();
}

Ok(JoinInner {
signal: my_signal,
packet: my_packet,
})
}

pub(crate) unsafe fn spawn_unchecked_async_<'a, F, T>(
self,
f: F,
scope_data: Option<Arc<ScopeData>>,
) -> std::io::Result<JoinInner<'static, T>>
where
F: AsyncClosure<T> + 'static,
T: Send + 'static,
{
let my_signal = Arc::new(Signal::new());
let their_signal = my_signal.clone();

let my_packet: Arc<Packet<'static, T>> = Arc::new(Packet {
scope: scope_data,
result: UnsafeCell::new(None),
_marker: PhantomData,
});
let their_packet = my_packet.clone();

let f = MaybeDangling::new(f);
let main = Box::new(move || {
let spawn_local = wasm_bindgen_futures::spawn_local(async move {
// SAFETY: we constructed `f` initialized.
let f = f.into_inner();
// Execute the closure and catch any panics
let try_result = catch_unwind(AssertUnwindSafe(|| f()));

let try_result = Ok(f.call_once().await);

// SAFETY: `their_packet` as been built just above and moved by the
// closure (it is an Arc<...>) and `my_packet` will be stored in the
// same `JoinInner` as this closure meaning the mutation will be
Expand All @@ -242,9 +334,16 @@ impl Builder {
drop(their_packet);
// Notify waiting handles
their_signal.signal();
// Here, the lifetime `'a` and even `'scope` can end. `main` keeps running for a bit
// Here, the lifetime `'a` and even `'scope` can end, so the thread can be closed. `main` keeps running for a bit
// after that before returning itself.
#[cfg(not(feature = "keep_worker_alive"))]
js_sys::eval("self")
.unwrap()
.dyn_into::<DedicatedWorkerGlobalScope>()
.unwrap()
.close();
});
let main = Box::new(move || spawn_local );

// Erase lifetime
let context = WebWorkerContext {
Expand Down Expand Up @@ -453,5 +552,30 @@ where
F: Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f).expect("failed to spawn thread")
Builder::new().spawn(f).expect("Failed to spawn thread")
}

pub trait AsyncClosure<T> {
fn call_once(self) -> Pin<Box<dyn Future<Output = T>>>;
}

impl<F, Fut, T> AsyncClosure<T> for F
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = T> + 'static,
T: Send + 'static,
{
fn call_once(self) -> Pin<Box<dyn Future<Output = T>>> {
Box::pin(self())
}
}

// JoinHandle is of type () because the future immediately returns.
pub fn spawn_async<F, T>(f: F) -> JoinHandle<T>
where
F: AsyncClosure<T>,
F: Send + 'static,
T: Send + 'static,
{
Builder::new().spawn_async(f).expect("failed to spawn thread")
}
1 change: 1 addition & 0 deletions src/wasm32/scoped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use super::{signal::Signal, utils::is_web_worker_thread, Builder, JoinInner};


/// A scope to spawn scoped threads in.
///
/// See [`scope`] for details.
Expand Down
1 change: 1 addition & 0 deletions src/wasm32/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@ impl<T> SpinLockMutex for Mutex<T> {
}
}
}

Loading