From acb0041514d67ea0680882ee6c8505d6119de4b8 Mon Sep 17 00:00:00 2001 From: Anthony Ramine Date: Fri, 19 Nov 2021 11:58:30 +0100 Subject: [PATCH 1/3] Implement server::Builder::max_send_buffer_size --- src/proto/connection.rs | 4 ++++ src/proto/streams/prioritize.rs | 42 ++++++++++++++++++++++++++++++--- src/proto/streams/send.rs | 4 ++++ src/proto/streams/streams.rs | 9 +++++++ src/server.rs | 41 ++++++++++++++++++++++++++++++++ 5 files changed, 97 insertions(+), 3 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index a75df4369..114bef968 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -140,6 +140,10 @@ where self.inner.streams.set_target_connection_window_size(size); } + pub(crate) fn set_max_send_buffer_size(&mut self, max: usize) { + self.inner.streams.set_max_send_buffer_size(max); + } + /// Send a new SETTINGS frame with an updated initial window size. pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> { let mut settings = frame::Settings::default(); diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index eaaee162b..0163b2284 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -51,6 +51,12 @@ pub(super) struct Prioritize { /// What `DATA` frame is currently being sent in the codec. in_flight_data_frame: InFlightData, + + /// The max send buffer size allowed. + max_send_buffer_size: usize, + + /// The current send buffer size. + current_send_buffer_size: usize, } #[derive(Debug, Eq, PartialEq)] @@ -93,9 +99,17 @@ impl Prioritize { flow, last_opened_id: StreamId::ZERO, in_flight_data_frame: InFlightData::Nothing, + max_send_buffer_size: usize::MAX, + current_send_buffer_size: 0, } } + pub fn set_max_send_buffer_size(&mut self, max: usize, store: &mut Store, counts: &mut Counts) { + self.max_send_buffer_size = max; + + self.assign_connection_capacity(0, store, counts); + } + /// Queue a frame to be sent to the remote pub fn queue_frame( &mut self, @@ -175,6 +189,8 @@ impl Prioritize { self.try_assign_capacity(stream); } + self.current_send_buffer_size += sz as usize; + if frame.is_end_stream() { stream.state.send_close(); self.reserve_capacity(0, stream, counts); @@ -350,7 +366,7 @@ impl Prioritize { self.flow.assign_capacity(inc); // Assign newly acquired capacity to streams pending capacity. - while self.flow.available() > 0 { + while self.available() > 0 { let stream = match self.pending_capacity.pop(store) { Some(stream) => stream, None => return, @@ -373,6 +389,17 @@ impl Prioritize { } } + fn available(&self) -> WindowSize { + cmp::min( + self.flow.available().as_size() as usize, + cmp::min( + self.max_send_buffer_size + .saturating_sub(self.current_send_buffer_size), + WindowSize::MAX as usize, + ), + ) as WindowSize + } + /// Request capacity to send data fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { let total_requested = stream.requested_send_capacity; @@ -395,7 +422,8 @@ impl Prioritize { additional, buffered = stream.buffered_send_data, window = stream.send_flow.window_size(), - conn = %self.flow.available() + conn_window = %self.flow.available(), + conn = self.available(), ); if additional == 0 { @@ -413,7 +441,7 @@ impl Prioritize { ); // The amount of currently available capacity on the connection - let conn_available = self.flow.available().as_size(); + let conn_available = self.available(); // First check if capacity is immediately available if conn_available > 0 { @@ -509,6 +537,10 @@ impl Prioritize { // Because, always try to reclaim... self.reclaim_frame(buffer, store, dst); + + // Maybe schedule streams if the send buffer is not full + // anymore. + self.assign_connection_capacity(0, store, counts); } None => { // Try to flush the codec. @@ -630,6 +662,8 @@ impl Prioritize { tracing::trace!(?frame, "dropping"); } + self.current_send_buffer_size -= stream.buffered_send_data; + stream.buffered_send_data = 0; stream.requested_send_capacity = 0; if let InFlightData::DataFrame(key) = self.in_flight_data_frame { @@ -736,6 +770,8 @@ impl Prioritize { tracing::trace_span!("updating stream flow").in_scope(|| { stream.send_flow.send_data(len); + self.current_send_buffer_size -= len as usize; + // Decrement the stream's buffered data counter debug_assert!(stream.buffered_send_data >= len as usize); stream.buffered_send_data -= len as usize; diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 3735d13dd..e27c91d84 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -118,6 +118,10 @@ impl Send { Ok(()) } + pub fn set_max_send_buffer_size(&mut self, max: usize, store: &mut Store, counts: &mut Counts) { + self.prioritize.set_max_send_buffer_size(max, store, counts); + } + pub fn send_headers( &mut self, frame: frame::Headers, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 4962db8d2..3d6f014bd 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -127,6 +127,15 @@ where .set_target_connection_window(size, &mut me.actions.task) } + pub fn set_max_send_buffer_size(&mut self, max: usize) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions + .send + .set_max_send_buffer_size(max, &mut me.store, &mut me.counts); + } + pub fn next_incoming(&mut self) -> Option> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; diff --git a/src/server.rs b/src/server.rs index 491446460..8f1640f43 100644 --- a/src/server.rs +++ b/src/server.rs @@ -245,6 +245,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + + /// Max send buffer size. + max_send_buffer_size: Option, } /// Send a response back to the client @@ -451,6 +454,11 @@ where self.connection.set_target_window_size(size); } + /// Sets the max send buffer size for the whole connection. + pub fn set_max_send_buffer_size(&mut self, max: usize) { + self.connection.set_max_send_buffer_size(max); + } + /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level /// flow control for received data. /// @@ -620,6 +628,7 @@ impl Builder { reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, + max_send_buffer_size: None, } } @@ -763,6 +772,35 @@ impl Builder { self } + /// Sets the max size of the send buffer. + /// + /// This setting is also used to limit the maximum amount of data + /// buffered to be sent. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc(my_io: T) + /// # -> Handshake + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .max_send_buffer_size(16 * 1024) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + self.max_send_buffer_size = Some(max); + self + } + /// Sets the maximum number of concurrent streams. /// /// The maximum concurrent streams setting only controls the maximum number @@ -1280,6 +1318,9 @@ where if let Some(sz) = self.builder.initial_target_connection_window_size { c.set_target_window_size(sz); } + if let Some(sz) = self.builder.max_send_buffer_size { + c.set_max_send_buffer_size(sz); + } Ok(c) }) } From 764d39fb3add5c6526997b7eed9972abd50aaacf Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 29 Nov 2021 17:35:20 -0800 Subject: [PATCH 2/3] add unit test for max_send_buffer_size waking tasks --- tests/h2-tests/tests/flow_control.rs | 54 ++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index be04a61b7..3624dc4fa 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1561,3 +1561,57 @@ async fn data_padding() { join(srv, h2).await; } + + +#[tokio::test] +async fn notify_on_send_buffer_available() { + // This test ensures that the stream gets notified when there is additional + // send buffer space. + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + client.send_frame( + frames::headers(1) + .request("GET", "https://www.example.com/") + .eos() + ) + .await; + client.recv_frame(frames::headers(1).response(200)).await; + client.recv_frame(frames::data(1, &b"abcde"[..])).await; + client.recv_frame(frames::data(1, &b"abcde"[..])).await; + client.recv_frame(frames::data(1, &b"abcde"[..])).await; + client.recv_frame(frames::data(1, &b""[..]).eos()).await; + }; + + let srv = async move { + let mut srv = server::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + let (_req, mut reply) = srv.next().await.unwrap().unwrap(); + tokio::spawn(async move { + let rsp = http::Response::new(()); + let mut stream = reply.send_response(rsp, false).unwrap(); + + for _ in 0..3 { + stream.reserve_capacity(5); + stream = util::wait_for_capacity(stream, 5).await; + stream.send_data("abcde".into(), false).unwrap(); + } + + stream.send_data("".into(), true).unwrap(); + }).await.expect("srv body spawn"); + + // keep connection open till client is done + let _ = srv.next().await; + }; + + join(srv, client).await; +} From 2b00e4dcabb50d3e8bd63bd63bd3cf05d23f1af7 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 30 Nov 2021 15:05:12 -0800 Subject: [PATCH 3/3] make test actually right --- tests/h2-tests/tests/flow_control.rs | 63 ++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 3624dc4fa..d46709a8b 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1581,11 +1581,24 @@ async fn notify_on_send_buffer_available() { .eos() ) .await; + client.send_frame( + frames::headers(3) + .request("GET", "https://www.example.com/") + .eos() + ) + .await; client.recv_frame(frames::headers(1).response(200)).await; - client.recv_frame(frames::data(1, &b"abcde"[..])).await; - client.recv_frame(frames::data(1, &b"abcde"[..])).await; - client.recv_frame(frames::data(1, &b"abcde"[..])).await; - client.recv_frame(frames::data(1, &b""[..]).eos()).await; + client.recv_frame(frames::headers(3).response(200)).await; + dbg!(11); + client.recv_frame(frames::data(1, &b"abcde"[..]).eos()).await; + dbg!(31); + client.recv_frame(frames::data(3, &b"abcde"[..])).await; + dbg!(32); + client.recv_frame(frames::data(3, &b"abcde"[..])).await; + dbg!(33); + client.recv_frame(frames::data(3, &b"abcde"[..])).await; + dbg!(34); + client.recv_frame(frames::data(3, &b""[..]).eos()).await; }; let srv = async move { @@ -1595,22 +1608,38 @@ async fn notify_on_send_buffer_available() { .await .expect("handshake"); - let (_req, mut reply) = srv.next().await.unwrap().unwrap(); - tokio::spawn(async move { - let rsp = http::Response::new(()); - let mut stream = reply.send_response(rsp, false).unwrap(); - - for _ in 0..3 { - stream.reserve_capacity(5); - stream = util::wait_for_capacity(stream, 5).await; - stream.send_data("abcde".into(), false).unwrap(); + let (_req, mut reply1) = srv.next().await.unwrap().unwrap(); + let (_req, mut reply2) = srv.next().await.unwrap().unwrap(); + + let mut stream1 = reply1.send_response(http::Response::new(()), false).unwrap(); + let mut stream2 = reply2.send_response(http::Response::new(()), false).unwrap(); + drop((reply1, reply2)); + + let t0 = tokio::spawn(async move { + assert!(srv.next().await.is_none(), "unexpected request"); + }); + let t1 = tokio::spawn(async move { + eprintln!("[t1] RESERVE 1 cap"); + stream1.reserve_capacity(1); + stream1 = util::wait_for_capacity(stream1, 1).await; + eprintln!("[t1] got 1 cap"); + stream1.send_data("abcde".into(), true).unwrap(); + }); + let t2 = tokio::spawn(async move { + for n in 0..3 { + eprintln!("[t2] RESERVE 1 cap, loop {}", n); + stream2.reserve_capacity(1); + stream2 = util::wait_for_capacity(stream2, 1).await; + eprintln!("[t2] got 1 cap, loop {}", n); + stream2.send_data("abcde".into(), false).unwrap(); } - stream.send_data("".into(), true).unwrap(); - }).await.expect("srv body spawn"); + stream2.send_data("".into(), true).unwrap(); + }); - // keep connection open till client is done - let _ = srv.next().await; + t2.await.expect("srv body spawn"); + t1.await.expect("srv body spawn"); + t0.await.expect("srv end"); }; join(srv, client).await;