diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index b67f39fe..75358d47 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -186,13 +186,7 @@ impl Prioritize { // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity // cannot be assigned at the time it is called. - // - // Streams over the max concurrent count will still call `send_data` so we should be - // careful not to put it into `pending_capacity` as it will starve the connection - // capacity for other streams - if !stream.is_pending_open { - self.try_assign_capacity(stream); - } + self.try_assign_capacity(stream); } if frame.is_end_stream() { @@ -414,6 +408,12 @@ impl Prioritize { /// Request capacity to send data fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { + // Streams over the max concurrent count should not have capacity assign to avoid starving the connection + // capacity for open streams + if stream.is_pending_open { + return; + } + let total_requested = stream.requested_send_capacity; // Total requested should never go below actual assigned diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index a5f90192..997f2364 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -391,7 +391,7 @@ async fn stream_close_by_data_frame_releases_capacity() { // The capacity should be immediately available as nothing else is // happening on the stream. - assert_eq!(s1.capacity(), window_size); + let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await; let request = Request::builder() .method(Method::POST) @@ -414,7 +414,7 @@ async fn stream_close_by_data_frame_releases_capacity() { s1.send_data("".into(), true).unwrap(); // The capacity should be available - assert_eq!(s2.capacity(), 5); + let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await; // Send the frame s2.send_data("hello".into(), true).unwrap(); @@ -461,9 +461,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() { // This effectively reserves the entire connection window s1.reserve_capacity(window_size); - // The capacity should be immediately available as nothing else is - // happening on the stream. - assert_eq!(s1.capacity(), window_size); + let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await; let request = Request::builder() .method(Method::POST) @@ -486,7 +484,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() { s1.send_trailers(Default::default()).unwrap(); // The capacity should be available - assert_eq!(s2.capacity(), 5); + let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await; // Send the frame s2.send_data("hello".into(), true).unwrap(); @@ -919,10 +917,10 @@ async fn recv_no_init_window_then_receive_some_init_window() { let (response, mut stream) = client.send_request(request, false).unwrap(); - stream.reserve_capacity(11); + stream.reserve_capacity(10); - let mut stream = h2.drive(util::wait_for_capacity(stream, 11)).await; - assert_eq!(stream.capacity(), 11); + let mut stream = h2.drive(util::wait_for_capacity(stream, 10)).await; + assert_eq!(stream.capacity(), 10); stream.send_data("hello world".into(), true).unwrap(); @@ -1990,6 +1988,120 @@ async fn reclaim_reserved_capacity() { join(mock, h2).await; } +#[tokio::test] +async fn capacity_not_assigned_to_unopened_streams() { + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let mock = async move { + let mut settings = frame::Settings::default(); + settings.set_max_concurrent_streams(Some(1)); + let settings = srv.assert_client_handshake_with_settings(settings).await; + assert_default_settings!(settings); + + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.recv_frame(frames::data(1, "hello")).await; + srv.recv_frame(frames::data(1, "world").eos()).await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + + srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::window_update( + 0, + frame::DEFAULT_INITIAL_WINDOW_SIZE + 10, + )) + .await; + srv.recv_frame(frames::reset(3).cancel()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap(); + stream1.send_data("hello".into(), false).unwrap(); + let (_, mut stream2) = client.send_request(request.clone(), false).unwrap(); + stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize); + stream1.send_data("world".into(), true).unwrap(); + h2.drive(response1).await.unwrap(); + let stream2 = h2 + .drive(util::wait_for_capacity( + stream2, + frame::DEFAULT_INITIAL_WINDOW_SIZE as usize, + )) + .await; + drop(stream2); + h2.await.unwrap(); + }; + + join(mock, h2).await; +} + +#[tokio::test] +async fn new_initial_window_size_capacity_not_assigned_to_unopened_streams() { + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let mock = async move { + let mut settings = frame::Settings::default(); + settings.set_max_concurrent_streams(Some(1)); + settings.set_initial_window_size(Some(10)); + let settings = srv.assert_client_handshake_with_settings(settings).await; + assert_default_settings!(settings); + + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.recv_frame(frames::data(1, "hello")).await; + srv.send_frame(frames::settings().initial_window_size(frame::DEFAULT_INITIAL_WINDOW_SIZE)) + .await; + srv.recv_frame(frames::settings_ack()).await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + srv.recv_frame(frames::data(1, "world").eos()).await; + + srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::window_update( + 0, + frame::DEFAULT_INITIAL_WINDOW_SIZE + 10, + )) + .await; + srv.recv_frame(frames::reset(3).cancel()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap(); + stream1.send_data("hello".into(), false).unwrap(); + let (_, mut stream2) = client.send_request(request.clone(), false).unwrap(); + stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize); + h2.drive(response1).await.unwrap(); + stream1.send_data("world".into(), true).unwrap(); + let stream2 = h2 + .drive(util::wait_for_capacity( + stream2, + frame::DEFAULT_INITIAL_WINDOW_SIZE as usize, + )) + .await; + drop(stream2); + h2.await.unwrap(); + }; + + join(mock, h2).await; +} + // ==== abusive window updates ==== #[tokio::test] diff --git a/tests/h2-tests/tests/prioritization.rs b/tests/h2-tests/tests/prioritization.rs index dd4ed9fe..5ca03a74 100644 --- a/tests/h2-tests/tests/prioritization.rs +++ b/tests/h2-tests/tests/prioritization.rs @@ -52,7 +52,9 @@ async fn single_stream_send_large_body() { stream.reserve_capacity(payload.len()); // The capacity should be immediately allocated - assert_eq!(stream.capacity(), payload.len()); + let mut stream = h2 + .drive(util::wait_for_capacity(stream, payload.len())) + .await; // Send the data stream.send_data(payload.into(), true).unwrap(); @@ -108,7 +110,9 @@ async fn multiple_streams_with_payload_greater_than_default_window() { // The capacity should be immediately // allocated to default window size (smaller than payload) stream1.reserve_capacity(payload_clone.len()); - assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE); + let mut stream1 = conn + .drive(util::wait_for_capacity(stream1, DEFAULT_WINDOW_SIZE)) + .await; stream2.reserve_capacity(payload_clone.len()); assert_eq!(stream2.capacity(), 0); @@ -179,7 +183,9 @@ async fn single_stream_send_extra_large_body_multi_frames_one_buffer() { stream.reserve_capacity(payload.len()); // The capacity should be immediately allocated - assert_eq!(stream.capacity(), payload.len()); + let mut stream = h2 + .drive(util::wait_for_capacity(stream, payload.len())) + .await; // Send the data stream.send_data(payload.into(), true).unwrap(); @@ -296,13 +302,13 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, ]) + .write(frames::SETTINGS_ACK) + .read(frames::SETTINGS_ACK) .write(&[ // DATA 0, 64, 0, 0, 0, 0, 0, 0, 1, ]) .write(&payload[0..16_384]) - .write(frames::SETTINGS_ACK) - .read(frames::SETTINGS_ACK) .wait(Duration::from_millis(10)) .write(&[ // DATA @@ -326,7 +332,9 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { stream.reserve_capacity(payload.len()); // The capacity should be immediately allocated - assert_eq!(stream.capacity(), payload.len()); + let mut stream = h2 + .drive(util::wait_for_capacity(stream, payload.len())) + .await; // Send the data stream.send_data(payload.into(), true).unwrap(); diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 8ec2cf31..c01b6daf 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -51,11 +51,11 @@ async fn send_recv_data() { 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, ]) + .write(frames::SETTINGS_ACK) .write(&[ // DATA 0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111, ]) - .write(frames::SETTINGS_ACK) // Read response .read(&[ // HEADERS @@ -78,10 +78,10 @@ async fn send_recv_data() { // Reserve send capacity stream.reserve_capacity(5); - assert_eq!(stream.capacity(), 5); + let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await; // Send the data - stream.send_data("hello".as_bytes(), true).unwrap(); + stream.send_data("hello".into(), true).unwrap(); // Get the response let resp = h2.run(response).await.unwrap();