Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/configuration/cluster_manager/cluster_stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Every cluster has a statistics tree rooted at *cluster.<name>.* with the followi
upstream_rq_retry_overflow, Counter, Total requests not retried due to circuit breaking
upstream_flow_control_paused_reading_total, Counter, Total number of times flow control paused reading from upstream.
upstream_flow_control_resumed_reading_total, Counter, Total number of times flow control resumed reading from upstream.
upstream_flow_control_backed_up_total, Counter, Total number of times the upstream connection backed up and paused reads from downstream.
upstream_flow_control_drained_total, Counter, Total number of times the upstream connection drained and resumed reads from downstream.
membership_change, Counter, Total cluster membership changes
membership_healthy, Gauge, Current cluster healthy total (inclusive of both health checking and outlier detection)
membership_total, Gauge, Current cluster membership total
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/http_conn_man/http_conn_man.rst
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ http2_settings
NOTE: 65535 is the initial window size from HTTP/2 spec. We only support increasing the default window
size now, so it's also the minimum.

This field also acts as a soft limit on the number of bytes Envoy will buffer per-stream in the
HTTP/2 codec buffers. Once the buffer reaches this pointer, watermark callbacks will fire to
stop the flow of data to the codec buffers.

initial_connection_window_size
*(optional, integer)* Similar to :ref:`initial_stream_window_size
<config_http_conn_man_http2_settings_initial_stream_window_size>`, but for connection-level flow-control
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/http_conn_man/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ statistics:
downstream_cx_tx_bytes_buffered, Gauge, Total sent bytes currently buffered
downstream_cx_drain_close, Counter, Total connections closed due to draining
downstream_cx_idle_timeout, Counter, Total connections closed due to idle timeout
downstream_flow_control_paused_reading_total, Counter, Total number of times reads were disabled due to flow control
downstream_flow_control_resumed_reading_total, Counter, Total number of times reads were enabled on the connection due to flow control
downstream_rq_total, Counter, Total requests
downstream_rq_http1_total, Counter, Total HTTP/1.1 requests
downstream_rq_http2_total, Counter, Total HTTP/2 requests
Expand Down
30 changes: 30 additions & 0 deletions include/envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ class StreamCallbacks {
* @param reason supplies the reset reason.
*/
virtual void onResetStream(StreamResetReason reason) PURE;

/**
* Fires when a stream, or the connection the stream is sending to, goes over its high watermark.
*/
virtual void onAboveWriteBufferHighWatermark() PURE;

/**
* Fires when a stream, or the connection the stream is sending to, goes from over its high
* watermark to under its low watermark.
*/
virtual void onBelowWriteBufferLowWatermark() PURE;
};

/**
Expand All @@ -132,6 +143,14 @@ class Stream {
* @param reason supplies the reset reason.
*/
virtual void resetStream(StreamResetReason reason) PURE;

/**
* Enable/disable further data from this stream.
* Cessation of data may not be immediate. For example, for HTTP/2 this may stop further flow
* control window updates which will result in the peer eventually stopping sending data.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a later implementation, have you thought about using distance from watermark as a way to size the H2 stream flow control window?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would avoid the double buffer and given the proximity between buffer and data source I think it'd be a nice clean thing to do, but it would make this one buffer + data source combination special and unusual so I'm not convinced it'd be worth doing unless I get to do the full flow control redesign we've talked about in person where everything seemlessly avoids the double buffering.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everything seemlessly avoids the double buffering

I assume you mean by rejiggering everything so that encode* returns the number of bytes accepted, etc.? That's certainly doable, though, buffering still will need to happen since whoever fails to write will need to buffer locally until they can write. This pushes more complexity out to the leaves. Not sure it's worth it, but I guess we can see how things go with this approach.

* @param disable informs if reads should be disabled (true) or re-enabled (false).
*/
virtual void readDisable(bool disable) PURE;
};

/**
Expand Down Expand Up @@ -260,6 +279,17 @@ class ClientConnection : public virtual Connection {
* @return StreamEncoder& supplies the encoder to write the request into.
*/
virtual StreamEncoder& newStream(StreamDecoder& response_decoder) PURE;

