Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions host/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
//! Interfaces for HTTP interactions of the guest.

use std::{borrow::Cow, collections::HashSet, fmt};
use std::{borrow::Cow, collections::HashSet, fmt, sync::Arc};

use http::HeaderName;
pub use http::Method as HttpMethod;
use wasmtime_wasi_http::body::HyperOutgoingBody;
use wasmtime_wasi::ResourceTable;
use wasmtime_wasi_http::{
HttpResult, WasiHttpCtx, WasiHttpView,
bindings::http::types::ErrorCode as HttpErrorCode,
body::HyperOutgoingBody,
types::{
DEFAULT_FORBIDDEN_HEADERS, HostFutureIncomingResponse, OutgoingRequestConfig,
default_send_request_handler,
},
};

use crate::state::WasmStateImpl;

/// Validates if an outgoing HTTP interaction is allowed.
///
Expand Down Expand Up @@ -112,6 +124,60 @@ impl fmt::Display for HttpRequestRejected {

impl std::error::Error for HttpRequestRejected {}

impl WasiHttpView for WasmStateImpl {
fn ctx(&mut self) -> &mut WasiHttpCtx {
&mut self.wasi_http_ctx
}

fn table(&mut self) -> &mut ResourceTable {
&mut self.resource_table
}

fn send_request(
&mut self,
mut request: hyper::Request<HyperOutgoingBody>,
config: OutgoingRequestConfig,
) -> HttpResult<HostFutureIncomingResponse> {
let _guard = self.io_rt.enter();

// Python `requests` sends this so we allow it but later drop it from the actual request.
request.headers_mut().remove(hyper::header::CONNECTION);

// technically we could return an error straight away, but `urllib3` doesn't handle that super well, so we
// create a future and validate the error in there (before actually starting the request of course)

let validator = Arc::clone(&self.http_validator);
let handle = wasmtime_wasi::runtime::spawn(async move {
// yes, that's another layer of futures. The WASI interface is somewhat nested.
let fut = async {
validator
.validate(&request, config.use_tls)
.map_err(|_| HttpErrorCode::HttpRequestDenied)?;

log::debug!(
"UDF HTTP request: {} {}",
request.method().as_str(),
request.uri(),
);
default_send_request_handler(request, config).await
};

Ok(fut.await)
});

Ok(HostFutureIncomingResponse::pending(handle))
}

fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
// Python `requests` sends this so we allow it but later drop it from the actual request.
if name == hyper::header::CONNECTION {
return false;
}

DEFAULT_FORBIDDEN_HEADERS.contains(name)
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
123 changes: 5 additions & 118 deletions host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//! [DataFusion]: https://datafusion.apache.org/
use std::{any::Any, collections::HashSet, hash::Hash, ops::DerefMut, sync::Arc, time::Duration};

use ::http::HeaderName;
use arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result as DataFusionResult};
use datafusion_execution::memory_pool::MemoryPool;
Expand All @@ -18,18 +17,8 @@ use wasmtime::{
Engine, Store, UpdateDeadline,
component::{Component, ResourceAny},
};
use wasmtime_wasi::{
ResourceTable, WasiCtx, WasiCtxView, WasiView, async_trait, p2::pipe::MemoryOutputPipe,
};
use wasmtime_wasi_http::{
HttpResult, WasiHttpCtx, WasiHttpView,
bindings::http::types::ErrorCode as HttpErrorCode,
body::HyperOutgoingBody,
types::{
DEFAULT_FORBIDDEN_HEADERS, HostFutureIncomingResponse, OutgoingRequestConfig,
default_send_request_handler,
},
};
use wasmtime_wasi::{ResourceTable, WasiCtx, async_trait, p2::pipe::MemoryOutputPipe};
use wasmtime_wasi_http::WasiHttpCtx;

use crate::{
bindings::exports::datafusion_udf_wasm::udf::types as wit_types,
Expand All @@ -38,8 +27,9 @@ use crate::{
ignore_debug::IgnoreDebug,
limiter::Limiter,
linker::link,
state::WasmStateImpl,
tokio_helpers::async_in_sync_context,
vfs::{VfsCtxView, VfsState, VfsView},
vfs::VfsState,
};

pub use crate::{
Expand Down Expand Up @@ -69,113 +59,10 @@ mod ignore_debug;
mod limiter;
mod linker;
mod permissions;
mod state;
mod tokio_helpers;
mod vfs;

/// State of the WASM payload.
#[derive(Debug)]
struct WasmStateImpl {
/// Virtual filesystem for the WASM payload.
///
/// This filesystem is provided to the payload in memory with read-write support.
vfs_state: VfsState,

/// Resource limiter.
limiter: Limiter,

/// A limited buffer for stderr.
///
/// This is especially useful for when the payload crashes.
stderr: MemoryOutputPipe,

/// WASI context.
wasi_ctx: IgnoreDebug<WasiCtx>,

/// WASI HTTP context.
wasi_http_ctx: WasiHttpCtx,

/// Resource tables.
resource_table: ResourceTable,

/// HTTP request validator.
http_validator: Arc<dyn HttpRequestValidator>,

/// Handle to tokio I/O runtime.
io_rt: Handle,
}

impl WasiView for WasmStateImpl {
fn ctx(&mut self) -> WasiCtxView<'_> {
WasiCtxView {
ctx: &mut self.wasi_ctx,
table: &mut self.resource_table,
}
}
}

impl WasiHttpView for WasmStateImpl {
fn ctx(&mut self) -> &mut WasiHttpCtx {
&mut self.wasi_http_ctx
}

fn table(&mut self) -> &mut ResourceTable {
&mut self.resource_table
}

fn send_request(
&mut self,
mut request: hyper::Request<HyperOutgoingBody>,
config: OutgoingRequestConfig,
) -> HttpResult<HostFutureIncomingResponse> {
let _guard = self.io_rt.enter();

// Python `requests` sends this so we allow it but later drop it from the actual request.
request.headers_mut().remove(hyper::header::CONNECTION);

// technically we could return an error straight away, but `urllib3` doesn't handle that super well, so we
// create a future and validate the error in there (before actually starting the request of course)

let validator = Arc::clone(&self.http_validator);
let handle = wasmtime_wasi::runtime::spawn(async move {
// yes, that's another layer of futures. The WASI interface is somewhat nested.
let fut = async {
validator
.validate(&request, config.use_tls)
.map_err(|_| HttpErrorCode::HttpRequestDenied)?;

log::debug!(
"UDF HTTP request: {} {}",
request.method().as_str(),
request.uri(),
);
default_send_request_handler(request, config).await
};

Ok(fut.await)
});

Ok(HostFutureIncomingResponse::pending(handle))
}

fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
// Python `requests` sends this so we allow it but later drop it from the actual request.
if name == hyper::header::CONNECTION {
return false;
}

DEFAULT_FORBIDDEN_HEADERS.contains(name)
}
}

impl VfsView for WasmStateImpl {
fn vfs(&mut self) -> VfsCtxView<'_> {
VfsCtxView {
table: &mut self.resource_table,
vfs_state: &mut self.vfs_state,
}
}
}

