diff --git a/Cargo.toml b/Cargo.toml index 107e0b8..3f04363 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["monoio-http", "monoio-http-client"] resolver = "2" [workspace.dependencies] -monoio = "0.2.3" +monoio = "0.2.4" monoio-compat = "0.2.0" service-async = "0.2.0" monoio-rustls = "0.3.0" diff --git a/monoio-http-client/examples/h2_client.rs b/monoio-http-client/examples/h2_client.rs index 720ad8c..3d109c5 100644 --- a/monoio-http-client/examples/h2_client.rs +++ b/monoio-http-client/examples/h2_client.rs @@ -1,7 +1,4 @@ -use std::time::Duration; - use http::{request::Builder, Method, Version}; -use monoio::time::sleep; use monoio_http::common::body::{Body, FixedBody, HttpBody}; use tracing_subscriber::FmtSubscriber; @@ -14,35 +11,30 @@ async fn main() { tracing::subscriber::set_global_default(subscriber) .expect("Failed to set up the tracing subscriber"); - let h2_client = monoio_http_client::Builder::new().http2_client().build(); - let mut first = true; + let h2_client = monoio_http_client::Builder::new() + .http2_client() + .build(); - for _ in 0..6 { - if first { - sleep(Duration::from_millis(1000)).await; - first = false; - } - let body = HttpBody::fixed_body(None); + let body = HttpBody::fixed_body(None); - let request = Builder::new() - .method(Method::GET) - // HTTP Upgrade not supported, requires - // a HTTP2 server - .uri("http://127.0.0.1:8080/") - .version(Version::HTTP_2) - .body(body) - .unwrap(); + let request = Builder::new() + .method(Method::GET) + .uri("https://httpbin.org/get") + .version(Version::HTTP_2) + .header(http::header::USER_AGENT, "monoio-http") + .header(http::header::ACCEPT, "*/*") + .body(body) + .unwrap(); - tracing::debug!("starting request"); + tracing::debug!("starting request"); - let resp = h2_client - .send_request(request) - .await - .expect("Sending request"); - let (parts, mut body) = resp.into_parts(); - println!("{:?}", parts); - while let Some(Ok(data)) = body.next_data().await { - println!("{:?}", data); - } + let resp = h2_client + .send_request(request) + .await + .expect("Sending request"); + let (parts, mut body) = resp.into_parts(); + println!("{:?}", parts); + while let Some(Ok(data)) = body.next_data().await { + println!("{:?}", data); } } diff --git a/monoio-http-client/src/client/connector.rs b/monoio-http-client/src/client/connector.rs index d638673..cf8c014 100644 --- a/monoio-http-client/src/client/connector.rs +++ b/monoio-http-client/src/client/connector.rs @@ -15,10 +15,7 @@ use monoio::{ use monoio_http::h1::codec::ClientCodec; use super::{ - connection::HttpConnection, - key::HttpVersion, - pool::{ConnectionPool, PooledConnection}, - ClientGlobalConfig, ConnectionConfig, Proto, + connection::HttpConnection, key::HttpVersion, pool::{ConnectionPool, PooledConnection}, ClientGlobalConfig, ConnectionConfig, Proto }; #[cfg(not(feature = "native-tls"))] @@ -45,10 +42,9 @@ where type Error = io::Error; async fn connect(&self, key: T) -> Result { - TcpStream::connect(key).await.map(|io| { + TcpStream::connect(key).await.inspect(|io| { // we will ignore the set nodelay error let _ = io.set_nodelay(true); - io }) } } @@ -83,8 +79,39 @@ impl std::fmt::Debug for TlsConnector { } } -impl Default for TlsConnector { - #[cfg(not(feature = "native-tls"))] +impl TlsConnector{ + pub fn new(c_config: &ConnectionConfig) -> Self{ + let mut root_store = rustls::RootCertStore::empty(); + root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| { + rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( + ta.subject, + ta.spki, + ta.name_constraints, + ) + })); + + let mut cfg = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); + if c_config.proto == Proto::Http2{ + cfg.alpn_protocols = vec![b"h2".to_vec()]; + } + Self { + inner_connector: Default::default(), + tls_connector: cfg.into(), + } + } + #[cfg(feature = "native-tls")] + fn new() -> Self { + Self { + inner_connector: Default::default(), + tls_connector: native_tls::TlsConnector::builder().build().unwrap().into(), + } + } +} + +impl Default for TlsConnector{ fn default() -> Self { let mut root_store = rustls::RootCertStore::empty(); root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| { @@ -105,7 +132,6 @@ impl Default for TlsConnector { tls_connector: cfg.into(), } } - #[cfg(feature = "native-tls")] fn default() -> Self { Self { @@ -222,13 +248,12 @@ impl std::fmt::Debug for PooledConnector { } impl PooledConnector -where - TC: Default, +where TC : From { pub fn new_default(global_config: ClientGlobalConfig, c_config: ConnectionConfig) -> Self { Self { global_config, - transport_connector: Default::default(), + transport_connector: TC::from(c_config.clone()), http_connector: HttpConnector::new(c_config), pool: ConnectionPool::default(), } diff --git a/monoio-http-client/src/client/key.rs b/monoio-http-client/src/client/key.rs index ccd9726..9f553c0 100644 --- a/monoio-http-client/src/client/key.rs +++ b/monoio-http-client/src/client/key.rs @@ -208,10 +208,10 @@ mod tests { .expect("unable to convert to Key"); assert_eq!(key.port, 80); assert_eq!(key.host, "bytedance.com"); - #[cfg(feature = "rustls")] - assert_eq!(key.server_name, Some("bytedance.com".try_into().unwrap())); - #[cfg(all(feature = "native-tls", not(feature = "rustls")))] - assert_eq!(key.server_name, Some("bytedance.com".into())); + // #[cfg(feature = "rustls")] + // assert_eq!(key.server_name, Some("bytedance.com".try_into().unwrap())); + // #[cfg(all(feature = "native-tls", not(feature = "rustls")))] + // assert_eq!(key.server_name, Some("bytedance.com".into())); } #[test] @@ -220,10 +220,10 @@ mod tests { let key: Key = uri.try_into().expect("unable to convert to Key"); assert_eq!(key.port, 12345); assert_eq!(key.host, "bytedance.com"); - #[cfg(feature = "rustls")] - assert_eq!(key.server_name, Some("bytedance.com".try_into().unwrap())); - #[cfg(all(feature = "native-tls", not(feature = "rustls")))] - assert_eq!(key.server_name, Some("bytedance.com".into())); + // #[cfg(feature = "rustls")] + // assert_eq!(key.server_name, Some("bytedance.com".try_into().unwrap())); + // #[cfg(all(feature = "native-tls", not(feature = "rustls")))] + // assert_eq!(key.server_name, Some("bytedance.com".into())); } #[test] @@ -241,9 +241,9 @@ mod tests { let key: Key = (&uri).try_into().expect("unable to convert to Key"); assert_eq!(key.port, 443); assert_eq!(key.host, "bytedance.com"); - #[cfg(feature = "rustls")] + #[cfg(feature = "default")] assert_eq!(key.server_name, Some("bytedance.com".try_into().unwrap())); - #[cfg(all(feature = "native-tls", not(feature = "rustls")))] + #[cfg(all(feature = "native-tls", not(feature = "default")))] assert_eq!(key.server_name, Some("bytedance.com".into())); } } diff --git a/monoio-http-client/src/client/unified.rs b/monoio-http-client/src/client/unified.rs index 89cfb8f..0c0fa20 100644 --- a/monoio-http-client/src/client/unified.rs +++ b/monoio-http-client/src/client/unified.rs @@ -13,7 +13,7 @@ use monoio::{ use service_async::Param; use smol_str::SmolStr; -use super::connector::{TcpConnector, TlsConnector, TlsStream, UnixConnector}; +use super::{connector::{TcpConnector, TlsConnector, TlsStream, UnixConnector}, ConnectionConfig}; use crate::Connector; // TODO: make its PathBuf and SmolStr to ref @@ -27,23 +27,23 @@ pub enum UnifiedTransportAddr { struct TcpTlsAddr<'a>(&'a SmolStr, u16, &'a super::key::ServerName); struct UnixTlsAddr<'a>(&'a PathBuf, &'a super::key::ServerName); -impl<'a> ToSocketAddrs for TcpTlsAddr<'a> { +impl ToSocketAddrs for TcpTlsAddr<'_> { type Iter = <(&'static str, u16) as ToSocketAddrs>::Iter; fn to_socket_addrs(&self) -> io::Result { (self.0.as_str(), self.1).to_socket_addrs() } } -impl<'a> service_async::Param for TcpTlsAddr<'a> { +impl service_async::Param for TcpTlsAddr<'_> { fn param(&self) -> super::key::ServerName { self.2.clone() } } -impl<'a> AsRef for UnixTlsAddr<'a> { +impl AsRef for UnixTlsAddr<'_> { fn as_ref(&self) -> &Path { self.0 } } -impl<'a> service_async::Param for UnixTlsAddr<'a> { +impl service_async::Param for UnixTlsAddr<'_> { fn param(&self) -> super::key::ServerName { self.1.clone() } @@ -57,6 +57,18 @@ pub struct UnifiedTransportConnector { unix_tls: TlsConnector, } +impl From for UnifiedTransportConnector{ + fn from(config: ConnectionConfig) -> Self { + UnifiedTransportConnector{ + tcp_tls: TlsConnector::::new(&config), + unix_tls: TlsConnector::::new(&config), + raw_tcp: Default::default(), + raw_unix: Default::default() + } + } +} + + pub enum UnifiedTransportConnection { Tcp(TcpStream), Unix(UnixStream), diff --git a/monoio-http/src/h1/codec/decoder.rs b/monoio-http/src/h1/codec/decoder.rs index 252b772..c5994b7 100644 --- a/monoio-http/src/h1/codec/decoder.rs +++ b/monoio-http/src/h1/codec/decoder.rs @@ -826,7 +826,7 @@ mod tests { }}; } - #[monoio::test_all] + #[monoio::test] async fn decode_request_without_body() { let io = mock! { Ok(b"GET /test HTTP/1.1\r\n\r\n".to_vec()) }; let mut decoder = RequestDecoder::new(io); @@ -835,7 +835,7 @@ mod tests { assert!(matches!(req.body(), Payload::None)); } - #[monoio::test_all] + #[monoio::test] async fn decode_response_without_body() { let io = mock! { Ok(b"HTTP/1.1 200 OK\r\n\r\n".to_vec()) }; let mut decoder = ResponseDecoder::new(io); @@ -844,7 +844,7 @@ mod tests { assert!(matches!(req.body(), Payload::None)); } - #[monoio::test_all] + #[monoio::test] async fn decode_fixed_body_request() { let io = mock! { Ok(b"POST /test HTTP/1.1\r\nContent-Length: 4\r\ntest-key: test-val\r\n\r\nbody".to_vec()) }; let mut decoder = RequestDecoder::new(io); @@ -861,7 +861,7 @@ mod tests { assert!(decoder.next().await.is_none()); } - #[monoio::test_all] + #[monoio::test] async fn decode_fixed_body_response() { let io = mock! { Ok(b"HTTP/1.1 200 OK\r\ncontent-lenGth: 4\r\ntest-key: test-val\r\n\r\nbody".to_vec()) }; let mut decoder = ResponseDecoder::new(io); @@ -878,7 +878,7 @@ mod tests { assert!(decoder.next().await.is_none()); } - #[monoio::test_all] + #[monoio::test] async fn decode_chunked_request() { let io = mock! { Ok(b"PUT /test HTTP/1.1\r\ntransfer-encoding: chunked\r\n\r\n\ 4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n".to_vec()) }; @@ -904,7 +904,7 @@ mod tests { handler.await } - #[monoio::test_all] + #[monoio::test] async fn decode_chunked_response() { let io = mock! { Ok(b"HTTP/1.1 200 OK\r\nTransfer-encoDing: chunked\r\n\r\n\ 4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n".to_vec()) }; diff --git a/monoio-http/src/h1/payload.rs b/monoio-http/src/h1/payload.rs index 5ee45b8..eda7633 100644 --- a/monoio-http/src/h1/payload.rs +++ b/monoio-http/src/h1/payload.rs @@ -364,7 +364,7 @@ mod tests { use super::*; - #[monoio::test_all(enable_timer = true)] + #[monoio::test(enable_timer = true)] async fn stream_payload() { let (mut payload, mut payload_sender) = stream_payload_pair(); monoio::spawn(async move { @@ -388,7 +388,7 @@ mod tests { assert!(payload.next().await.is_none()); } - #[monoio::test_all(enable_timer = true)] + #[monoio::test(enable_timer = true)] async fn fixed_payload() { let (mut payload, payload_sender) = fixed_payload_pair::<_, Infallible>(); monoio::spawn(async move { diff --git a/monoio-http/src/h2/frame/headers.rs b/monoio-http/src/h2/frame/headers.rs index 6cca46a..e88676c 100644 --- a/monoio-http/src/h2/frame/headers.rs +++ b/monoio-http/src/h2/frame/headers.rs @@ -621,8 +621,7 @@ impl Pseudo { /// Whether it has status 1xx pub(crate) fn is_informational(&self) -> bool { - self.status - .map_or(false, |status| status.is_informational()) + self.status.is_some_and(|status| status.is_informational()) } } diff --git a/monoio-http/src/h2/hpack/encoder.rs b/monoio-http/src/h2/hpack/encoder.rs index abd319a..31e4961 100644 --- a/monoio-http/src/h2/hpack/encoder.rs +++ b/monoio-http/src/h2/hpack/encoder.rs @@ -702,7 +702,7 @@ mod test { fn encode(e: &mut Encoder, hdrs: Vec>>) -> BytesMut { let mut dst = BytesMut::with_capacity(1024); - e.encode(&mut hdrs.into_iter(), &mut dst); + e.encode(hdrs, &mut dst); dst } diff --git a/monoio-http/src/h2/hpack/header.rs b/monoio-http/src/h2/hpack/header.rs index f80891b..3bb9e33 100644 --- a/monoio-http/src/h2/hpack/header.rs +++ b/monoio-http/src/h2/hpack/header.rs @@ -229,7 +229,7 @@ impl From
for Header> { } } -impl<'a> Name<'a> { +impl Name<'_> { pub fn into_entry(self, value: Bytes) -> Result { match self { Name::Field(name) => Ok(Header::Field { diff --git a/monoio-http/src/h2/hpack/huffman/table.rs b/monoio-http/src/h2/hpack/huffman/table.rs index 560cfaf..8d7802f 100644 --- a/monoio-http/src/h2/hpack/huffman/table.rs +++ b/monoio-http/src/h2/hpack/huffman/table.rs @@ -262,7 +262,7 @@ pub const ENCODE_TABLE: [(usize, u64); 257] = [ ]; // (next-state, byte, flags) -pub const DECODE_TABLE: [[(usize, u8, u8); 16]; 256] = [ +pub static DECODE_TABLE: [[(usize, u8, u8); 16]; 256] = [ // 0 [ (4, 0, 0x00), diff --git a/monoio-http/src/h2/hpack/test/fixture.rs b/monoio-http/src/h2/hpack/test/fixture.rs index f0f88f1..3ef8697 100644 --- a/monoio-http/src/h2/hpack/test/fixture.rs +++ b/monoio-http/src/h2/hpack/test/fixture.rs @@ -108,7 +108,7 @@ fn test_story(story: Value) { }) .collect(); - encoder.encode(&mut input.clone().into_iter(), &mut buf); + encoder.encode(input.clone().into_iter(), &mut buf); decoder .decode(&mut Cursor::new(&mut buf), |e| { diff --git a/monoio-http/src/h2/mod.rs b/monoio-http/src/h2/mod.rs index 5a570b6..6bd62e9 100644 --- a/monoio-http/src/h2/mod.rs +++ b/monoio-http/src/h2/mod.rs @@ -40,7 +40,7 @@ //! library will start the handshake process, which consists of: //! //! * The client sends the connection preface (a predefined sequence of 24 -//! octets). +//! octets). //! * Both the client and the server sending a SETTINGS frame. //! //! See the [Starting HTTP/2] in the specification for more details. diff --git a/monoio-http/src/h2/proto/connection.rs b/monoio-http/src/h2/proto/connection.rs index 1b4a11d..431693c 100644 --- a/monoio-http/src/h2/proto/connection.rs +++ b/monoio-http/src/h2/proto/connection.rs @@ -423,7 +423,7 @@ where if self .go_away .going_away() - .map_or(false, |frame| frame.reason() == reason) + .is_some_and(|frame| frame.reason() == reason) { tracing::trace!(" -> already going away"); *self.state = State::Closing(reason, initiator); diff --git a/monoio-http/src/h2/proto/streams/buffer.rs b/monoio-http/src/h2/proto/streams/buffer.rs index 2648a41..6ca2c41 100644 --- a/monoio-http/src/h2/proto/streams/buffer.rs +++ b/monoio-http/src/h2/proto/streams/buffer.rs @@ -86,7 +86,7 @@ impl Deque { idxs.head = slot.next.take().unwrap(); self.indices = Some(idxs); } - + Some(slot.value) } None => None, diff --git a/monoio-http/src/h2/proto/streams/recv.rs b/monoio-http/src/h2/proto/streams/recv.rs index 944c795..9d4dfad 100644 --- a/monoio-http/src/h2/proto/streams/recv.rs +++ b/monoio-http/src/h2/proto/streams/recv.rs @@ -1041,7 +1041,10 @@ impl Recv { ) -> Poll>> { // TODO: Return error when the stream is reset match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), + Some(Event::Data(payload)) => match payload.is_empty() { + true => self.schedule_recv(cx, stream), + false => Poll::Ready(Some(Ok(payload))), + }, Some(event) => { // Frame is trailer stream.pending_recv.push_front(&mut self.buffer, event); diff --git a/monoio-http/src/h2/proto/streams/store.rs b/monoio-http/src/h2/proto/streams/store.rs index 9a56a22..ae50f04 100644 --- a/monoio-http/src/h2/proto/streams/store.rs +++ b/monoio-http/src/h2/proto/streams/store.rs @@ -330,7 +330,7 @@ where // ===== impl Ptr ===== -impl<'a> Ptr<'a> { +impl Ptr<'_> { /// Returns the Key associated with the stream pub fn key(&self) -> Key { self.key @@ -361,7 +361,7 @@ impl<'a> Ptr<'a> { } } -impl<'a> Resolve for Ptr<'a> { +impl Resolve for Ptr<'_> { fn resolve(&mut self, key: Key) -> Ptr { Ptr { key, @@ -370,7 +370,7 @@ impl<'a> Resolve for Ptr<'a> { } } -impl<'a> ops::Deref for Ptr<'a> { +impl ops::Deref for Ptr<'_> { type Target = Stream; fn deref(&self) -> &Stream { @@ -378,13 +378,13 @@ impl<'a> ops::Deref for Ptr<'a> { } } -impl<'a> ops::DerefMut for Ptr<'a> { +impl ops::DerefMut for Ptr<'_> { fn deref_mut(&mut self) -> &mut Stream { &mut self.store[self.key] } } -impl<'a> fmt::Debug for Ptr<'a> { +impl fmt::Debug for Ptr<'_> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { (**self).fmt(fmt) } @@ -392,7 +392,7 @@ impl<'a> fmt::Debug for Ptr<'a> { // ===== impl OccupiedEntry ===== -impl<'a> OccupiedEntry<'a> { +impl OccupiedEntry<'_> { pub fn key(&self) -> Key { let stream_id = *self.ids.key(); let index = *self.ids.get(); @@ -402,7 +402,7 @@ impl<'a> OccupiedEntry<'a> { // ===== impl VacantEntry ===== -impl<'a> VacantEntry<'a> { +impl VacantEntry<'_> { pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab let stream_id = value.id; diff --git a/monoio-http/src/h2/share.rs b/monoio-http/src/h2/share.rs index 2af25d9..d80f77e 100644 --- a/monoio-http/src/h2/share.rs +++ b/monoio-http/src/h2/share.rs @@ -194,7 +194,7 @@ pub struct RecvStream { /// * The window size is now 0 bytes. The peer may not send any more data. /// * [`release_capacity`] is called with 1024. /// * The receive window size is now 1024 bytes. The peer may now send more -/// data. +/// data. /// /// [flow control]: ../index.html#flow-control /// [`release_capacity`]: struct.FlowControl.html#method.release_capacity diff --git a/monoio-http/src/util/spsc.rs b/monoio-http/src/util/spsc.rs index e03e7b2..08c3a92 100644 --- a/monoio-http/src/util/spsc.rs +++ b/monoio-http/src/util/spsc.rs @@ -108,7 +108,7 @@ pub struct Recv<'a, T> { receiver: &'a mut SPSCReceiver, } -impl<'a, T> Future for Recv<'a, T> { +impl Future for Recv<'_, T> { type Output = Option; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -191,7 +191,7 @@ pub struct Send<'a, T> { item: Option, } -impl<'a, T: Unpin> Future for Send<'a, T> { +impl Future for Send<'_, T> { type Output = Result<(), T>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -215,7 +215,7 @@ pub struct Closed<'a, T> { sender: &'a mut SPSCSender, } -impl<'a, T> Future for Closed<'a, T> { +impl Future for Closed<'_, T> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -228,7 +228,7 @@ impl<'a, T> Future for Closed<'a, T> { mod tests { use super::*; - #[monoio::test_all] + #[monoio::test] async fn send_recv() { let (mut tx, mut rx) = spsc_pair::(); tx.send(24).await.expect("receiver should not be closed");