/**
* Called when the underlying Network::Connection goes over its high watermark.
*/
virtual void onUnderlyingConnectionAboveWriteBufferHighWatermark() PURE;

/**
* Called when the underlying Network::Connection goes from over its high watermark to under its
* low watermark.
*/
virtual void onUnderlyingConnectionBelowWriteBufferLowWatermark() PURE;
};

typedef std::unique_ptr<ClientConnection> ClientConnectionPtr;
Expand Down
26 changes: 26 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,22 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
* @param trailers supplies the trailers to encode.
*/
virtual void encodeTrailers(HeaderMapPtr&& trailers) PURE;

/**
* Called when the buffer for a decoder filter or any buffers the filter sends data to go over
* their high watermark.
*
* In the case of a filter such as the router filter, which spills into multiple buffers (codec,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you share this comment with the other variants below? I'm thinking we might want to take the design doc and digest it down to a Markdown file explaining what is actually implemented, and example sequences of going above and below to convey the intuition of when callbacks are fired etc. Maybe you can figure out a better way to convey the intuition in comment form, up to you.

Copy link
Copy Markdown
Member

@htuch htuch Jul 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having read more of the PR, I think a MD file explaining the connection + stream level design stuff, and then real examples of callback sequences on different events/overflows would be super helpful. Looking through the PR locally, it looks very solid, but I'm having a hard time connecting all the dots staring at the code, some worked examples or diagrams if you prefer would be great.

* connection etc.) this may be called multiple times. Any such filter is responsible for calling
* the low watermark callbacks an equal number of times as the respective buffers are drained.
*/
virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE;

/**
* Called when a decoder filter or any buffers the filter sends data to go from over its high
* watermark to under its low watermark.
*/
virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE;
};

/**
Expand Down Expand Up @@ -297,6 +313,16 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
* It is an error to call this method in any other case.
*/
virtual void addEncodedData(Buffer::Instance& data) PURE;

/**
* Called when an encoder filter goes over its high watermark.
*/
virtual void onEncoderFilterAboveWriteBufferHighWatermark() PURE;

/**
* Called when a encoder filter goes from over its high watermark to under its low watermark.
*/
virtual void onEncoderFilterBelowWriteBufferLowWatermark() PURE;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ class HostSet {
COUNTER(upstream_rq_retry_overflow) \
COUNTER(upstream_flow_control_paused_reading_total) \
COUNTER(upstream_flow_control_resumed_reading_total) \
COUNTER(upstream_flow_control_backed_up_total) \
COUNTER(upstream_flow_control_drained_total) \
GAUGE (max_host_weight) \
COUNTER(membership_change) \
GAUGE (membership_healthy) \
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(HeaderMapPtr&& trailers) override;
void onDecoderFilterAboveWriteBufferHighWatermark() override {}
void onDecoderFilterBelowWriteBufferLowWatermark() override {}

AsyncClient::StreamCallbacks& stream_callbacks_;
const uint64_t stream_id_;
Expand Down
12 changes: 10 additions & 2 deletions source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,

// StreamCallbacks
void onResetStream(StreamResetReason reason) override { parent_.onReset(*this, reason); }
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

// StreamDecoderWrapper
void onPreDecodeComplete() override { parent_.responseDecodeComplete(*this); }
Expand All @@ -180,8 +182,14 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override;
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}
// Pass watermark events from the connection on to the codec which will pass it to the underlying
// streams.
void onAboveWriteBufferHighWatermark() override {
codec_->onUnderlyingConnectionAboveWriteBufferHighWatermark();
}
void onBelowWriteBufferLowWatermark() override {
codec_->onUnderlyingConnectionBelowWriteBufferLowWatermark();
}