/// Create WASM engine.
fn create_engine() -> DataFusionResult<Engine> {
Engine::new(
Expand Down
2 changes: 1 addition & 1 deletion host/src/linker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use wasmtime::{
use wasmtime_wasi::{ResourceTable, WasiView};

use crate::{
WasmStateImpl,
bindings::Datafusion,
state::WasmStateImpl,
vfs::{HasFs, VfsView},
};

Expand Down
50 changes: 50 additions & 0 deletions host/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! State handling of guests.

use std::sync::Arc;

use tokio::runtime::Handle;
use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxView, WasiView, p2::pipe::MemoryOutputPipe};
use wasmtime_wasi_http::WasiHttpCtx;

use crate::{HttpRequestValidator, ignore_debug::IgnoreDebug, limiter::Limiter, vfs::VfsState};

/// State of the WASM payload.
#[derive(Debug)]
pub(crate) struct WasmStateImpl {
/// Virtual filesystem for the WASM payload.
///
/// This filesystem is provided to the payload in memory with read-write support.
pub(crate) vfs_state: VfsState,

/// Resource limiter.
pub(crate) limiter: Limiter,

/// A limited buffer for stderr.
///
/// This is especially useful for when the payload crashes.
pub(crate) stderr: MemoryOutputPipe,

/// WASI context.
pub(crate) wasi_ctx: IgnoreDebug<WasiCtx>,

/// WASI HTTP context.
pub(crate) wasi_http_ctx: WasiHttpCtx,

/// Resource tables.
pub(crate) resource_table: ResourceTable,

/// HTTP request validator.
pub(crate) http_validator: Arc<dyn HttpRequestValidator>,

/// Handle to tokio I/O runtime.
pub(crate) io_rt: Handle,
}

impl WasiView for WasmStateImpl {
fn ctx(&mut self) -> WasiCtxView<'_> {
WasiCtxView {
ctx: &mut self.wasi_ctx,
table: &mut self.resource_table,
}
}
}
10 changes: 10 additions & 0 deletions host/src/vfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use wasmtime_wasi::{
use crate::{
error::LimitExceeded,
limiter::Limiter,
state::WasmStateImpl,
vfs::{
limits::VfsLimits,
path::{PathSegment, PathTraversal},
Expand All @@ -49,6 +50,15 @@ use crate::{
pub(crate) mod limits;
mod path;

impl VfsView for WasmStateImpl {
fn vfs(&mut self) -> VfsCtxView<'_> {
VfsCtxView {
table: &mut self.resource_table,
vfs_state: &mut self.vfs_state,
}
}
}

/// Shared version of [`VfsNode`].
type SharedVfsNode = Arc<RwLock<VfsNode>>;

Expand Down