diff --git a/docs/configuration/cluster_manager/cluster_stats.rst b/docs/configuration/cluster_manager/cluster_stats.rst index 3b10bba64433a..3ab7510fa40cb 100644 --- a/docs/configuration/cluster_manager/cluster_stats.rst +++ b/docs/configuration/cluster_manager/cluster_stats.rst @@ -55,6 +55,8 @@ Every cluster has a statistics tree rooted at *cluster..* with the followi upstream_rq_retry_overflow, Counter, Total requests not retried due to circuit breaking upstream_flow_control_paused_reading_total, Counter, Total number of times flow control paused reading from upstream. upstream_flow_control_resumed_reading_total, Counter, Total number of times flow control resumed reading from upstream. + upstream_flow_control_backed_up_total, Counter, Total number of times the upstream connection backed up and paused reads from downstream. + upstream_flow_control_drained_total, Counter, Total number of times the upstream connection drained and resumed reads from downstream. membership_change, Counter, Total cluster membership changes membership_healthy, Gauge, Current cluster healthy total (inclusive of both health checking and outlier detection) membership_total, Gauge, Current cluster membership total diff --git a/docs/configuration/http_conn_man/http_conn_man.rst b/docs/configuration/http_conn_man/http_conn_man.rst index 82fca2669610b..c5dc0d0ce39a6 100644 --- a/docs/configuration/http_conn_man/http_conn_man.rst +++ b/docs/configuration/http_conn_man/http_conn_man.rst @@ -135,6 +135,10 @@ http2_settings NOTE: 65535 is the initial window size from HTTP/2 spec. We only support increasing the default window size now, so it's also the minimum. + This field also acts as a soft limit on the number of bytes Envoy will buffer per-stream in the + HTTP/2 codec buffers. Once the buffer reaches this pointer, watermark callbacks will fire to + stop the flow of data to the codec buffers. + initial_connection_window_size *(optional, integer)* Similar to :ref:`initial_stream_window_size `, but for connection-level flow-control diff --git a/docs/configuration/http_conn_man/stats.rst b/docs/configuration/http_conn_man/stats.rst index 2395d2ce349d2..648be6bca6526 100644 --- a/docs/configuration/http_conn_man/stats.rst +++ b/docs/configuration/http_conn_man/stats.rst @@ -32,6 +32,8 @@ statistics: downstream_cx_tx_bytes_buffered, Gauge, Total sent bytes currently buffered downstream_cx_drain_close, Counter, Total connections closed due to draining downstream_cx_idle_timeout, Counter, Total connections closed due to idle timeout + downstream_flow_control_paused_reading_total, Counter, Total number of times reads were disabled due to flow control + downstream_flow_control_resumed_reading_total, Counter, Total number of times reads were enabled on the connection due to flow control downstream_rq_total, Counter, Total requests downstream_rq_http1_total, Counter, Total HTTP/1.1 requests downstream_rq_http2_total, Counter, Total HTTP/2 requests diff --git a/include/envoy/http/codec.h b/include/envoy/http/codec.h index 4c0b53726ac61..e50cea348de4f 100644 --- a/include/envoy/http/codec.h +++ b/include/envoy/http/codec.h @@ -106,6 +106,17 @@ class StreamCallbacks { * @param reason supplies the reset reason. */ virtual void onResetStream(StreamResetReason reason) PURE; + + /** + * Fires when a stream, or the connection the stream is sending to, goes over its high watermark. + */ + virtual void onAboveWriteBufferHighWatermark() PURE; + + /** + * Fires when a stream, or the connection the stream is sending to, goes from over its high + * watermark to under its low watermark. + */ + virtual void onBelowWriteBufferLowWatermark() PURE; }; /** @@ -132,6 +143,14 @@ class Stream { * @param reason supplies the reset reason. */ virtual void resetStream(StreamResetReason reason) PURE; + + /** + * Enable/disable further data from this stream. + * Cessation of data may not be immediate. For example, for HTTP/2 this may stop further flow + * control window updates which will result in the peer eventually stopping sending data. + * @param disable informs if reads should be disabled (true) or re-enabled (false). + */ + virtual void readDisable(bool disable) PURE; }; /** @@ -260,6 +279,17 @@ class ClientConnection : public virtual Connection { * @return StreamEncoder& supplies the encoder to write the request into. */ virtual StreamEncoder& newStream(StreamDecoder& response_decoder) PURE; + + /** + * Called when the underlying Network::Connection goes over its high watermark. + */ + virtual void onUnderlyingConnectionAboveWriteBufferHighWatermark() PURE; + + /** + * Called when the underlying Network::Connection goes from over its high watermark to under its + * low watermark. + */ + virtual void onUnderlyingConnectionBelowWriteBufferLowWatermark() PURE; }; typedef std::unique_ptr ClientConnectionPtr; diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index b4f9722a1677a..5b1aae7e0d450 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -198,6 +198,22 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { * @param trailers supplies the trailers to encode. */ virtual void encodeTrailers(HeaderMapPtr&& trailers) PURE; + + /** + * Called when the buffer for a decoder filter or any buffers the filter sends data to go over + * their high watermark. + * + * In the case of a filter such as the router filter, which spills into multiple buffers (codec, + * connection etc.) this may be called multiple times. Any such filter is responsible for calling + * the low watermark callbacks an equal number of times as the respective buffers are drained. + */ + virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE; + + /** + * Called when a decoder filter or any buffers the filter sends data to go from over its high + * watermark to under its low watermark. + */ + virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE; }; /** @@ -297,6 +313,16 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks { * It is an error to call this method in any other case. */ virtual void addEncodedData(Buffer::Instance& data) PURE; + + /** + * Called when an encoder filter goes over its high watermark. + */ + virtual void onEncoderFilterAboveWriteBufferHighWatermark() PURE; + + /** + * Called when a encoder filter goes from over its high watermark to under its low watermark. + */ + virtual void onEncoderFilterBelowWriteBufferLowWatermark() PURE; }; /** diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index b5699dba9b8a2..f9f0aa9c2e135 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -205,6 +205,8 @@ class HostSet { COUNTER(upstream_rq_retry_overflow) \ COUNTER(upstream_flow_control_paused_reading_total) \ COUNTER(upstream_flow_control_resumed_reading_total) \ + COUNTER(upstream_flow_control_backed_up_total) \ + COUNTER(upstream_flow_control_drained_total) \ GAUGE (max_host_weight) \ COUNTER(membership_change) \ GAUGE (membership_healthy) \ diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 107f70ac60830..27048b8f9b05e 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -197,6 +197,8 @@ class AsyncStreamImpl : public AsyncClient::Stream, void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override; void encodeData(Buffer::Instance& data, bool end_stream) override; void encodeTrailers(HeaderMapPtr&& trailers) override; + void onDecoderFilterAboveWriteBufferHighWatermark() override {} + void onDecoderFilterBelowWriteBufferLowWatermark() override {} AsyncClient::StreamCallbacks& stream_callbacks_; const uint64_t stream_id_; diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index 68436351003e9..503818e4dcad2 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -157,6 +157,8 @@ class CodecClient : Logger::Loggable, // StreamCallbacks void onResetStream(StreamResetReason reason) override { parent_.onReset(*this, reason); } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} // StreamDecoderWrapper void onPreDecodeComplete() override { parent_.responseDecodeComplete(*this); } @@ -180,8 +182,14 @@ class CodecClient : Logger::Loggable, // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override; - void onAboveWriteBufferHighWatermark() override {} - void onBelowWriteBufferLowWatermark() override {} + // Pass watermark events from the connection on to the codec which will pass it to the underlying + // streams. + void onAboveWriteBufferHighWatermark() override { + codec_->onUnderlyingConnectionAboveWriteBufferHighWatermark(); + } + void onBelowWriteBufferLowWatermark() override { + codec_->onUnderlyingConnectionBelowWriteBufferLowWatermark(); + } std::list active_requests_; Http::ConnectionCallbacks* codec_callbacks_{}; diff --git a/source/common/http/codec_helper.h b/source/common/http/codec_helper.h index f73fc95ccfd9c..fff96ad2105f7 100644 --- a/source/common/http/codec_helper.h +++ b/source/common/http/codec_helper.h @@ -7,6 +7,19 @@ namespace Http { class StreamCallbackHelper { public: + void runLowWatermarkCallbacks() { + // TODO(alyssawilk) see if we can make this safe for disconnects mid-loop + for (StreamCallbacks* callbacks : callbacks_) { + callbacks->onBelowWriteBufferLowWatermark(); + } + } + + void runHighWatermarkCallbacks() { + for (StreamCallbacks* callbacks : callbacks_) { + callbacks->onAboveWriteBufferHighWatermark(); + } + } + void runResetCallbacks(StreamResetReason reason) { if (reset_callbacks_run_) { return; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 610e1b09a3fee..06b5f8c05505a 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -988,6 +988,20 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::encodeTrailers(HeaderMapP parent_.encodeTrailers(nullptr, *parent_.response_trailers_); } +void ConnectionManagerImpl::ActiveStreamDecoderFilter:: + onDecoderFilterAboveWriteBufferHighWatermark() { + ENVOY_STREAM_LOG(debug, "Read-disabling downstream stream due to filter callbacks.", parent_); + parent_.response_encoder_->getStream().readDisable(true); + parent_.connection_manager_.stats_.named_.downstream_flow_control_paused_reading_total_.inc(); +} + +void ConnectionManagerImpl::ActiveStreamDecoderFilter:: + onDecoderFilterBelowWriteBufferLowWatermark() { + ENVOY_STREAM_LOG(debug, "Read-enabling downstream stream due to filter callbacks.", parent_); + parent_.response_encoder_->getStream().readDisable(false); + parent_.connection_manager_.stats_.named_.downstream_flow_control_resumed_reading_total_.inc(); +} + void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data) { return parent_.addEncodedData(*this, data); } diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 64f70cdc6c44a..8e5656d6ca3bc 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -58,6 +58,8 @@ namespace Http { GAUGE (downstream_cx_tx_bytes_buffered) \ COUNTER(downstream_cx_drain_close) \ COUNTER(downstream_cx_idle_timeout) \ + COUNTER(downstream_flow_control_paused_reading_total) \ + COUNTER(downstream_flow_control_resumed_reading_total) \ COUNTER(downstream_rq_total) \ COUNTER(downstream_rq_http1_total) \ COUNTER(downstream_rq_http2_total) \ @@ -279,6 +281,7 @@ class ConnectionManagerImpl : Logger::Loggable, // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override; + // TODO(alyssawilk) disable upstream reads. void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} @@ -354,6 +357,8 @@ class ConnectionManagerImpl : Logger::Loggable, void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override; void encodeData(Buffer::Instance& data, bool end_stream) override; void encodeTrailers(HeaderMapPtr&& trailers) override; + void onDecoderFilterAboveWriteBufferHighWatermark() override; + void onDecoderFilterBelowWriteBufferLowWatermark() override; StreamDecoderFilterSharedPtr handle_; }; @@ -384,6 +389,9 @@ class ConnectionManagerImpl : Logger::Loggable, // Http::StreamEncoderFilterCallbacks void addEncodedData(Buffer::Instance& data) override; + // TODO(alysawilk) disable reads from upstream. + void onEncoderFilterAboveWriteBufferHighWatermark() override {} + void onEncoderFilterBelowWriteBufferLowWatermark() override {} void continueEncoding() override; const Buffer::Instance* encodingBuffer() override { return parent_.buffered_response_data_.get(); @@ -428,6 +436,9 @@ class ConnectionManagerImpl : Logger::Loggable, // Http::StreamCallbacks void onResetStream(StreamResetReason reason) override; + // TODO(alyssawilk) disable upstream reads. + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} // Http::StreamDecoder void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override; diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 56acfaa6b3c9f..5af46c243e2c6 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -42,6 +42,7 @@ class StreamEncoderImpl : public StreamEncoder, void addCallbacks(StreamCallbacks& callbacks) override { addCallbacks_(callbacks); } void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); } void resetStream(StreamResetReason reason) override; + void readDisable(bool) override {} protected: StreamEncoderImpl(ConnectionImpl& connection) : connection_(connection) {} @@ -282,6 +283,8 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl { // Http::ClientConnection StreamEncoder& newStream(StreamDecoder& response_decoder) override; + void onUnderlyingConnectionAboveWriteBufferHighWatermark() override {} + void onUnderlyingConnectionBelowWriteBufferLowWatermark() override {} private: struct PendingResponse { diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index 32cee2a1b144c..a94aa36afb27c 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -57,6 +57,8 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: // Http::StreamCallbacks void onResetStream(StreamResetReason) override { parent_.parent_.onDownstreamReset(parent_); } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} ActiveClient& parent_; bool encode_complete_{}; diff --git a/source/common/http/http2/BUILD b/source/common/http/http2/BUILD index 247c8c06ecdb0..78e2631837be1 100644 --- a/source/common/http/http2/BUILD +++ b/source/common/http/http2/BUILD @@ -24,6 +24,7 @@ envoy_cc_library( "//include/envoy/stats:stats_interface", "//include/envoy/stats:stats_macros", "//source/common/buffer:buffer_lib", + "//source/common/buffer:watermark_buffer_lib", "//source/common/common:assert_lib", "//source/common/common:enum_to_int", "//source/common/common:linked_object", diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 00db840af1a64..ad0e4faf452d8 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -51,10 +51,18 @@ template static T* remove_const(const void* object) { return const_cast(reinterpret_cast(object)); } -ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent) +ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit) : parent_(parent), headers_(new HeaderMapImpl()), local_end_stream_(false), local_end_stream_sent_(false), remote_end_stream_(false), data_deferred_(false), - waiting_for_non_informational_headers_(false) {} + waiting_for_non_informational_headers_(false), + pending_receive_buffer_high_watermark_called_(false), + pending_send_buffer_high_watermark_called_(false) { + if (buffer_limit > 0) { + setWriteBufferWatermarks(buffer_limit / 2, buffer_limit); + } +} + +ConnectionImpl::StreamImpl::~StreamImpl() { ASSERT(unconsumed_bytes_ == 0); } static void insertHeader(std::vector& headers, const HeaderEntry& header) { uint8_t flags = 0; @@ -125,6 +133,48 @@ void ConnectionImpl::StreamImpl::encodeTrailers(const HeaderMap& trailers) { parent_.sendPendingFrames(); } } +void ConnectionImpl::StreamImpl::readDisable(bool disable) { + ENVOY_CONN_LOG(debug, "Stream {} disabled: disable {}, unconsumed_bytes {}", parent_.connection_, + stream_id_, disable, unconsumed_bytes_); + if (disable) { + ++read_disable_count_; + } else { + --read_disable_count_; + if (!buffers_overrun()) { + nghttp2_session_consume(parent_.session_, stream_id_, unconsumed_bytes_); + unconsumed_bytes_ = 0; + parent_.sendPendingFrames(); + } + } +} + +void ConnectionImpl::StreamImpl::pendingRecvBufferHighWatermark() { + ENVOY_CONN_LOG(debug, "recv buffer over limit ", parent_.connection_); + ASSERT(!pending_receive_buffer_high_watermark_called_); + pending_receive_buffer_high_watermark_called_ = true; + readDisable(true); +} + +void ConnectionImpl::StreamImpl::pendingRecvBufferLowWatermark() { + ENVOY_CONN_LOG(debug, "recv buffer under limit ", parent_.connection_); + ASSERT(pending_receive_buffer_high_watermark_called_); + pending_receive_buffer_high_watermark_called_ = false; + readDisable(false); +} + +void ConnectionImpl::StreamImpl::pendingSendBufferHighWatermark() { + ENVOY_CONN_LOG(debug, "send buffer over limit ", parent_.connection_); + ASSERT(!pending_send_buffer_high_watermark_called_); + pending_send_buffer_high_watermark_called_ = true; + runHighWatermarkCallbacks(); +} + +void ConnectionImpl::StreamImpl::pendingSendBufferLowWatermark() { + ENVOY_CONN_LOG(debug, "send buffer under limit ", parent_.connection_); + ASSERT(pending_send_buffer_high_watermark_called_); + pending_send_buffer_high_watermark_called_ = false; + runLowWatermarkCallbacks(); +} void ConnectionImpl::StreamImpl::saveHeader(HeaderString&& name, HeaderString&& value) { if (!Utility::reconstituteCrumbledCookies(name, value, cookies_)) { @@ -267,7 +317,17 @@ ConnectionImpl::StreamImpl* ConnectionImpl::getStream(int32_t stream_id) { } int ConnectionImpl::onData(int32_t stream_id, const uint8_t* data, size_t len) { - getStream(stream_id)->pending_recv_data_.add(data, len); + StreamImpl* stream = getStream(stream_id); + // If this results in buffering too much data, the watermark buffer will call + // pendingRecvBufferHighWatermark, resulting in ++read_disable_count_ + stream->pending_recv_data_.add(data, len); + // Update the window to the peer unless some consumer of this stream's data has hit a flow control + // limit and disabled reads on this stream + if (!stream->buffers_overrun()) { + nghttp2_session_consume(session_, stream_id, len); + } else { + stream->unconsumed_bytes_ += len; + } return 0; } @@ -463,6 +523,11 @@ int ConnectionImpl::onStreamClose(int32_t stream_id, uint32_t error_code) { } connection_.dispatcher().deferredDelete(stream->removeFromList(active_streams_)); + // Any unconsumed data must be consumed before the stream is deleted. + // nghttp2 does not appear to track this internally, and any stream deleted + // with outstanding window will contribute to a slow connection-window leak. + nghttp2_session_consume(session_, stream_id, stream->unconsumed_bytes_); + stream->unconsumed_bytes_ = 0; nghttp2_session_set_stream_user_data(session_, stream->stream_id_, nullptr); } @@ -665,21 +730,22 @@ ConnectionImpl::Http2Options::Http2Options() { // calculations. This saves a tremendous amount of memory in cases where there are a large number // of kept alive HTTP/2 connections. nghttp2_option_set_no_closed_streams(options_, 1); + nghttp2_option_set_no_auto_window_update(options_, 1); } ConnectionImpl::Http2Options::~Http2Options() { nghttp2_option_del(options_); } ClientConnectionImpl::ClientConnectionImpl(Network::Connection& connection, - ConnectionCallbacks& callbacks, Stats::Scope& stats, - const Http2Settings& http2_settings) - : ConnectionImpl(connection, stats), callbacks_(callbacks) { + Http::ConnectionCallbacks& callbacks, + Stats::Scope& stats, const Http2Settings& http2_settings) + : ConnectionImpl(connection, stats, http2_settings), callbacks_(callbacks) { nghttp2_session_client_new2(&session_, http2_callbacks_.callbacks(), base(), http2_options_.options()); sendSettings(http2_settings); } Http::StreamEncoder& ClientConnectionImpl::newStream(StreamDecoder& decoder) { - StreamImplPtr stream(new ClientStreamImpl(*this)); + StreamImplPtr stream(new ClientStreamImpl(*this, per_stream_buffer_limit_)); stream->decoder_ = &decoder; stream->moveIntoList(std::move(stream), active_streams_); return *active_streams_.front(); @@ -710,7 +776,7 @@ int ClientConnectionImpl::onHeader(const nghttp2_frame* frame, HeaderString&& na ServerConnectionImpl::ServerConnectionImpl(Network::Connection& connection, Http::ServerConnectionCallbacks& callbacks, Stats::Scope& scope, const Http2Settings& http2_settings) - : ConnectionImpl(connection, scope), callbacks_(callbacks) { + : ConnectionImpl(connection, scope, http2_settings), callbacks_(callbacks) { nghttp2_session_server_new2(&session_, http2_callbacks_.callbacks(), base(), http2_options_.options()); sendSettings(http2_settings); @@ -729,7 +795,7 @@ int ServerConnectionImpl::onBeginHeaders(const nghttp2_frame* frame) { return 0; } - StreamImplPtr stream(new ServerStreamImpl(*this)); + StreamImplPtr stream(new ServerStreamImpl(*this, per_stream_buffer_limit_)); stream->decoder_ = &callbacks_.newStream(*stream); stream->stream_id_ = frame->hd.stream_id; stream->moveIntoList(std::move(stream), active_streams_); diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 674887f74130a..04fc317ddfcc6 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -14,6 +14,7 @@ #include "envoy/stats/stats_macros.h" #include "common/buffer/buffer_impl.h" +#include "common/buffer/watermark_buffer.h" #include "common/common/linked_object.h" #include "common/common/logger.h" #include "common/http/codec_helper.h" @@ -68,10 +69,12 @@ class Utility { */ class ConnectionImpl : public virtual Connection, Logger::Loggable { public: - ConnectionImpl(Network::Connection& connection, Stats::Scope& stats) + ConnectionImpl(Network::Connection& connection, Stats::Scope& stats, + const Http2Settings& http2_settings) : stats_{ALL_HTTP2_CODEC_STATS(POOL_COUNTER_PREFIX(stats, "http2."))}, - connection_(connection), dispatching_(false), raised_goaway_(false), - pending_deferred_reset_(false) {} + connection_(connection), + per_stream_buffer_limit_(http2_settings.initial_stream_window_size_), dispatching_(false), + raised_goaway_(false), pending_deferred_reset_(false) {} ~ConnectionImpl(); @@ -120,7 +123,9 @@ class ConnectionImpl : public virtual Connection, Logger::Loggable 0; } + ConnectionImpl& parent_; HeaderMapImplPtr headers_; StreamDecoder* decoder_{}; int32_t stream_id_{-1}; - Buffer::OwnedImpl pending_recv_data_; - Buffer::OwnedImpl pending_send_data_; + uint32_t unconsumed_bytes_{0}; + uint32_t read_disable_count_{0}; + Buffer::WatermarkBuffer pending_recv_data_{ + Buffer::InstancePtr{new Buffer::OwnedImpl}, + [this]() -> void { this->pendingRecvBufferLowWatermark(); }, + [this]() -> void { this->pendingRecvBufferHighWatermark(); }}; + Buffer::WatermarkBuffer pending_send_data_{ + Buffer::InstancePtr{new Buffer::OwnedImpl}, + [this]() -> void { this->pendingSendBufferLowWatermark(); }, + [this]() -> void { this->pendingSendBufferHighWatermark(); }}; HeaderMapPtr pending_trailers_; Optional deferred_reset_; HeaderString cookies_; @@ -161,6 +191,8 @@ class ConnectionImpl : public virtual Connection, Logger::Loggable StreamImplPtr; @@ -199,6 +231,8 @@ class ConnectionImpl : public virtual Connection, Logger::Loggable active_streams_; nghttp2_session* session_{}; CodecStats stats_; + Network::Connection& connection_; + uint32_t per_stream_buffer_limit_; private: virtual ConnectionCallbacks& callbacks() PURE; @@ -213,7 +247,6 @@ class ConnectionImpl : public virtual Connection, Logger::Loggable CONTINUE_HEADER; - Network::Connection& connection_; bool dispatching_ : 1; bool raised_goaway_ : 1; bool pending_deferred_reset_ : 1; @@ -229,6 +262,18 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl { // Http::ClientConnection Http::StreamEncoder& newStream(StreamDecoder& response_decoder) override; + // Propogate network connection watermark events to each stream on the connection. + // The router will propogate it downstream. + void onUnderlyingConnectionAboveWriteBufferHighWatermark() override { + for (auto& stream : active_streams_) { + stream->runHighWatermarkCallbacks(); + } + } + void onUnderlyingConnectionBelowWriteBufferLowWatermark() override { + for (auto& stream : active_streams_) { + stream->runLowWatermarkCallbacks(); + } + } private: // ConnectionImpl @@ -236,7 +281,7 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl { int onBeginHeaders(const nghttp2_frame* frame) override; int onHeader(const nghttp2_frame* frame, HeaderString&& name, HeaderString&& value) override; - ConnectionCallbacks& callbacks_; + Http::ConnectionCallbacks& callbacks_; }; /** diff --git a/source/common/router/router.h b/source/common/router/router.h index 1477e8f8250e2..f2047eebe878f 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -165,6 +165,16 @@ class Filter : Logger::Loggable, // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason) override; + void onAboveWriteBufferHighWatermark() override { + // Have the connection manager disable reads on the downstream stream. + parent_.cluster_->stats().upstream_flow_control_backed_up_total_.inc(); + parent_.callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); + } + void onBelowWriteBufferLowWatermark() override { + // Have the connection manager enable reads on the downstream stream. + parent_.cluster_->stats().upstream_flow_control_drained_total_.inc(); + parent_.callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); + } // Http::ConnectionPool::Callbacks void onPoolFailure(Http::ConnectionPool::PoolFailureReason reason, diff --git a/source/common/upstream/health_checker_impl.cc b/source/common/upstream/health_checker_impl.cc index 2d9781ca524df..6a8fc8378067b 100644 --- a/source/common/upstream/health_checker_impl.cc +++ b/source/common/upstream/health_checker_impl.cc @@ -271,7 +271,7 @@ void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onInterval() { if (!client_) { Upstream::Host::CreateConnectionData conn = host_->createConnection(parent_.dispatcher_); client_.reset(parent_.createCodecClient(conn)); - client_->addConnectionCallbacks(*this); + client_->addConnectionCallbacks(connection_callback_impl_); expect_reset_ = false; } diff --git a/source/common/upstream/health_checker_impl.h b/source/common/upstream/health_checker_impl.h index 0736f8a85ffef..b980a8595cec9 100644 --- a/source/common/upstream/health_checker_impl.h +++ b/source/common/upstream/health_checker_impl.h @@ -151,8 +151,7 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { private: struct HttpActiveHealthCheckSession : public ActiveHealthCheckSession, public Http::StreamDecoder, - public Http::StreamCallbacks, - public Network::ConnectionCallbacks { + public Http::StreamCallbacks { HttpActiveHealthCheckSession(HttpHealthCheckerImpl& parent, HostSharedPtr host); ~HttpActiveHealthCheckSession(); @@ -174,12 +173,24 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason) override; - - // Network::ConnectionCallbacks - void onEvent(Network::ConnectionEvent event) override; void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} + void onEvent(Network::ConnectionEvent event); + + class ConnectionCallbackImpl : public Network::ConnectionCallbacks { + public: + ConnectionCallbackImpl(HttpActiveHealthCheckSession& parent) : parent_(parent) {} + // Network::ConnectionCallbacks + void onEvent(Network::ConnectionEvent event) override { parent_.onEvent(event); } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + private: + HttpActiveHealthCheckSession& parent_; + }; + + ConnectionCallbackImpl connection_callback_impl_{*this}; HttpHealthCheckerImpl& parent_; Http::CodecClientPtr client_; Http::StreamEncoder* request_encoder_{}; diff --git a/source/docs/flow_control.md b/source/docs/flow_control.md new file mode 100644 index 0000000000000..c451bef6eac7a --- /dev/null +++ b/source/docs/flow_control.md @@ -0,0 +1,183 @@ +### Overview + +Flow control in Envoy is done by having limits on each buffer, and watermark callbacks. When a +buffer contains more data than the configured limit, the high watermark callback will fire, kicking +off a chain of events which eventually informs the data source to stop sending data. This back-off +may be immediate (stop reading from a socket) or gradual (stop HTTP/2 window updates) so all +buffer limits in Envoy are considered soft limits. When the buffer eventually drains (generally to +half of of the high watermark to avoid thrashing back and forth) the low watermark callback will +fire, informing the sender it can resume sending data. + +### TCP implementation details + +Flow control for TCP and TCP-with-TLS-termination are handled by coordination +between the `Network::ConnectionImpl` write buffer, and the `Network::TcpProxy` +filter. + +The downstream flow control goes as follows. + + * The downstream `Network::ConnectionImpl::write_buffer_` buffers too much + data. It calls + `Network::ConnectionCallbacks::onAboveWriteBufferHighWatermark()`. + * The `Network::TcpProxy::DownstreamCallbacks` receives + `onAboveWriteBufferHighWatermark()` and calls `readDisable(true)` on the upstream + connection. + * When the downstream buffer is drained, it calls + `Network::ConnectionCallbacks::onBelowWriteBufferLowWatermark()` + * The `Network::TcpProxy::DownstreamCallbacks` receives + `onBelowWriteBufferLowWatermark()` and calls `readDisable(false)` on the upstream + connection. + +Flow control for the upstream path is much the same. + + * The upstream `Network::ConnectionImpl::write_buffer_` buffers too much + data. It calls + `Network::ConnectionCallbacks::onAboveWriteBufferHighWatermark()`. + * The Network::TcpProxy::UpstreamCallbacks receives + `onAboveWriteBufferHighWatermark()` and calls `readDisable(true)` on the downstream + connection. + * When the upstream buffer is drained, it calls + `Network::ConnectionCallbacks::onBelowWriteBufferLowWatermark()` + * The `Network::TcpProxy::UpstreamCallbacks` receives + `onBelowWriteBufferLowWatermark()` and calls `readDisable(false)` on the downstream + connection. + +### HTTP2 implementation details + +Because the various buffers in the HTTP/2 stack are fairly complicated, each path from a buffer +going over the watermark limit to disabling data from the data source is documented separately. + +![HTTP2 data flow diagram](h2_buffers.png) + +For HTTP/2, when filters, streams, or connections back up, the end result is `readDisable(true)` +being called on the source stream. This results in the stream ceasing to consume window, and so +not sending further flow control window updates to the peer. This will result in the peer +eventually stopping sending data when the available window is consumed (or nghttp2 closing the +connection if the peer violates the flow control limit) and so limiting the amount of data Envoy +will buffer for each stream. When `readDisable(false)` is called, any outstanding unconsumed data +is immediately consumed, which results in resuming window updates to the peer and the resumption of +data. + +Note that `readDisable(true)` on a stream may be called by multiple entities. It is called when any +filter buffers too much, when the stream backs up and has too much data buffered, or the +connection has too much data buffered. Because of this, `readDisable()` maintains a count of +the number of times it has been called to both enable and disable the stream, resuming reads when +each caller has called the equivalent low watermark callback. For example, if +the TCP window upstream fills up and results in the network buffer backing up, +all the streams associated with that connection will `readDisable(true)` their +downstream data sources. When the HTTP/2 flow control window fills up an +individual stream may use all of the window available and call a second +`readDisable(true)` on its downstream data source. When the upstream TCP socket drains, +the connection will go below its low watermark and each stream will call +`readDisable(false)` to resume the flow of data. The stream which had both a +network level block and a H2 flow control block will still not be fully enabled. +Once the upstream peer sends window updates, the stream buffer will drain and +the second `readDisable(false)` will be called on the downstream data source, +which will finally result in data flowing from downstream again. + +The two main parties involved in flow control are the router filter (`Envoy::Router::Filter`) and +the connection manager (`Envoy::Http::ConnectionManagerImpl`). The router is +responsible for intercepting watermark events for its own buffers, the individual upstream streams +(if codec buffers fill up) and the upstream connection (if the network buffer fills up). It passes +any events to the connection manager, which has the ability to call `readDisable()` to enable and +disable further data from downstream. + +TODO(alyssawilk) document the reverse path. + +## HTTP/2 codec recv buffer + +Given the HTTP/2 `Envoy::Http::Http2::ConnectionImpl::StreamImpl::pending_recv_data_` is processed immediately +there's no real need for buffer limits, but for consistency and to future-proof the implementation, +it is a WatermarkBuffer. The high watermark path goes as follows: + + * When `pending_recv_data_` has too much data it calls + `ConnectionImpl::StreamImpl::pendingRecvBufferHighWatermark()`. + * `pendingRecvBufferHighWatermark` calls `readDisable(true)` on the stream. + +The low watermark path is similar + + * When `pending_recv_data_` is drained, it calls + `ConnectionImpl::StreamImpl::pendingRecvBufferLowWatermark`. + * `pendingRecvBufferLowWatermarkwhich` calls `readDisable(false)` on the stream. + +## HTTP/2 filters + +TODO(alyssawilk) implement and document. + +# HTTP/2 codec upstream send buffer + +The upstream send buffer `Envoy::Http::Http2::ConnectionImpl::StreamImpl::pending_send_data_` is +H2 stream data destined for an Envoy backend. Data is added to this buffer after each filter in +the chain is done processing, and it backs up if there is insufficient connection or stream window +to send the data. The high watermark path goes as follows: + + * When `pending_send_data_` has too much data it calls + `ConnectionImpl::StreamImpl::pendingSendBufferHighWatermark()`. + * `pendingSendBufferHighWatermark()` calls `StreamCallbackHelper::runHighWatermarkCallbacks()` + * `runHighWatermarkCallbacks()` results in all subscribers of `Envoy::Http::StreamCallbacks` + receiving an `onAboveWriteBufferHighWatermark()` callback. + * When `Envoy::Router::Filter` receives `onAboveWriteBufferHighWatermark()` it + calls `StreamDecoderFilterCallback::onDecoderFilterAboveWriteBufferHighWatermark()`. + * When `Envoy::Http::ConnectionManagerImpl` receives + `onDecoderFilterAboveWriteBufferHighWatermark()` it calls `readDisable(true)` on the downstream + stream to pause data. + +For the low watermark path: + + * When `pending_send_data_` drains it calls + `ConnectionImpl::StreamImpl::pendingSendBufferLowWatermark()` + * `pendingSendBufferLowWatermark()` calls `StreamCallbackHelper::runLowWatermarkCallbacks()` + * `runLowWatermarkCallbacks()` results in all subscribers of `Envoy::Http::StreamCallbacks` + receiving a `onBelowWriteBufferLowWatermark()` callback. + * When `Envoy::Router::Filter` receives `onBelowWriteBufferLowWatermark()` it + calls `StreamDecoderFilterCallback::onDecoderFilterBelowWriteBufferLowWatermark()`. + * When `Envoy::Http::ConnectionManagerImpl` receives `onDecoderFilterBelowWriteBufferLowWatermark()` + it calls `readDisable(false)` on the downstream stream to resume data. + +# HTTP/2 network upstream network buffer + +The upstream network buffer is HTTP/2 data for all streams destined for the +Envoy backend. If the network buffer fills up, all streams associated with the +underlying TCP connection will be informed of the back-up, and the data sources +(HTTP/2 streams or HTTP connections) feeding into those streams will be +readDisabled. + +The high watermark path is as follows: + + * When `Envoy::Network::ConnectionImpl::write_buffer_` has too much data it calls + `Network::ConnectionCallbacks::onAboveWriteBufferHighWatermark()`. + * When `Envoy::Http::CodecClient` receives `onAboveWriteBufferHighWatermark()` it + calls `onUnderlyingConnectionAboveWriteBufferHighWatermark()` on `codec_`. + * When `Envoy::Http::Http2::ConnectionImpl` receives `onAboveWriteBufferHighWatermark()` it calls + `runHighWatermarkCallbacks()` for each stream of the connection. + * `runHighWatermarkCallbacks()` results in all subscribers of `Envoy::Http::StreamCallback` + receiving an `onAboveWriteBufferHighWatermark()` callback. + * When `Envoy::Router::Filter` receives `onAboveWriteBufferHighWatermark()` it + calls `StreamDecoderFilterCallback::onDecoderFilterAboveWriteBufferHighWatermark()`. + * When `Envoy::Http::ConnectionManagerImpl` receives + `onDecoderFilterAboveWriteBufferHighWatermark()` it calls `readDisable(true)` on the downstream + stream to pause data. + +The low watermark path is as follows: + + * When `Envoy::Network::ConnectionImpl::write_buffer_` is drained it calls + `Network::ConnectionCallbacks::onBelowWriteBufferLowWatermark()`. + * When `Envoy::Http::CodecClient` receives `onBelowWriteBufferLowWatermark()` it + calls `onUnderlyingConnectionBelowWriteBufferLowWatermark()` on `codec_`. + * When `Envoy::Http::Http2::ConnectionImpl` receives `onBelowWriteBufferLowWatermark()` it calls + `runLowWatermarkCallbacks()` for each stream of the connection. + * `runLowWatermarkCallbacks()` results in all subscribers of `Envoy::Http::StreamCallback` + receiving a `onBelowWriteBufferLowWatermark()` callback. + * When `Envoy::Router::Filter` receives `onBelowWriteBufferLowWatermark()` it + calls `StreamDecoderFilterCallback::onDecoderFilterBelowWriteBufferLowWatermark()`. + * When `Envoy::Http::ConnectionManagerImpl` receives `onDecoderFilterBelowWriteBufferLowWatermark()` + it calls `readDisable(false)` on the downstream stream to resume data. + +# HTTP/2 codec downstream send buffer +# HTTP/2 network upstream network buffer + +TODO(alyssawilk) implement and document. + +### HTTP implementation details + +TODO(alyssawilk) implement and document. diff --git a/source/docs/h2_buffers.png b/source/docs/h2_buffers.png new file mode 100644 index 0000000000000..b5c22a58a7317 Binary files /dev/null and b/source/docs/h2_buffers.png differ diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 8cccd18e5c640..a57157bc775ad 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -831,5 +831,18 @@ TEST_F(AsyncClientImplTest, MultipleDataStream) { .value()); } +TEST_F(AsyncClientImplTest, WatermarkCallbacks) { + TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + AsyncClient::Stream* stream = + client_.start(stream_callbacks_, Optional()); + stream->sendHeaders(headers, false); + Http::StreamDecoderFilterCallbacks* filter_callbacks = + static_cast(stream); + filter_callbacks->onDecoderFilterAboveWriteBufferHighWatermark(); + filter_callbacks->onDecoderFilterBelowWriteBufferLowWatermark(); + EXPECT_CALL(stream_callbacks_, onReset()); +} + } // namespace Http } // namespace Envoy diff --git a/test/common/http/codec_client_test.cc b/test/common/http/codec_client_test.cc index 0969f61056c8e..0d0121c05dff5 100644 --- a/test/common/http/codec_client_test.cc +++ b/test/common/http/codec_client_test.cc @@ -158,5 +158,13 @@ TEST_F(CodecClientTest, PrematureResponse) { EXPECT_EQ(1U, cluster_->stats_.upstream_cx_protocol_error_.value()); } +TEST_F(CodecClientTest, WatermarkPassthrough) { + EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark()); + connection_cb_->onAboveWriteBufferHighWatermark(); + + EXPECT_CALL(*codec_, onUnderlyingConnectionBelowWriteBufferLowWatermark()); + connection_cb_->onBelowWriteBufferLowWatermark(); +} + } // namespace Http } // namespace Envoy diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index dcfb98adeab84..ae83fdc1921fd 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -974,6 +974,47 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInline) { HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, true); } +TEST_F(HttpConnectionManagerImplTest, WatermarkCallbacks) { + InSequence s; + setup(false, ""); + + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { + StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_); + HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}}}; + decoder->decodeHeaders(std::move(headers), true); + })); + + setupFilterChain(2, 2); + + EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true)) + .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { + Buffer::OwnedImpl data("hello"); + decoder_filters_[0]->callbacks_->addDecodedData(data); + return FilterHeadersStatus::Continue; + })); + EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::StopIteration)); + EXPECT_CALL(*decoder_filters_[1], decodeData(_, true)) + .WillOnce(Return(FilterDataStatus::StopIterationAndBuffer)); + + // Kick off the incoming data. + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input); + + MockStream stream; + EXPECT_CALL(response_encoder_, getStream()).Times(1).WillOnce(ReturnRef(stream)); + EXPECT_CALL(stream, readDisable(true)); + ASSERT(decoder_filters_[0]->callbacks_ != nullptr); + decoder_filters_[0]->callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); + EXPECT_EQ(1U, stats_.named_.downstream_flow_control_paused_reading_total_.value()); + + EXPECT_CALL(response_encoder_, getStream()).Times(1).WillOnce(ReturnRef(stream)); + EXPECT_CALL(stream, readDisable(false)); + ASSERT(decoder_filters_[0]->callbacks_ != nullptr); + decoder_filters_[0]->callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); + EXPECT_EQ(1U, stats_.named_.downstream_flow_control_resumed_reading_total_.value()); +} + TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) { InSequence s; setup(false, ""); diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index 9692631885eb9..339fde616f7fd 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -1,6 +1,8 @@ #include #include +#include "envoy/http/codec.h" + #include "common/http/exception.h" #include "common/http/header_map_impl.h" #include "common/http/http2/codec_impl.h" @@ -15,11 +17,13 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +using testing::AnyNumber; using testing::AtLeast; using testing::InSequence; using testing::Invoke; using testing::InvokeWithoutArgs; using testing::NiceMock; +using testing::Return; using testing::_; namespace Envoy { @@ -40,6 +44,24 @@ Http2Settings Http2SettingsFromTuple(const Http2SettingsTuple& tp) { } } // namespace +class TestServerConnectionImpl : public ServerConnectionImpl { +public: + TestServerConnectionImpl(Network::Connection& connection, ServerConnectionCallbacks& callbacks, + Stats::Scope& scope, const Http2Settings& http2_settings) + : ServerConnectionImpl(connection, callbacks, scope, http2_settings) {} + nghttp2_session* session() { return session_; } + using ServerConnectionImpl::getStream; +}; + +class TestClientConnectionImpl : public ClientConnectionImpl { +public: + TestClientConnectionImpl(Network::Connection& connection, Http::ConnectionCallbacks& callbacks, + Stats::Scope& scope, const Http2Settings& http2_settings) + : ClientConnectionImpl(connection, callbacks, scope, http2_settings) {} + nghttp2_session* session() { return session_; } + using ClientConnectionImpl::getStream; +}; + class Http2CodecImplTest : public testing::TestWithParam { public: struct ConnectionWrapper { @@ -62,8 +84,10 @@ class Http2CodecImplTest : public testing::TestWithParam : client_http2settings_(Http2SettingsFromTuple(::testing::get<0>(GetParam()))), client_(client_connection_, client_callbacks_, stats_store_, client_http2settings_), server_http2settings_(Http2SettingsFromTuple(::testing::get<1>(GetParam()))), - server_(server_connection_, server_callbacks_, stats_store_, server_http2settings_), - request_encoder_(client_.newStream(response_decoder_)) { + server_(server_connection_, server_callbacks_, stats_store_, server_http2settings_) {} + + void initialize() { + request_encoder_ = &client_.newStream(response_decoder_); setupDefaultConnectionMocks(); EXPECT_CALL(server_callbacks_, newStream(_)) @@ -87,21 +111,23 @@ class Http2CodecImplTest : public testing::TestWithParam const Http2Settings client_http2settings_; NiceMock client_connection_; MockConnectionCallbacks client_callbacks_; - ClientConnectionImpl client_; + TestClientConnectionImpl client_; ConnectionWrapper client_wrapper_; const Http2Settings server_http2settings_; NiceMock server_connection_; MockServerConnectionCallbacks server_callbacks_; - ServerConnectionImpl server_; + TestServerConnectionImpl server_; ConnectionWrapper server_wrapper_; MockStreamDecoder response_decoder_; - StreamEncoder& request_encoder_; + StreamEncoder* request_encoder_; MockStreamDecoder request_decoder_; StreamEncoder* response_encoder_{}; MockStreamCallbacks server_stream_callbacks_; }; TEST_P(Http2CodecImplTest, ExpectContinueHeadersOnlyResponse) { + initialize(); + TestHeaderMapImpl request_headers; request_headers.addViaCopy("expect", "100-continue"); HttpTestUtility::addDefaultHeaders(request_headers); @@ -111,11 +137,11 @@ TEST_P(Http2CodecImplTest, ExpectContinueHeadersOnlyResponse) { TestHeaderMapImpl continue_headers{{":status", "100"}}; EXPECT_CALL(response_decoder_, decodeHeaders_(HeaderMapEqual(&continue_headers), false)); - request_encoder_.encodeHeaders(request_headers, false); + request_encoder_->encodeHeaders(request_headers, false); EXPECT_CALL(request_decoder_, decodeData(_, true)); Buffer::OwnedImpl hello("hello"); - request_encoder_.encodeData(hello, true); + request_encoder_->encodeData(hello, true); TestHeaderMapImpl response_headers{{":status", "200"}}; EXPECT_CALL(response_decoder_, decodeHeaders_(HeaderMapEqual(&response_headers), true)); @@ -123,6 +149,8 @@ TEST_P(Http2CodecImplTest, ExpectContinueHeadersOnlyResponse) { } TEST_P(Http2CodecImplTest, ExpectContinueTrailersResponse) { + initialize(); + TestHeaderMapImpl request_headers; request_headers.addViaCopy("expect", "100-continue"); HttpTestUtility::addDefaultHeaders(request_headers); @@ -130,11 +158,11 @@ TEST_P(Http2CodecImplTest, ExpectContinueTrailersResponse) { TestHeaderMapImpl continue_headers{{":status", "100"}}; EXPECT_CALL(response_decoder_, decodeHeaders_(HeaderMapEqual(&continue_headers), false)); - request_encoder_.encodeHeaders(request_headers, false); + request_encoder_->encodeHeaders(request_headers, false); EXPECT_CALL(request_decoder_, decodeData(_, true)); Buffer::OwnedImpl hello("hello"); - request_encoder_.encodeData(hello, true); + request_encoder_->encodeData(hello, true); TestHeaderMapImpl response_headers{{":status", "200"}}; EXPECT_CALL(response_decoder_, decodeHeaders_(HeaderMapEqual(&response_headers), false)); @@ -146,10 +174,12 @@ TEST_P(Http2CodecImplTest, ExpectContinueTrailersResponse) { } TEST_P(Http2CodecImplTest, ShutdownNotice) { + initialize(); + TestHeaderMapImpl request_headers; HttpTestUtility::addDefaultHeaders(request_headers); EXPECT_CALL(request_decoder_, decodeHeaders_(_, true)); - request_encoder_.encodeHeaders(request_headers, true); + request_encoder_->encodeHeaders(request_headers, true); EXPECT_CALL(client_callbacks_, onGoAway()); server_.shutdownNotice(); @@ -161,36 +191,42 @@ TEST_P(Http2CodecImplTest, ShutdownNotice) { } TEST_P(Http2CodecImplTest, RefusedStreamReset) { + initialize(); + TestHeaderMapImpl request_headers; HttpTestUtility::addDefaultHeaders(request_headers); EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); - request_encoder_.encodeHeaders(request_headers, false); + request_encoder_->encodeHeaders(request_headers, false); MockStreamCallbacks callbacks; - request_encoder_.getStream().addCallbacks(callbacks); + request_encoder_->getStream().addCallbacks(callbacks); EXPECT_CALL(server_stream_callbacks_, onResetStream(StreamResetReason::LocalRefusedStreamReset)); EXPECT_CALL(callbacks, onResetStream(StreamResetReason::RemoteRefusedStreamReset)); response_encoder_->getStream().resetStream(StreamResetReason::LocalRefusedStreamReset); } TEST_P(Http2CodecImplTest, InvalidFrame) { + initialize(); + ON_CALL(client_connection_, write(_)).WillByDefault(Invoke([&](Buffer::Instance& data) -> void { server_wrapper_.buffer_.add(data); })); - request_encoder_.encodeHeaders(TestHeaderMapImpl{}, true); + request_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); EXPECT_THROW(server_wrapper_.dispatch(Buffer::OwnedImpl(), server_), CodecProtocolException); } TEST_P(Http2CodecImplTest, TrailingHeaders) { + initialize(); + TestHeaderMapImpl request_headers; HttpTestUtility::addDefaultHeaders(request_headers); EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); - request_encoder_.encodeHeaders(request_headers, false); + request_encoder_->encodeHeaders(request_headers, false); EXPECT_CALL(request_decoder_, decodeData(_, false)); Buffer::OwnedImpl hello("hello"); - request_encoder_.encodeData(hello, false); + request_encoder_->encodeData(hello, false); EXPECT_CALL(request_decoder_, decodeTrailers_(_)); - request_encoder_.encodeTrailers(TestHeaderMapImpl{{"trailing", "header"}}); + request_encoder_->encodeTrailers(TestHeaderMapImpl{{"trailing", "header"}}); TestHeaderMapImpl response_headers{{":status", "200"}}; EXPECT_CALL(response_decoder_, decodeHeaders_(_, false)); @@ -203,6 +239,8 @@ TEST_P(Http2CodecImplTest, TrailingHeaders) { } TEST_P(Http2CodecImplTest, TrailingHeadersLargeBody) { + initialize(); + // Buffer server data so we can make sure we don't get any window updates. ON_CALL(client_connection_, write(_)).WillByDefault(Invoke([&](Buffer::Instance& data) -> void { server_wrapper_.buffer_.add(data); @@ -211,12 +249,12 @@ TEST_P(Http2CodecImplTest, TrailingHeadersLargeBody) { TestHeaderMapImpl request_headers; HttpTestUtility::addDefaultHeaders(request_headers); EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); - request_encoder_.encodeHeaders(request_headers, false); + request_encoder_->encodeHeaders(request_headers, false); EXPECT_CALL(request_decoder_, decodeData(_, false)).Times(AtLeast(1)); Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); - request_encoder_.encodeData(body, false); + request_encoder_->encodeData(body, false); EXPECT_CALL(request_decoder_, decodeTrailers_(_)); - request_encoder_.encodeTrailers(TestHeaderMapImpl{{"trailing", "header"}}); + request_encoder_->encodeTrailers(TestHeaderMapImpl{{"trailing", "header"}}); // Flush pending data. setupDefaultConnectionMocks(); @@ -235,10 +273,12 @@ TEST_P(Http2CodecImplTest, TrailingHeadersLargeBody) { class Http2CodecImplDeferredResetTest : public Http2CodecImplTest {}; TEST_P(Http2CodecImplDeferredResetTest, DeferredResetClient) { + initialize(); + InSequence s; MockStreamCallbacks client_stream_callbacks; - request_encoder_.getStream().addCallbacks(client_stream_callbacks); + request_encoder_->getStream().addCallbacks(client_stream_callbacks); // Do a request, but pause server dispatch so we don't send window updates. This will result in a // deferred reset, followed by a pending frames flush which will cause the stream to actually @@ -248,11 +288,12 @@ TEST_P(Http2CodecImplDeferredResetTest, DeferredResetClient) { })); TestHeaderMapImpl request_headers; HttpTestUtility::addDefaultHeaders(request_headers); - request_encoder_.encodeHeaders(request_headers, false); + request_encoder_->encodeHeaders(request_headers, false); Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); - request_encoder_.encodeData(body, true); + EXPECT_CALL(client_stream_callbacks, onAboveWriteBufferHighWatermark()).Times(AnyNumber()); + request_encoder_->encodeData(body, true); EXPECT_CALL(client_stream_callbacks, onResetStream(StreamResetReason::LocalReset)); - request_encoder_.getStream().resetStream(StreamResetReason::LocalReset); + request_encoder_->getStream().resetStream(StreamResetReason::LocalReset); // Dispatch server. We expect to see some data. EXPECT_CALL(response_decoder_, decodeHeaders_(_, _)).Times(0); @@ -271,12 +312,14 @@ TEST_P(Http2CodecImplDeferredResetTest, DeferredResetClient) { } TEST_P(Http2CodecImplDeferredResetTest, DeferredResetServer) { + initialize(); + InSequence s; TestHeaderMapImpl request_headers; HttpTestUtility::addDefaultHeaders(request_headers); EXPECT_CALL(request_decoder_, decodeHeaders_(_, false)); - request_encoder_.encodeHeaders(request_headers, false); + request_encoder_->encodeHeaders(request_headers, false); // In this case we do the same thing as DeferredResetClient but on the server side. ON_CALL(server_connection_, write(_)).WillByDefault(Invoke([&](Buffer::Instance& data) -> void { @@ -285,12 +328,13 @@ TEST_P(Http2CodecImplDeferredResetTest, DeferredResetServer) { TestHeaderMapImpl response_headers{{":status", "200"}}; response_encoder_->encodeHeaders(response_headers, false); Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + EXPECT_CALL(server_stream_callbacks_, onAboveWriteBufferHighWatermark()).Times(AnyNumber()); response_encoder_->encodeData(body, true); EXPECT_CALL(server_stream_callbacks_, onResetStream(StreamResetReason::LocalReset)); response_encoder_->getStream().resetStream(StreamResetReason::LocalReset); MockStreamCallbacks client_stream_callbacks; - request_encoder_.getStream().addCallbacks(client_stream_callbacks); + request_encoder_->getStream().addCallbacks(client_stream_callbacks); EXPECT_CALL(response_decoder_, decodeHeaders_(_, false)); EXPECT_CALL(response_decoder_, decodeData(_, false)).Times(AtLeast(1)); EXPECT_CALL(client_stream_callbacks, onResetStream(StreamResetReason::RemoteReset)); @@ -298,16 +342,131 @@ TEST_P(Http2CodecImplDeferredResetTest, DeferredResetServer) { client_wrapper_.dispatch(Buffer::OwnedImpl(), client_); } -// Deferred reset tests use only small windows so that we can test certain conditions. -#define HTTP2SETTINGS_DEFERRED_RESET_COMBINE \ +class Http2CodecImplFlowControlTest : public Http2CodecImplTest {}; + +// Back up the pending_sent_data_ buffer in the client connection and make sure the watermarks fire +// as expected. +// +// This also tests the readDisable logic in StreamImpl, verifying that h2 bytes are consumed +// when the stream has readDisable(true) called. +TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { + initialize(); + MockStreamCallbacks callbacks; + request_encoder_->getStream().addCallbacks(callbacks); + + TestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + TestHeaderMapImpl expected_headers; + HttpTestUtility::addDefaultHeaders(expected_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers), false)); + request_encoder_->encodeHeaders(request_headers, false); + + // Force the server stream to be read disabled. This will cause it to stop sending window + // updates to the client. + server_.getStream(1)->readDisable(true); + + uint32_t initial_stream_window = + nghttp2_session_get_stream_effective_local_window_size(client_.session(), 1); + // If this limit is changed, this test will fail due to the initial large writes being divided + // into more than 4 frames. Fast fail here with this explanatory comment. + ASSERT_EQ(65535, initial_stream_window); + // One large write gets broken into smaller frames. + EXPECT_CALL(request_decoder_, decodeData(_, false)).Times(AnyNumber()); + Buffer::OwnedImpl long_data(std::string(initial_stream_window, 'a')); + request_encoder_->encodeData(long_data, false); + + // Verify that the window is full. The client will not send more data to the server for this + // stream. + EXPECT_EQ(0, nghttp2_session_get_stream_local_window_size(server_.session(), 1)); + EXPECT_EQ(0, nghttp2_session_get_stream_remote_window_size(client_.session(), 1)); + EXPECT_EQ(initial_stream_window, server_.getStream(1)->unconsumed_bytes_); + + // Now that the flow control window is full, further data causes the send buffer to back up. + Buffer::OwnedImpl more_long_data(std::string(initial_stream_window, 'a')); + request_encoder_->encodeData(more_long_data, false); + EXPECT_EQ(initial_stream_window, client_.getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window, server_.getStream(1)->unconsumed_bytes_); + + // If we go over the limit, the stream callbacks should fire. + EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); + Buffer::OwnedImpl last_byte("!"); + request_encoder_->encodeData(last_byte, false); + EXPECT_EQ(initial_stream_window + 1, client_.getStream(1)->pending_send_data_.length()); + + // Now unblock the server's stream. This will cause the bytes to be consumed, flow control + // updates to be sent, and the client to flush all queued data. + EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()); + server_.getStream(1)->readDisable(false); + EXPECT_EQ(0, client_.getStream(1)->pending_send_data_.length()); + // The extra 1 byte sent won't trigger another window update, so the final window should be the + // initial window minus the last 1 byte flush from the client to server. + EXPECT_EQ(initial_stream_window - 1, + nghttp2_session_get_stream_local_window_size(server_.session(), 1)); + EXPECT_EQ(initial_stream_window - 1, + nghttp2_session_get_stream_remote_window_size(client_.session(), 1)); +} + +// Set up the same asTestFlowControlInPendingSendData, but tears the stream down with an early reset +// once the flow control window is full up. +TEST_P(Http2CodecImplFlowControlTest, EarlyResetRestoresWindow) { + initialize(); + MockStreamCallbacks callbacks; + request_encoder_->getStream().addCallbacks(callbacks); + + TestHeaderMapImpl request_headers; + HttpTestUtility::addDefaultHeaders(request_headers); + TestHeaderMapImpl expected_headers; + HttpTestUtility::addDefaultHeaders(expected_headers); + EXPECT_CALL(request_decoder_, decodeHeaders_(HeaderMapEqual(&expected_headers), false)); + request_encoder_->encodeHeaders(request_headers, false); + + // Force the server stream to be read disabled. This will cause it to stop sending window + // updates to the client. + server_.getStream(1)->readDisable(true); + + uint32_t initial_stream_window = + nghttp2_session_get_stream_effective_local_window_size(client_.session(), 1); + uint32_t initial_connection_window = nghttp2_session_get_remote_window_size(client_.session()); + // If this limit is changed, this test will fail due to the initial large writes being divided + // into more than 4 frames. Fast fail here with this explanatory comment. + ASSERT_EQ(65535, initial_stream_window); + // One large write may get broken into smaller frames. + EXPECT_CALL(request_decoder_, decodeData(_, false)).Times(AnyNumber()); + Buffer::OwnedImpl long_data(std::string(initial_stream_window, 'a')); + // The one giant write will cause the buffer to go over the limit, then drain and go back under + // the limit. + request_encoder_->encodeData(long_data, false); + + // Verify that the window is full. The client will not send more data to the server for this + // stream. + EXPECT_EQ(0, nghttp2_session_get_stream_local_window_size(server_.session(), 1)); + EXPECT_EQ(0, nghttp2_session_get_stream_remote_window_size(client_.session(), 1)); + EXPECT_EQ(initial_stream_window, server_.getStream(1)->unconsumed_bytes_); + EXPECT_GT(initial_connection_window, nghttp2_session_get_remote_window_size(client_.session())); + + EXPECT_CALL(server_stream_callbacks_, onResetStream(StreamResetReason::LocalRefusedStreamReset)); + EXPECT_CALL(callbacks, onResetStream(StreamResetReason::RemoteRefusedStreamReset)); + response_encoder_->getStream().resetStream(StreamResetReason::LocalRefusedStreamReset); + + // Regression test that the window is consumed even if the stream is destroyed early. + EXPECT_EQ(initial_connection_window, nghttp2_session_get_remote_window_size(client_.session())); +} + +#define HTTP2SETTINGS_SMALL_WINDOW_COMBINE \ ::testing::Combine(::testing::Values(Http2Settings::DEFAULT_HPACK_TABLE_SIZE), \ ::testing::Values(Http2Settings::DEFAULT_MAX_CONCURRENT_STREAMS), \ ::testing::Values(Http2Settings::MIN_INITIAL_STREAM_WINDOW_SIZE), \ ::testing::Values(Http2Settings::MIN_INITIAL_CONNECTION_WINDOW_SIZE)) +// Deferred reset tests use only small windows so that we can test certain conditions. INSTANTIATE_TEST_CASE_P(Http2CodecImplDeferredResetTest, Http2CodecImplDeferredResetTest, - ::testing::Combine(HTTP2SETTINGS_DEFERRED_RESET_COMBINE, - HTTP2SETTINGS_DEFERRED_RESET_COMBINE)); + ::testing::Combine(HTTP2SETTINGS_SMALL_WINDOW_COMBINE, + HTTP2SETTINGS_SMALL_WINDOW_COMBINE)); + +// Flow control tests only use only small windows so that we can test certain conditions. +INSTANTIATE_TEST_CASE_P(Http2CodecImplFlowControlTest, Http2CodecImplFlowControlTest, + ::testing::Combine(HTTP2SETTINGS_SMALL_WINDOW_COMBINE, + HTTP2SETTINGS_SMALL_WINDOW_COMBINE)); // we seperate default/edge cases here to avoid combinatorial explosion #define HTTP2SETTINGS_DEFAULT_COMBINE \ diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index 19a0e18d93956..e1afe7bd91736 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -1141,5 +1141,47 @@ TEST_F(RouterTest, AutoHostRewriteDisabled) { router_.decodeHeaders(incoming_headers, true); } +TEST_F(RouterTest, Watermarks) { + EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) + .WillOnce(Return(std::chrono::milliseconds(0))); + EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); + + NiceMock encoder; + NiceMock stream; + Http::StreamCallbacks* stream_callbacks; + EXPECT_CALL(stream, addCallbacks(_)).WillOnce(Invoke([&](Http::StreamCallbacks& callbacks) { + stream_callbacks = &callbacks; + })); + EXPECT_CALL(encoder, getStream()).WillOnce(ReturnRef(stream)); + Http::StreamDecoder* response_decoder = nullptr; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks) + -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder, cm_.conn_pool_.host_); + return nullptr; + })); + + Http::TestHeaderMapImpl headers{{"x-envoy-upstream-alt-stat-name", "alt_stat"}, + {"x-envoy-internal", "true"}}; + HttpTestUtility::addDefaultHeaders(headers); + router_.decodeHeaders(headers, true); + + stream_callbacks->onAboveWriteBufferHighWatermark(); + EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_flow_control_backed_up_total") + .value()); + stream_callbacks->onBelowWriteBufferLowWatermark(); + EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_flow_control_drained_total") + .value()); + + Http::HeaderMapPtr response_headers( + new Http::TestHeaderMapImpl{{":status", "200"}, + {"x-envoy-upstream-canary", "false"}, + {"x-envoy-virtual-cluster", "hello"}}); + response_decoder->decodeHeaders(std::move(response_headers), true); +} + } // namespace Router } // namespace Envoy diff --git a/test/config/integration/server_http2.json b/test/config/integration/server_http2.json index fd3846bbcedfa..6d1ce16ed95d1 100644 --- a/test/config/integration/server_http2.json +++ b/test/config/integration/server_http2.json @@ -174,23 +174,40 @@ } }] }, - { + { "address": "tcp://{{ ip_loopback_address }}:0", + "per_connection_buffer_limit_bytes": 1024, "filters": [ - { "type": "read", "name": - "tcp_proxy", - "config": { - "stat_prefix": "test_tcp", - "route_config": { - "routes": [ - { - "cluster": "cluster_1" - } - ] - } - } + { + "type": "read", + "name": "http_connection_manager", + "config": { + "codec_type": "http2", + "http2_settings": { + "per_stream_buffer_limit": 1024 + }, + "drain_timeout_ms": 5000, + "stat_prefix": "router", + "route_config": + { + "virtual_hosts": [ + { + "name": "integration", + "domains": [ "*" ], + "routes": [ + { + "prefix": "/test/long/url", + "cluster": "cluster_3" + } + ] + } + ] + }, + "filters": [ + { "type": "decoder", "name": "router", "config": {}} + ] } - ] + }] }], "admin": { "access_log_path": "/dev/null", "address": "tcp://{{ ip_loopback_address }}:0" }, @@ -220,6 +237,14 @@ "dns_lookup_family": "{{ dns_lookup_family }}", "hosts": [{"url": "tcp://localhost:{{ upstream_1 }}"}] }, + { + "name": "cluster_3", + "per_connection_buffer_limit_bytes": 1024, + "connect_timeout_ms": 5000, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://{{ ip_loopback_address }}:{{ upstream_0 }}"}] + }, { "name": "statsd", "connect_timeout_ms": 5000, diff --git a/test/config/integration/server_http2_upstream.json b/test/config/integration/server_http2_upstream.json index 2778ef91f9b2d..f1654de28f2eb 100644 --- a/test/config/integration/server_http2_upstream.json +++ b/test/config/integration/server_http2_upstream.json @@ -253,6 +253,41 @@ ] } }] + }, + { + "address": "tcp://{{ ip_loopback_address }}:0", + "per_connection_buffer_limit_bytes": 1024, + "filters": [ + { + "type": "read", + "name": "http_connection_manager", + "config": { + "codec_type": "http2", + "http2_settings": { + "per_stream_buffer_limit": 1024 + }, + "drain_timeout_ms": 5000, + "stat_prefix": "router", + "route_config": + { + "virtual_hosts": [ + { + "name": "integration", + "domains": [ "*" ], + "routes": [ + { + "prefix": "/test/long/url", + "cluster": "cluster_3" + } + ] + } + ] + }, + "filters": [ + { "type": "decoder", "name": "router", "config": {} } + ] + } + }] }], "admin": { "access_log_path": "/dev/null", "address": "tcp://{{ ip_loopback_address }}:0" }, @@ -275,6 +310,15 @@ "features": "http2", "dns_lookup_family": "{{ dns_lookup_family }}", "hosts": [{"url": "tcp://localhost:{{ upstream_1 }}"}] + }, + { + "name": "cluster_3", + "per_connection_buffer_limit_bytes": 1024, + "connect_timeout_ms": 5000, + "type": "static", + "lb_type": "round_robin", + "features": "http2", + "hosts": [{"url": "tcp://{{ ip_loopback_address }}:{{ upstream_0 }}"}] }] } } diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 08393c0de52da..09952fa862141 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -55,6 +55,8 @@ class FakeStream : public Http::StreamDecoder, public Http::StreamCallbacks { // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} private: FakeHttpConnection& parent_; diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index 4ca47f20b0b85..0752d2ddf8cdd 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -59,6 +59,12 @@ TEST_P(Http2IntegrationTest, RouterRequestAndResponseWithGiantBodyBuffer) { false); } +TEST_P(Http2IntegrationTest, FlowControlOnAndGiantBody) { + testRouterRequestAndResponseWithBody(makeClientConnection(lookupPort("http_buffer_limits")), + Http::CodecClient::Type::HTTP2, 1024 * 1024, 1024 * 1024, + false); +} + TEST_P(Http2IntegrationTest, RouterHeaderOnlyRequestAndResponseNoBuffer) { testRouterHeaderOnlyRequestAndResponse(makeClientConnection(lookupPort("http")), Http::CodecClient::Type::HTTP2, true); @@ -186,7 +192,8 @@ TEST_P(Http2IntegrationTest, Trailers) { testTrailers(1024, 2048); } TEST_P(Http2IntegrationTest, TrailersGiantBody) { testTrailers(1024 * 1024, 1024 * 1024); } -TEST_P(Http2IntegrationTest, SimultaneousRequest) { +void Http2IntegrationTest::simultaneousRequest(uint32_t port, int32_t request1_bytes, + int32_t request2_bytes) { IntegrationCodecClientPtr codec_client; FakeHttpConnectionPtr fake_upstream_connection1; FakeHttpConnectionPtr fake_upstream_connection2; @@ -197,9 +204,7 @@ TEST_P(Http2IntegrationTest, SimultaneousRequest) { FakeStreamPtr upstream_request1; FakeStreamPtr upstream_request2; executeActions( - {[&]() -> void { - codec_client = makeHttpConnection(lookupPort("http"), Http::CodecClient::Type::HTTP2); - }, + {[&]() -> void { codec_client = makeHttpConnection(port, Http::CodecClient::Type::HTTP2); }, // Start request 1 [&]() -> void { encoder1 = &codec_client->startRequest(Http::TestHeaderMapImpl{{":method", "POST"}, @@ -229,14 +234,14 @@ TEST_P(Http2IntegrationTest, SimultaneousRequest) { // Finish request 1 [&]() -> void { - codec_client->sendData(*encoder1, 1024, true); + codec_client->sendData(*encoder1, request1_bytes, true); }, [&]() -> void { upstream_request1->waitForEndStream(*dispatcher_); }, // Finish request 2 [&]() -> void { - codec_client->sendData(*encoder2, 512, true); + codec_client->sendData(*encoder2, request2_bytes, true); }, [&]() -> void { upstream_request2->waitForEndStream(*dispatcher_); }, @@ -244,31 +249,31 @@ TEST_P(Http2IntegrationTest, SimultaneousRequest) { // Respond request 2 [&]() -> void { upstream_request2->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); - upstream_request2->encodeData(1024, true); + upstream_request2->encodeData(request2_bytes, true); }, [&]() -> void { response2->waitForEndStream(); EXPECT_TRUE(upstream_request2->complete()); - EXPECT_EQ(512U, upstream_request2->bodyLength()); + EXPECT_EQ(request2_bytes, upstream_request2->bodyLength()); EXPECT_TRUE(response2->complete()); EXPECT_STREQ("200", response2->headers().Status()->value().c_str()); - EXPECT_EQ(1024U, response2->body().size()); + EXPECT_EQ(request2_bytes, response2->body().size()); }, // Respond request 1 [&]() -> void { upstream_request1->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); - upstream_request1->encodeData(512, true); + upstream_request1->encodeData(request2_bytes, true); }, [&]() -> void { response1->waitForEndStream(); EXPECT_TRUE(upstream_request1->complete()); - EXPECT_EQ(1024U, upstream_request1->bodyLength()); + EXPECT_EQ(request1_bytes, upstream_request1->bodyLength()); EXPECT_TRUE(response1->complete()); EXPECT_STREQ("200", response1->headers().Status()->value().c_str()); - EXPECT_EQ(512U, response1->body().size()); + EXPECT_EQ(request2_bytes, response1->body().size()); }, // Cleanup both downstream and upstream @@ -278,4 +283,13 @@ TEST_P(Http2IntegrationTest, SimultaneousRequest) { [&]() -> void { fake_upstream_connection2->close(); }, [&]() -> void { fake_upstream_connection2->waitForDisconnect(); }}); } + +TEST_P(Http2IntegrationTest, SimultaneousRequest) { + simultaneousRequest(lookupPort("http"), 1024, 512); +} + +TEST_P(Http2IntegrationTest, SimultaneousRequestWithBufferLimits) { + simultaneousRequest(lookupPort("http_buffer_limits"), 1024 * 32, 1024 * 16); +} + } // namespace Envoy diff --git a/test/integration/http2_integration_test.h b/test/integration/http2_integration_test.h index 7daad5071fbe2..a9c624273539a 100644 --- a/test/integration/http2_integration_test.h +++ b/test/integration/http2_integration_test.h @@ -17,9 +17,12 @@ class Http2IntegrationTest : public BaseIntegrationTest, registerPort("upstream_0", fake_upstreams_.back()->localAddress()->ip()->port()); fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); registerPort("upstream_1", fake_upstreams_.back()->localAddress()->ip()->port()); - createTestServer("test/config/integration/server_http2.json", {"echo", "http", "http_buffer"}); + createTestServer("test/config/integration/server_http2.json", + {"echo", "http", "http_buffer", "http_buffer_limits"}); } + void simultaneousRequest(uint32_t port, int32_t request1_bytes, int32_t request2_bytes); + /** * Destructor for an individual test test. */ diff --git a/test/integration/http2_upstream_integration_test.cc b/test/integration/http2_upstream_integration_test.cc index 289f62cc2f1fe..9572f48ebd9ad 100644 --- a/test/integration/http2_upstream_integration_test.cc +++ b/test/integration/http2_upstream_integration_test.cc @@ -101,16 +101,14 @@ TEST_P(Http2UpstreamIntegrationTest, DownstreamResetBeforeResponseComplete) { TEST_P(Http2UpstreamIntegrationTest, Trailers) { testTrailers(1024, 2048); } -TEST_P(Http2UpstreamIntegrationTest, BidirectionalStreaming) { +void Http2UpstreamIntegrationTest::bidirectionalStreaming(uint32_t port, uint32_t bytes) { IntegrationCodecClientPtr codec_client; FakeHttpConnectionPtr fake_upstream_connection; Http::StreamEncoder* encoder; IntegrationStreamDecoderPtr response(new IntegrationStreamDecoder(*dispatcher_)); FakeStreamPtr upstream_request; executeActions( - {[&]() -> void { - codec_client = makeHttpConnection(lookupPort("http"), Http::CodecClient::Type::HTTP2); - }, + {[&]() -> void { codec_client = makeHttpConnection(port, Http::CodecClient::Type::HTTP2); }, // Start request [&]() -> void { encoder = &codec_client->startRequest(Http::TestHeaderMapImpl{{":method", "POST"}, @@ -126,17 +124,17 @@ TEST_P(Http2UpstreamIntegrationTest, BidirectionalStreaming) { // Send some data [&]() -> void { - codec_client->sendData(*encoder, 1024, false); + codec_client->sendData(*encoder, bytes, false); }, - [&]() -> void { upstream_request->waitForData(*dispatcher_, 1024); }, + [&]() -> void { upstream_request->waitForData(*dispatcher_, bytes); }, // Start response [&]() -> void { upstream_request->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); - upstream_request->encodeData(1024, false); + upstream_request->encodeData(bytes, false); }, - [&]() -> void { response->waitForBodyData(1024); }, + [&]() -> void { response->waitForBodyData(bytes); }, // Finish request [&]() -> void { @@ -159,6 +157,14 @@ TEST_P(Http2UpstreamIntegrationTest, BidirectionalStreaming) { EXPECT_TRUE(response->complete()); } +TEST_P(Http2UpstreamIntegrationTest, BidirectionalStreaming) { + bidirectionalStreaming(lookupPort("http"), 1024); +} + +TEST_P(Http2UpstreamIntegrationTest, LargeBidirectionalStreamingWithBufferLimits) { + bidirectionalStreaming(lookupPort("http_with_buffer_limits"), 1024 * 32); +} + TEST_P(Http2UpstreamIntegrationTest, BidirectionalStreamingReset) { IntegrationCodecClientPtr codec_client; FakeHttpConnectionPtr fake_upstream_connection; @@ -215,7 +221,10 @@ TEST_P(Http2UpstreamIntegrationTest, BidirectionalStreamingReset) { EXPECT_FALSE(response->complete()); } -TEST_P(Http2UpstreamIntegrationTest, SimultaneousRequest) { +void Http2UpstreamIntegrationTest::simultaneousRequest(uint32_t port, uint32_t request1_bytes, + uint32_t request2_bytes, + uint32_t response1_bytes, + uint32_t response2_bytes) { IntegrationCodecClientPtr codec_client; FakeHttpConnectionPtr fake_upstream_connection; Http::StreamEncoder* encoder1; @@ -225,9 +234,7 @@ TEST_P(Http2UpstreamIntegrationTest, SimultaneousRequest) { FakeStreamPtr upstream_request1; FakeStreamPtr upstream_request2; executeActions( - {[&]() -> void { - codec_client = makeHttpConnection(lookupPort("http"), Http::CodecClient::Type::HTTP2); - }, + {[&]() -> void { codec_client = makeHttpConnection(port, Http::CodecClient::Type::HTTP2); }, // Start request 1 [&]() -> void { encoder1 = &codec_client->startRequest(Http::TestHeaderMapImpl{{":method", "POST"}, @@ -254,14 +261,14 @@ TEST_P(Http2UpstreamIntegrationTest, SimultaneousRequest) { // Finish request 1 [&]() -> void { - codec_client->sendData(*encoder1, 1024, true); + codec_client->sendData(*encoder1, request1_bytes, true); }, [&]() -> void { upstream_request1->waitForEndStream(*dispatcher_); }, // Finish request 2 [&]() -> void { - codec_client->sendData(*encoder2, 512, true); + codec_client->sendData(*encoder2, request2_bytes, true); }, [&]() -> void { upstream_request2->waitForEndStream(*dispatcher_); }, @@ -269,31 +276,31 @@ TEST_P(Http2UpstreamIntegrationTest, SimultaneousRequest) { // Respond request 2 [&]() -> void { upstream_request2->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); - upstream_request2->encodeData(1024, true); + upstream_request2->encodeData(response2_bytes, true); }, [&]() -> void { response2->waitForEndStream(); EXPECT_TRUE(upstream_request2->complete()); - EXPECT_EQ(512U, upstream_request2->bodyLength()); + EXPECT_EQ(request2_bytes, upstream_request2->bodyLength()); EXPECT_TRUE(response2->complete()); EXPECT_STREQ("200", response2->headers().Status()->value().c_str()); - EXPECT_EQ(1024U, response2->body().size()); + EXPECT_EQ(response2_bytes, response2->body().size()); }, // Respond request 1 [&]() -> void { upstream_request1->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); - upstream_request1->encodeData(512, true); + upstream_request1->encodeData(response1_bytes, true); }, [&]() -> void { response1->waitForEndStream(); EXPECT_TRUE(upstream_request1->complete()); - EXPECT_EQ(1024U, upstream_request1->bodyLength()); + EXPECT_EQ(request1_bytes, upstream_request1->bodyLength()); EXPECT_TRUE(response1->complete()); EXPECT_STREQ("200", response1->headers().Status()->value().c_str()); - EXPECT_EQ(512U, response1->body().size()); + EXPECT_EQ(response1_bytes, response1->body().size()); }, // Cleanup both downstream and upstream @@ -301,4 +308,14 @@ TEST_P(Http2UpstreamIntegrationTest, SimultaneousRequest) { [&]() -> void { fake_upstream_connection->close(); }, [&]() -> void { fake_upstream_connection->waitForDisconnect(); }}); } + +TEST_P(Http2UpstreamIntegrationTest, SimultaneousRequest) { + simultaneousRequest(lookupPort("http"), 1024, 512, 1023, 513); +} + +TEST_P(Http2UpstreamIntegrationTest, LargeSimultaneousRequestWithBufferLimits) { + simultaneousRequest(lookupPort("http_with_buffer_limits"), 1024 * 20, 1024 * 14 + 2, + 1024 * 10 + 5, 1024 * 16); +} + } // namespace Envoy diff --git a/test/integration/http2_upstream_integration_test.h b/test/integration/http2_upstream_integration_test.h index fe7ca71b8b86d..a515842473a77 100644 --- a/test/integration/http2_upstream_integration_test.h +++ b/test/integration/http2_upstream_integration_test.h @@ -17,10 +17,16 @@ class Http2UpstreamIntegrationTest : public BaseIntegrationTest, registerPort("upstream_0", fake_upstreams_.back()->localAddress()->ip()->port()); fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_)); registerPort("upstream_1", fake_upstreams_.back()->localAddress()->ip()->port()); + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_)); + registerPort("upstream_3", fake_upstreams_.back()->localAddress()->ip()->port()); createTestServer("test/config/integration/server_http2_upstream.json", - {"http", "http_buffer", "http1_buffer"}); + {"http", "http_buffer", "http1_buffer", "http_with_buffer_limits"}); } + void bidirectionalStreaming(uint32_t port, uint32_t bytes); + void simultaneousRequest(uint32_t port, uint32_t request1_bytes, uint32_t request2_bytes, + uint32_t response1_bytes, uint32_t response2_bytes); + /** * Destructor for an individual test. */ diff --git a/test/integration/integration.h b/test/integration/integration.h index b73aa546dbbf4..6eaa0792f9792 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -45,6 +45,8 @@ class IntegrationStreamDecoder : public Http::StreamDecoder, public Http::Stream // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} private: Event::Dispatcher& dispatcher_; diff --git a/test/integration/utility.h b/test/integration/utility.h index 8e198ba85c410..71ff29e48b7f1 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -34,6 +34,8 @@ class BufferingStreamDecoder : public Http::StreamDecoder, public Http::StreamCa // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} private: void onComplete(); diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 5b1786064adfe..efe012aa288bb 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -140,6 +140,8 @@ class MockStreamCallbacks : public StreamCallbacks { // Http::StreamCallbacks MOCK_METHOD1(onResetStream, void(StreamResetReason reason)); + MOCK_METHOD0(onAboveWriteBufferHighWatermark, void()); + MOCK_METHOD0(onBelowWriteBufferLowWatermark, void()); }; class MockStream : public Stream { @@ -151,6 +153,8 @@ class MockStream : public Stream { MOCK_METHOD1(addCallbacks, void(StreamCallbacks& callbacks)); MOCK_METHOD1(removeCallbacks, void(StreamCallbacks& callbacks)); MOCK_METHOD1(resetStream, void(StreamResetReason reason)); + MOCK_METHOD1(readDisable, void(bool disable)); + MOCK_METHOD2(setWriteBufferWatermarks, void(uint32_t, uint32_t)); std::list callbacks_{}; }; @@ -198,6 +202,8 @@ class MockClientConnection : public ClientConnection { // Http::ClientConnection MOCK_METHOD1(newStream, StreamEncoder&(StreamDecoder& response_decoder)); + MOCK_METHOD0(onUnderlyingConnectionAboveWriteBufferHighWatermark, void()); + MOCK_METHOD0(onUnderlyingConnectionBelowWriteBufferLowWatermark, void()); }; class MockFilterChainFactory : public FilterChainFactory { @@ -234,6 +240,8 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD0(requestInfo, Http::AccessLog::RequestInfo&()); MOCK_METHOD0(activeSpan, Tracing::Span&()); MOCK_METHOD0(downstreamAddress, const std::string&()); + MOCK_METHOD0(onDecoderFilterAboveWriteBufferHighWatermark, void()); + MOCK_METHOD0(onDecoderFilterBelowWriteBufferLowWatermark, void()); // Http::StreamDecoderFilterCallbacks void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override { @@ -268,6 +276,8 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks, MOCK_METHOD0(requestInfo, Http::AccessLog::RequestInfo&()); MOCK_METHOD0(activeSpan, Tracing::Span&()); MOCK_METHOD0(downstreamAddress, const std::string&()); + MOCK_METHOD0(onEncoderFilterAboveWriteBufferHighWatermark, void()); + MOCK_METHOD0(onEncoderFilterBelowWriteBufferLowWatermark, void()); // Http::StreamEncoderFilterCallbacks MOCK_METHOD1(addEncodedData, void(Buffer::Instance& data));