std::list<ActiveRequestPtr> active_requests_;
Http::ConnectionCallbacks* codec_callbacks_{};
Expand Down
13 changes: 13 additions & 0 deletions source/common/http/codec_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ namespace Http {

class StreamCallbackHelper {
public:
void runLowWatermarkCallbacks() {
// TODO(alyssawilk) see if we can make this safe for disconnects mid-loop
for (StreamCallbacks* callbacks : callbacks_) {
callbacks->onBelowWriteBufferLowWatermark();
}
}

void runHighWatermarkCallbacks() {
for (StreamCallbacks* callbacks : callbacks_) {
callbacks->onAboveWriteBufferHighWatermark();
}
}

void runResetCallbacks(StreamResetReason reason) {
if (reset_callbacks_run_) {
return;
Expand Down
14 changes: 14 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,20 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::encodeTrailers(HeaderMapP
parent_.encodeTrailers(nullptr, *parent_.response_trailers_);
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::
onDecoderFilterAboveWriteBufferHighWatermark() {
ENVOY_STREAM_LOG(debug, "Read-disabling downstream stream due to filter callbacks.", parent_);
parent_.response_encoder_->getStream().readDisable(true);
parent_.connection_manager_.stats_.named_.downstream_flow_control_paused_reading_total_.inc();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattklein123 Do you think it's also worth adding the source-cluster of the flow control backup or is that overkill?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have this in upstream stats, so I think this is overkill (see my comment about router stats). Router should be hitting the cluster stats in this case IMO.

}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::
onDecoderFilterBelowWriteBufferLowWatermark() {
ENVOY_STREAM_LOG(debug, "Read-enabling downstream stream due to filter callbacks.", parent_);
parent_.response_encoder_->getStream().readDisable(false);
parent_.connection_manager_.stats_.named_.downstream_flow_control_resumed_reading_total_.inc();
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data) {
return parent_.addEncodedData(*this, data);
}
Expand Down
11 changes: 11 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ namespace Http {
GAUGE (downstream_cx_tx_bytes_buffered) \
COUNTER(downstream_cx_drain_close) \
COUNTER(downstream_cx_idle_timeout) \
COUNTER(downstream_flow_control_paused_reading_total) \
COUNTER(downstream_flow_control_resumed_reading_total) \
COUNTER(downstream_rq_total) \
COUNTER(downstream_rq_http1_total) \
COUNTER(downstream_rq_http2_total) \
Expand Down Expand Up @@ -279,6 +281,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override;
// TODO(alyssawilk) disable upstream reads.
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

Expand Down Expand Up @@ -354,6 +357,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(HeaderMapPtr&& trailers) override;
void onDecoderFilterAboveWriteBufferHighWatermark() override;
void onDecoderFilterBelowWriteBufferLowWatermark() override;

StreamDecoderFilterSharedPtr handle_;
};
Expand Down Expand Up @@ -384,6 +389,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamEncoderFilterCallbacks
void addEncodedData(Buffer::Instance& data) override;
// TODO(alysawilk) disable reads from upstream.
void onEncoderFilterAboveWriteBufferHighWatermark() override {}
void onEncoderFilterBelowWriteBufferLowWatermark() override {}
void continueEncoding() override;
const Buffer::Instance* encodingBuffer() override {
return parent_.buffered_response_data_.get();
Expand Down Expand Up @@ -428,6 +436,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamCallbacks
void onResetStream(StreamResetReason reason) override;
// TODO(alyssawilk) disable upstream reads.
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

// Http::StreamDecoder
void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class StreamEncoderImpl : public StreamEncoder,
void addCallbacks(StreamCallbacks& callbacks) override { addCallbacks_(callbacks); }
void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); }
void resetStream(StreamResetReason reason) override;
void readDisable(bool) override {}

protected:
StreamEncoderImpl(ConnectionImpl& connection) : connection_(connection) {}
Expand Down Expand Up @@ -282,6 +283,8 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl {

// Http::ClientConnection
StreamEncoder& newStream(StreamDecoder& response_decoder) override;
void onUnderlyingConnectionAboveWriteBufferHighWatermark() override {}
void onUnderlyingConnectionBelowWriteBufferLowWatermark() override {}

private:
struct PendingResponse {
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::

// Http::StreamCallbacks
void onResetStream(StreamResetReason) override { parent_.parent_.onDownstreamReset(parent_); }
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

ActiveClient& parent_;
bool encode_complete_{};
Expand Down
1 change: 1 addition & 0 deletions source/common/http/http2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ envoy_cc_library(
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:stats_macros",
"//source/common/buffer:buffer_lib",
"//source/common/buffer:watermark_buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:enum_to_int",
"//source/common/common:linked_object",
Expand Down
Loading