4 changes: 4 additions & 0 deletions src/proto/connection.rs
@@ -140,6 +140,10 @@ where
self.inner.streams.set_target_connection_window_size(size);
}

pub(crate) fn set_max_send_buffer_size(&mut self, max: usize) {
self.inner.streams.set_max_send_buffer_size(max);
}

/// Send a new SETTINGS frame with an updated initial window size.
pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
let mut settings = frame::Settings::default();
42 changes: 39 additions & 3 deletions src/proto/streams/prioritize.rs
@@ -51,6 +51,12 @@ pub(super) struct Prioritize {

/// What `DATA` frame is currently being sent in the codec.
in_flight_data_frame: InFlightData,

/// The max send buffer size allowed.
max_send_buffer_size: usize,

/// The current send buffer size.
current_send_buffer_size: usize,
}

#[derive(Debug, Eq, PartialEq)]
@@ -93,9 +99,17 @@ impl Prioritize {
flow,
last_opened_id: StreamId::ZERO,
in_flight_data_frame: InFlightData::Nothing,
max_send_buffer_size: usize::MAX,
current_send_buffer_size: 0,
}
}

pub fn set_max_send_buffer_size(&mut self, max: usize, store: &mut Store, counts: &mut Counts) {
self.max_send_buffer_size = max;

self.assign_connection_capacity(0, store, counts);
}

/// Queue a frame to be sent to the remote
pub fn queue_frame<B>(
&mut self,
@@ -175,6 +189,8 @@ impl Prioritize {
self.try_assign_capacity(stream);
}

self.current_send_buffer_size += sz as usize;
Contributor Author: I used to do this prior to calling `try_assign_capacity`. AFAIU that was wrong, right?

Member: That sounds right. The connection-level and stream-level should be updated together, but since updating the stream-level may make it exceed its requested capacity and thus needs to check for more capacity, that must happen in between.

A code comment about the ordering being important may be prudent.
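The ordering the reviewer describes can be sketched with a simplified, hypothetical model; the names mirror h2's fields, but the logic is reduced to just the bookkeeping in question:

```rust
// A minimal model of the ordering constraint discussed above; these are
// simplified stand-ins, not h2's real internals.
struct Prioritize {
    max_send_buffer_size: usize,
    current_send_buffer_size: usize, // connection-level counter
}

struct Stream {
    buffered_send_data: usize, // stream-level counter
    requested_send_capacity: usize,
}

impl Prioritize {
    fn available(&self) -> usize {
        self.max_send_buffer_size
            .saturating_sub(self.current_send_buffer_size)
    }

    // Stand-in for `try_assign_capacity`: if buffering pushed the stream
    // past what it asked for, request more capacity while the connection
    // counter still reflects the pre-buffer state.
    fn try_assign_capacity(&mut self, stream: &mut Stream) {
        if stream.buffered_send_data > stream.requested_send_capacity {
            let extra = self.available();
            stream.requested_send_capacity = stream
                .buffered_send_data
                .min(stream.requested_send_capacity + extra);
        }
    }

    fn buffer_data(&mut self, stream: &mut Stream, sz: usize) {
        // 1. Update the stream-level counter first.
        stream.buffered_send_data += sz;
        // 2. The capacity check must happen in between, before the
        //    connection-level counter shrinks the apparent headroom.
        self.try_assign_capacity(stream);
        // 3. Update the connection-level counter last.
        self.current_send_buffer_size += sz;
    }
}
```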


if frame.is_end_stream() {
stream.state.send_close();
self.reserve_capacity(0, stream, counts);
@@ -350,7 +366,7 @@ impl Prioritize {
self.flow.assign_capacity(inc);

// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
while self.available() > 0 {
let stream = match self.pending_capacity.pop(store) {
Some(stream) => stream,
None => return,
@@ -373,6 +389,17 @@
}
}

fn available(&self) -> WindowSize {
cmp::min(
self.flow.available().as_size() as usize,
cmp::min(
self.max_send_buffer_size
.saturating_sub(self.current_send_buffer_size),
WindowSize::MAX as usize,
),
) as WindowSize
}
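The clamp in `available` above can be exercised standalone; the helper below is a hypothetical, plain-integer version of the same arithmetic, not part of the patch:

```rust
// Hypothetical standalone version of the clamping in `available`:
// connection flow-control capacity, limited by remaining buffer
// headroom, with the headroom clamped into u32 (WindowSize) range.
fn effective_available(
    flow_available: u32,    // connection flow-control capacity
    max_send_buffer: usize, // configured cap
    buffered: usize,        // bytes currently buffered
) -> u32 {
    let headroom = max_send_buffer.saturating_sub(buffered);
    let headroom = headroom.min(u32::MAX as usize) as u32;
    flow_available.min(headroom)
}
```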

/// Request capacity to send data
fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
let total_requested = stream.requested_send_capacity;
@@ -395,7 +422,8 @@
additional,
buffered = stream.buffered_send_data,
window = stream.send_flow.window_size(),
conn = %self.flow.available()
conn_window = %self.flow.available(),
conn = self.available(),
);

if additional == 0 {
@@ -413,7 +441,7 @@
);

// The amount of currently available capacity on the connection
let conn_available = self.flow.available().as_size();
let conn_available = self.available();

// First check if capacity is immediately available
if conn_available > 0 {
@@ -509,6 +537,10 @@ impl Prioritize {

// Because, always try to reclaim...
self.reclaim_frame(buffer, store, dst);

// Maybe schedule streams if the send buffer is not full
// anymore.
self.assign_connection_capacity(0, store, counts);
}
None => {
// Try to flush the codec.
@@ -630,6 +662,8 @@ impl Prioritize {
tracing::trace!(?frame, "dropping");
}

self.current_send_buffer_size -= stream.buffered_send_data;

stream.buffered_send_data = 0;
stream.requested_send_capacity = 0;
if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
@@ -736,6 +770,8 @@ impl Prioritize {
tracing::trace_span!("updating stream flow").in_scope(|| {
stream.send_flow.send_data(len);

self.current_send_buffer_size -= len as usize;
Member: I think this area needs to potentially wake up any tasks that have been waiting on "capacity", since now there is "more buffer space to use". The reason it needs to be done here is that with flow-control capacity, the connection doesn't actually get more until the peer has sent another WINDOW_UPDATE frame, so `recv_window_update` will notify a task there. But buffer size isn't affected by WINDOW_UPDATE frames.

Contributor Author: I'm not sure I understand. Why do we even call `self.flow.assign_capacity(len);` below that line, if we are actually supposed to wait for the peer to send another WINDOW_UPDATE frame?

Contributor Author (@nox, Nov 25, 2021): I am even more confused that we call `self.flow.send_data(len);` immediately after we call `self.flow.assign_capacity(len);`.

Contributor Author: Doesn't that mean that `self.flow.available` never changes from `Prioritize::pop_frame`? I don't understand the logic there.

Contributor Author: I called `assign_connection_capacity` from `poll_complete` after the `reclaim_frame` call.
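The distinction the reviewer draws (WINDOW_UPDATE replenishes flow-control capacity remotely, while buffer space is freed locally when a DATA frame is written) can be modeled in isolation; the types below are hypothetical stand-ins, not h2's internals:

```rust
// Hypothetical model of the point above: the write path must notify
// capacity waiters itself, because no WINDOW_UPDATE will ever arrive
// to announce freed buffer space.
use std::collections::VecDeque;

struct Conn {
    window: usize,                   // replenished by WINDOW_UPDATE
    max_buffer: usize,
    buffered: usize,                 // drained locally on write
    waiters: VecDeque<&'static str>, // stand-ins for parked tasks
    woken: Vec<&'static str>,
}

impl Conn {
    fn headroom(&self) -> usize {
        self.window.min(self.max_buffer - self.buffered)
    }

    fn notify_waiters(&mut self) {
        while self.headroom() > 0 {
            match self.waiters.pop_front() {
                Some(w) => self.woken.push(w),
                None => break,
            }
        }
    }

    // A DATA frame of `len` bytes was flushed to the socket.
    fn on_data_written(&mut self, len: usize) {
        self.window -= len;    // flow-control capacity is consumed...
        self.buffered -= len;  // ...but buffer space is freed,
        self.notify_waiters(); // so wake waiters here, locally.
    }

    // The peer granted more flow-control capacity.
    fn on_window_update(&mut self, inc: usize) {
        self.window += inc;
        self.notify_waiters();
    }
}
```

Note that a waiter blocked on buffer space is woken by the local write path alone, with no WINDOW_UPDATE involved.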


// Decrement the stream's buffered data counter
debug_assert!(stream.buffered_send_data >= len as usize);
stream.buffered_send_data -= len as usize;
4 changes: 4 additions & 0 deletions src/proto/streams/send.rs
@@ -118,6 +118,10 @@ impl Send {
Ok(())
}

pub fn set_max_send_buffer_size(&mut self, max: usize, store: &mut Store, counts: &mut Counts) {
self.prioritize.set_max_send_buffer_size(max, store, counts);
}

pub fn send_headers<B>(
&mut self,
frame: frame::Headers,
9 changes: 9 additions & 0 deletions src/proto/streams/streams.rs
@@ -127,6 +127,15 @@ where
.set_target_connection_window(size, &mut me.actions.task)
}

pub fn set_max_send_buffer_size(&mut self, max: usize) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

me.actions
.send
.set_max_send_buffer_size(max, &mut me.store, &mut me.counts);
}

pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
41 changes: 41 additions & 0 deletions src/server.rs
@@ -245,6 +245,9 @@ pub struct Builder {

/// Initial target window size for new connections.
initial_target_connection_window_size: Option<u32>,

/// Max send buffer size.
max_send_buffer_size: Option<usize>,
}

/// Send a response back to the client
@@ -451,6 +454,11 @@
self.connection.set_target_window_size(size);
}

/// Sets the max send buffer size for the whole connection.
pub fn set_max_send_buffer_size(&mut self, max: usize) {
self.connection.set_max_send_buffer_size(max);
}

/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
/// flow control for received data.
///
@@ -620,6 +628,7 @@ impl Builder {
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
settings: Settings::default(),
initial_target_connection_window_size: None,
max_send_buffer_size: None,
}
}

@@ -763,6 +772,35 @@ impl Builder {
self
}

/// Sets the max size of the send buffer.
///
/// This limits the maximum amount of data that can be buffered
/// waiting to be sent on the connection.
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let server_fut = Builder::new()
/// .max_send_buffer_size(16 * 1024)
/// .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
self.max_send_buffer_size = Some(max);
self
}

/// Sets the maximum number of concurrent streams.
///
/// The maximum concurrent streams setting only controls the maximum number
@@ -1280,6 +1318,9 @@
if let Some(sz) = self.builder.initial_target_connection_window_size {
c.set_target_window_size(sz);
}
if let Some(sz) = self.builder.max_send_buffer_size {
c.set_max_send_buffer_size(sz);
}
Ok(c)
})
}
83 changes: 83 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
@@ -1561,3 +1561,86 @@ async fn data_padding() {

join(srv, h2).await;
}


#[tokio::test]
async fn notify_on_send_buffer_available() {
// This test ensures that the stream gets notified when there is additional
// send buffer space.
h2_support::trace_init!();

let (io, mut client) = mock::new();


let client = async move {
let settings = client.assert_server_handshake().await;
assert_default_settings!(settings);
client.send_frame(
frames::headers(1)
.request("GET", "https://www.example.com/")
.eos()
)
.await;
client.send_frame(
frames::headers(3)
.request("GET", "https://www.example.com/")
.eos()
)
.await;
client.recv_frame(frames::headers(1).response(200)).await;
client.recv_frame(frames::headers(3).response(200)).await;
client.recv_frame(frames::data(1, &b"abcde"[..]).eos()).await;
client.recv_frame(frames::data(3, &b"abcde"[..])).await;
client.recv_frame(frames::data(3, &b"abcde"[..])).await;
client.recv_frame(frames::data(3, &b"abcde"[..])).await;
client.recv_frame(frames::data(3, &b""[..]).eos()).await;
};

let srv = async move {
let mut srv = server::Builder::new()
.max_send_buffer_size(5)
.handshake::<_, Bytes>(io)
.await
.expect("handshake");

let (_req, mut reply1) = srv.next().await.unwrap().unwrap();
let (_req, mut reply2) = srv.next().await.unwrap().unwrap();

let mut stream1 = reply1.send_response(http::Response::new(()), false).unwrap();
let mut stream2 = reply2.send_response(http::Response::new(()), false).unwrap();
drop((reply1, reply2));

let t0 = tokio::spawn(async move {
assert!(srv.next().await.is_none(), "unexpected request");
});
let t1 = tokio::spawn(async move {
eprintln!("[t1] RESERVE 1 cap");
stream1.reserve_capacity(1);
stream1 = util::wait_for_capacity(stream1, 1).await;
eprintln!("[t1] got 1 cap");
stream1.send_data("abcde".into(), true).unwrap();
});
let t2 = tokio::spawn(async move {
for n in 0..3 {
eprintln!("[t2] RESERVE 1 cap, loop {}", n);
stream2.reserve_capacity(1);
stream2 = util::wait_for_capacity(stream2, 1).await;
eprintln!("[t2] got 1 cap, loop {}", n);
stream2.send_data("abcde".into(), false).unwrap();
}

stream2.send_data("".into(), true).unwrap();
});

t2.await.expect("srv body spawn");
t1.await.expect("srv body spawn");
t0.await.expect("srv end");
Comment on lines +1640 to +1642
Contributor Author: AFAIK the test is still wrong. We await t2 before we await t0, so t0 is pending and nothing will wake it up when t2 notifies the connection task that it has frames to send, so the send buffer size never decreases and everything gets stuck.

Contributor Author: Never mind, I misread. That being said, the test really doesn't help much in finding where the issue with my patch is :/

Contributor Author:

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let _ = ready!(self.stream().poll_capacity(cx)).unwrap();
        let act = self.stream().capacity();
        if act >= self.target {
            return Poll::Ready(self.stream.take().unwrap().into());
        }
        Poll::Pending
    }

There, `poll_capacity` returns `Ready(Some(0))` and `capacity` returns 0, so `act >= self.target` is false, and `Poll::Pending` gets returned even though nothing has been set up to wake up the task.

What do you think is wrong here? That `poll_capacity` returns `Ready(Some(0))`, or that `wait_for_capacity` is badly conceived, or both?

};

join(srv, client).await;
}
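The `wait_for_capacity` question raised in the final review thread can be reproduced with a self-contained model. `FakeStream` and the two poll variants below are illustrative stand-ins, not h2's API; the point is that returning `Pending` without going back through `poll_capacity` leaves no waker registered, while looping guarantees any `Pending` came from `poll_capacity` itself, which arms the waker:

```rust
// Self-contained model of the bug discussed above: poll_capacity can
// yield Ready(Some(0)); bailing out with Pending at that point leaves
// no waker registered, whereas looping back into poll_capacity does.
enum Poll2<T> {
    Ready(T),
    Pending,
}

struct FakeStream {
    capacity_events: Vec<usize>, // queued capacity notifications
    capacity: usize,
    waker_registered: bool,
}

impl FakeStream {
    fn poll_capacity(&mut self) -> Poll2<usize> {
        if self.capacity_events.is_empty() {
            self.waker_registered = true; // Pending always arms the waker
            Poll2::Pending
        } else {
            let ev = self.capacity_events.remove(0);
            self.capacity += ev;
            Poll2::Ready(ev)
        }
    }
}

// Buggy shape: gives up after a single poll_capacity call.
fn wait_for_capacity_once(s: &mut FakeStream, target: usize) -> Poll2<usize> {
    match s.poll_capacity() {
        Poll2::Pending => return Poll2::Pending,
        Poll2::Ready(_) => {}
    }
    if s.capacity >= target {
        Poll2::Ready(s.capacity)
    } else {
        Poll2::Pending // BUG: nothing arranged to wake the task
    }
}

// Looping shape: a zero-capacity event loops back into poll_capacity.
fn wait_for_capacity_loop(s: &mut FakeStream, target: usize) -> Poll2<usize> {
    loop {
        match s.poll_capacity() {
            Poll2::Pending => return Poll2::Pending, // waker is armed
            Poll2::Ready(_) => {}
        }
        if s.capacity >= target {
            return Poll2::Ready(s.capacity);
        }
    }
}
```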