Skip to content

Commit 18d9ef4

Browse files
committed
feat: Make finalise_response, dispatch_to_resource and render_response functions async
1 parent 3107974 commit 18d9ef4

File tree

3 files changed

+55
-33
lines changed

3 files changed

+55
-33
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ anyhow = "1.0.98"
1515
bytes = "1.10.1"
1616
chrono = "0.4.41"
1717
futures = "0.3.31"
18+
futures-util = "0.3.31"
1819
hex = "0.4.3"
1920
http = "1.3.1"
2021
hyper = { version = "1.6.0", features = ["full"] }

src/lib.rs

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,22 +42,26 @@ Each WebmachineResource defines all the callbacks (via Closures) and values requ
4242
Note: This example uses the maplit crate to provide the `btreemap` macro and the log crate for the logging macros.
4343
4444
```no_run
45+
use std::convert::Infallible;
46+
use std::future::ready;
47+
use std::io::Read;
48+
use std::net::SocketAddr;
49+
use std::sync::Arc;
50+
4551
use webmachine_rust::*;
4652
use webmachine_rust::context::*;
4753
use webmachine_rust::headers::*;
54+
4855
use bytes::Bytes;
49-
use serde_json::{Value, json};
50-
use std::io::Read;
51-
use std::net::SocketAddr;
52-
use std::convert::Infallible;
53-
use std::sync::Arc;
56+
use futures_util::future::FutureExt;
57+
use hyper::{body, Request};
58+
use hyper::server::conn::http1;
59+
use hyper::service::service_fn;
60+
use hyper_util::rt::TokioIo;
5461
use maplit::btreemap;
62+
use serde_json::{Value, json};
5563
use tracing::error;
56-
use hyper_util::rt::TokioIo;
5764
use tokio::net::TcpListener;
58-
use hyper::server::conn::http1;
59-
use hyper::service::service_fn;
60-
use hyper::{body, Request};
6165
6266
# fn main() {}
6367
@@ -71,12 +75,12 @@ Note: This example uses the maplit crate to provide the `btreemap` macro and the
7175
allowed_methods: owned_vec(&["OPTIONS", "GET", "HEAD", "POST"]),
7276
// if the resource exists callback
7377
resource_exists: callback(|_, _| true),
74-
// callback to render the response for the resource
75-
render_response: callback(|_, _| {
78+
// callback to render the response for the resource, it has to be async
79+
render_response: async_callback(|_, _| {
7680
let json_response = json!({
7781
"data": [1, 2, 3, 4]
7882
});
79-
Some(Bytes::from(json_response.to_string()))
83+
ready(Ok(Some(Bytes::from(json_response.to_string())))).boxed()
8084
}),
8185
// callback to process the post for the resource
8286
process_post: callback(|_, _| /* Handle the post here */ Ok(true) ),
@@ -114,9 +118,12 @@ For an example of a project using this crate, have a look at the [Pact Mock Serv
114118
#![warn(missing_docs)]
115119

116120
use std::collections::{BTreeMap, HashMap};
121+
use std::future::ready;
122+
use std::pin::Pin;
117123

118124
use bytes::Bytes;
119125
use chrono::{DateTime, FixedOffset, Utc};
126+
use futures_util::future::FutureExt;
120127
use http::{HeaderMap, Request, Response};
121128
use http_body_util::{BodyExt, Full};
122129
use hyper::body::Incoming;
@@ -141,6 +148,15 @@ pub fn callback<T, RT>(cb: T) -> WebmachineCallback<RT>
141148
Box::new(cb)
142149
}
143150

151+
/// Wrap an async callback in a structure that is safe to call between threads
152+
pub type AsyncWebmachineCallback<T> = Pin<Box<dyn Fn(&mut WebmachineContext, &WebmachineResource) -> Pin<Box<dyn Future<Output=T> + Send>> + Send + Sync>>;
153+
154+
/// Wrap a callback in a structure that is safe to call between threads
155+
pub fn async_callback<T, RT>(cb: T) -> Pin<Box<T>>
156+
where T: Fn(&mut WebmachineContext, &WebmachineResource) -> Pin<Box<dyn Future<Output=RT> + Send>> {
157+
Box::pin(cb)
158+
}
159+
144160
/// Convenience function to create a vector of string structs from a slice of strings
145161
pub fn owned_vec(strings: &[&str]) -> Vec<String> {
146162
strings.iter().map(|s| s.to_string()).collect()
@@ -152,7 +168,7 @@ pub struct WebmachineResource {
152168
/// an opportunity to modify the response after the webmachine has executed.
153169
pub finalise_response: Option<WebmachineCallback<()>>,
154170
/// This is invoked to render the response for the resource
155-
pub render_response: WebmachineCallback<Option<Bytes>>,
171+
pub render_response: AsyncWebmachineCallback<anyhow::Result<Option<Bytes>>>,
156172
/// Is the resource available? Returning false will result in a '503 Service Not Available'
157173
/// response. Defaults to true. If the resource is only temporarily not available,
158174
/// add a 'Retry-After' response header.
@@ -319,7 +335,7 @@ impl Default for WebmachineResource {
319335
multiple_choices: callback(false_fn),
320336
create_path: callback(|context, _| Ok(context.request.request_path.clone())),
321337
expires: callback(none_fn),
322-
render_response: callback(none_fn)
338+
render_response: async_callback(|_, _| ready(Ok(None)).boxed())
323339
}
324340
}
325341
}
@@ -973,7 +989,7 @@ async fn request_from_http_request(req: Request<Incoming>) -> WebmachineRequest
973989
}
974990
}
975991

976-
fn finalise_response(context: &mut WebmachineContext, resource: &WebmachineResource) {
992+
async fn finalise_response(context: &mut WebmachineContext, resource: &WebmachineResource) {
977993
if !context.response.has_header("Content-Type") {
978994
let media_type = match &context.selected_media_type {
979995
&Some(ref media_type) => media_type.clone(),
@@ -1039,9 +1055,14 @@ fn finalise_response(context: &mut WebmachineContext, resource: &WebmachineResou
10391055
}
10401056

10411057
if context.response.body.is_none() && context.response.status == 200 && context.request.is_get() {
1042-
match (resource.render_response)(context, resource) {
1043-
Some(body) => context.response.body = Some(body),
1044-
None => ()
1058+
match (resource.render_response)(context, resource).await {
1059+
Ok(Some(body)) => context.response.body = Some(body),
1060+
Ok(None) => (),
1061+
Err(err) => {
1062+
error!("render_response failed with an error: {}", err);
1063+
context.response.status = 500;
1064+
context.response.body = Some(Bytes::from(err.to_string()));
1065+
}
10451066
}
10461067
}
10471068

@@ -1079,7 +1100,7 @@ impl WebmachineDispatcher {
10791100
/// based on the request path. If one is not found, a 404 Not Found response is returned
10801101
pub async fn dispatch(&self, req: Request<Incoming>) -> http::Result<Response<Full<Bytes>>> {
10811102
let mut context = self.context_from_http_request(req).await;
1082-
self.dispatch_to_resource(&mut context);
1103+
self.dispatch_to_resource(&mut context).await;
10831104
generate_http_response(&context)
10841105
}
10851106

@@ -1107,7 +1128,7 @@ impl WebmachineDispatcher {
11071128

11081129
/// Dispatches to the matching webmachine resource. If there is no matching resource, returns
11091130
/// 404 Not Found response
1110-
pub fn dispatch_to_resource(&self, context: &mut WebmachineContext) {
1131+
pub async fn dispatch_to_resource(&self, context: &mut WebmachineContext) {
11111132
let matching_paths = self.match_paths(&context.request);
11121133
let ordered_by_length: Vec<String> = matching_paths.iter()
11131134
.cloned()
@@ -1117,7 +1138,7 @@ impl WebmachineDispatcher {
11171138
update_paths_for_resource(&mut context.request, path);
11181139
if let Some(resource) = self.lookup_resource(path) {
11191140
execute_state_machine(context, &resource);
1120-
finalise_response(context, &resource);
1141+
finalise_response(context, &resource).await;
11211142
} else {
11221143
context.response.status = 404;
11231144
}

src/tests.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ fn sanitise_path_test() {
5555
expect!(sanitise_path(&"/a//b/c".to_string())).to(be_equal_to(vec!["a", "b", "c"]));
5656
}
5757

58-
#[test]
59-
fn dispatcher_returns_404_if_there_is_no_matching_resource() {
58+
#[tokio::test]
59+
async fn dispatcher_returns_404_if_there_is_no_matching_resource() {
6060
let mut context = WebmachineContext::default();
6161
let displatcher = WebmachineDispatcher {
6262
routes: btreemap! { "/some/path" => WebmachineResource::default() }
6363
};
64-
displatcher.dispatch_to_resource(&mut context);
64+
displatcher.dispatch_to_resource(&mut context).await;
6565
expect(context.response.status).to(be_equal_to(404));
6666
}
6767

@@ -338,8 +338,8 @@ fn execute_state_machine_returns_406_if_the_request_does_not_have_an_acceptable_
338338
expect(context.response.status).to(be_equal_to(406));
339339
}
340340

341-
#[test]
342-
fn execute_state_machine_sets_content_type_header_if_the_request_does_have_an_acceptable_content_type() {
341+
#[tokio::test]
342+
async fn execute_state_machine_sets_content_type_header_if_the_request_does_have_an_acceptable_content_type() {
343343
let mut context = WebmachineContext {
344344
request: WebmachineRequest {
345345
headers: hashmap! {
@@ -354,7 +354,7 @@ fn execute_state_machine_sets_content_type_header_if_the_request_does_have_an_ac
354354
..WebmachineResource::default()
355355
};
356356
execute_state_machine(&mut context, &resource);
357-
finalise_response(&mut context, &resource);
357+
finalise_response(&mut context, &resource).await;
358358
expect(context.response.status).to(be_equal_to(200));
359359
expect(context.response.headers.get("Content-Type").unwrap()).to(be_equal_to(&vec![h!("application/xml;charset=ISO-8859-1")]));
360360
}
@@ -417,8 +417,8 @@ fn execute_state_machine_returns_406_if_the_request_does_not_have_an_acceptable_
417417
expect(context.response.status).to(be_equal_to(406));
418418
}
419419

420-
#[test]
421-
fn execute_state_machine_sets_the_charset_if_the_request_does_have_an_acceptable_charset() {
420+
#[tokio::test]
421+
async fn execute_state_machine_sets_the_charset_if_the_request_does_have_an_acceptable_charset() {
422422
let mut context = WebmachineContext {
423423
request: WebmachineRequest {
424424
headers: hashmap! {
@@ -433,7 +433,7 @@ fn execute_state_machine_sets_the_charset_if_the_request_does_have_an_acceptable
433433
..WebmachineResource::default()
434434
};
435435
execute_state_machine(&mut context, &resource);
436-
finalise_response(&mut context, &resource);
436+
finalise_response(&mut context, &resource).await;
437437
expect(context.response.status).to(be_equal_to(200));
438438
expect(context.response.headers.get("Content-Type").unwrap()).to(be_equal_to(&vec![h!("application/json;charset=UTF-8")]));
439439
}
@@ -457,8 +457,8 @@ fn execute_state_machine_returns_406_if_the_request_does_not_have_an_acceptable_
457457
expect(context.response.status).to(be_equal_to(406));
458458
}
459459

460-
#[test]
461-
fn execute_state_machine_sets_the_vary_header_if_the_resource_has_variances() {
460+
#[tokio::test]
461+
async fn execute_state_machine_sets_the_vary_header_if_the_resource_has_variances() {
462462
let mut context = WebmachineContext {
463463
request: WebmachineRequest {
464464
..WebmachineRequest::default()
@@ -470,7 +470,7 @@ fn execute_state_machine_sets_the_vary_header_if_the_resource_has_variances() {
470470
..WebmachineResource::default()
471471
};
472472
execute_state_machine(&mut context, &resource);
473-
finalise_response(&mut context, &resource);
473+
finalise_response(&mut context, &resource).await;
474474
expect(context.response.status).to(be_equal_to(200));
475475
expect(context.response.headers).to(be_equal_to(btreemap! {
476476
"Content-Type".to_string() => vec![h!("application/json;charset=ISO-8859-1")],

0 commit comments

Comments
 (0)