From 8d278090d8e2c2aae74d535466e604ffcd7e0bb7 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Tue, 22 Sep 2020 08:34:19 -0400 Subject: [PATCH 1/5] tcp: towards pluggable upstreams Signed-off-by: Alyssa Wilk --- source/common/tcp_proxy/tcp_proxy.cc | 95 +++++++++------------------- source/common/tcp_proxy/tcp_proxy.h | 31 +++------ source/common/tcp_proxy/upstream.cc | 94 +++++++++++++++++++++++++++ source/common/tcp_proxy/upstream.h | 88 ++++++++++++++++++++------ 4 files changed, 204 insertions(+), 104 deletions(-) diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index b8e6f4d1905f4..2444e4c5d5629 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -224,7 +224,7 @@ Filter::~Filter() { access_log->log(nullptr, nullptr, nullptr, getStreamInfo()); } - ASSERT(upstream_handle_ == nullptr); + ASSERT(generic_conn_pool_ == nullptr); ASSERT(upstream_ == nullptr); } @@ -429,24 +429,17 @@ Network::FilterStatus Filter::initializeUpstreamConnection() { bool Filter::maybeTunnel(const std::string& cluster_name) { if (!config_->tunnelingConfig()) { - Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster( - cluster_name, Upstream::ResourcePriority::Default, this); - if (conn_pool) { + generic_conn_pool_ = + std::make_unique(cluster_name, cluster_manager_, this, *upstream_callbacks_); + if (generic_conn_pool_->valid()) { connecting_ = true; connect_attempts_++; - - // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a - // valid connection handle. If newConnection fails inline it may result in attempting to - // select a new host, and a recursive call to initializeUpstreamConnection. In this case the - // first call to newConnection will return null and the inner call will persist. - Tcp::ConnectionPool::Cancellable* handle = conn_pool->newConnection(*this); - if (handle) { - ASSERT(upstream_handle_.get() == nullptr); - upstream_handle_ = std::make_shared(handle); - } + generic_conn_pool_->newStream(this); // Because we never return open connections to the pool, this either has a handle waiting on // connection completion, or onPoolFailure has been invoked. Either way, stop iteration. return true; + } else { + generic_conn_pool_.reset(); } } else { auto* cluster = cluster_manager_.get(cluster_name); @@ -461,28 +454,23 @@ bool Filter::maybeTunnel(const std::string& cluster_name) { "http2_protocol_options on the cluster."); return false; } - Http::ConnectionPool::Instance* conn_pool = cluster_manager_.httpConnPoolForCluster( - cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, this); - if (conn_pool) { - upstream_ = std::make_unique(*upstream_callbacks_, - config_->tunnelingConfig()->hostname()); - HttpUpstream* http_upstream = static_cast(upstream_.get()); - Http::ConnectionPool::Cancellable* cancellable = - conn_pool->newStream(http_upstream->responseDecoder(), *this); - if (cancellable) { - ASSERT(upstream_handle_.get() == nullptr); - upstream_handle_ = std::make_shared(cancellable); - } + + generic_conn_pool_ = std::make_unique(cluster_name, cluster_manager_, this, + config_->tunnelingConfig()->hostname(), + *upstream_callbacks_); + if (generic_conn_pool_->valid()) { + generic_conn_pool_->newStream(this); return true; + } else { + generic_conn_pool_.reset(); } } return false; } -void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason, - Upstream::HostDescriptionConstSharedPtr host) { - upstream_handle_.reset(); - +void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) { + generic_conn_pool_.reset(); read_callbacks_->upstreamHost(host); getStreamInfo().onUpstreamHostSelected(host); @@ -505,44 +493,22 @@ void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason, } } -void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host, - const Network::Address::InstanceConstSharedPtr& local_address, - Ssl::ConnectionInfoConstSharedPtr ssl_info) { - upstream_handle_.reset(); +void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info, + std::unique_ptr&& upstream, + Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info) { + upstream_ = std::move(upstream); + generic_conn_pool_.reset(); read_callbacks_->upstreamHost(host); getStreamInfo().onUpstreamHostSelected(host); getStreamInfo().setUpstreamLocalAddress(local_address); getStreamInfo().setUpstreamSslConnection(ssl_info); onUpstreamConnection(); read_callbacks_->continueReading(); -} - -void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, - Upstream::HostDescriptionConstSharedPtr host) { - Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get(); - - upstream_ = std::make_unique(std::move(conn_data), *upstream_callbacks_); - onPoolReadyBase(host, latched_data->connection().localAddress(), - latched_data->connection().streamInfo().downstreamSslConnection()); - read_callbacks_->connection().streamInfo().setUpstreamFilterState( - latched_data->connection().streamInfo().filterState()); -} - -void Filter::onPoolFailure(ConnectionPool::PoolFailureReason failure, absl::string_view, - Upstream::HostDescriptionConstSharedPtr host) { - onPoolFailure(failure, host); -} - -void Filter::onPoolReady(Http::RequestEncoder& request_encoder, - Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info) { - Http::RequestEncoder* latched_encoder = &request_encoder; - HttpUpstream* http_upstream = static_cast(upstream_.get()); - http_upstream->setRequestEncoder(request_encoder, - host->transportSocketFactory().implementsSecureTransport()); - - onPoolReadyBase(host, latched_encoder->getStream().connectionLocalAddress(), - info.downstreamSslConnection()); + if (info) { + read_callbacks_->connection().streamInfo().setUpstreamFilterState(info->filterState()); + } } const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() { @@ -611,12 +577,11 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) { disableIdleTimer(); } } - if (upstream_handle_) { + if (generic_conn_pool_) { if (event == Network::ConnectionEvent::LocalClose || event == Network::ConnectionEvent::RemoteClose) { // Cancel the conn pool request and close any excess pending requests. - upstream_handle_->cancel(); - upstream_handle_.reset(); + generic_conn_pool_.reset(); } } } diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index c7c91f2c85c2d..7f690450514fa 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -242,9 +242,8 @@ class PerConnectionCluster : public StreamInfo::FilterState::Object { */ class Filter : public Network::ReadFilter, public Upstream::LoadBalancerContextBase, - Tcp::ConnectionPool::Callbacks, - public Http::ConnectionPool::Callbacks, - protected Logger::Loggable { + protected Logger::Loggable, + public GenericConnectionPoolCallbacks { public: Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager); ~Filter() override; @@ -254,23 +253,13 @@ class Filter : public Network::ReadFilter, Network::FilterStatus onNewConnection() override; void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; - // Tcp::ConnectionPool::Callbacks - void onPoolFailure(ConnectionPool::PoolFailureReason reason, - Upstream::HostDescriptionConstSharedPtr host) override; - void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, - Upstream::HostDescriptionConstSharedPtr host) override; - - // Http::ConnectionPool::Callbacks, - void onPoolFailure(ConnectionPool::PoolFailureReason reason, - absl::string_view transport_failure_reason, - Upstream::HostDescriptionConstSharedPtr host) override; - void onPoolReady(Http::RequestEncoder& request_encoder, - Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info) override; - - void onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host, - const Network::Address::InstanceConstSharedPtr& local_address, - Ssl::ConnectionInfoConstSharedPtr ssl_info); + // GenericConnectionPoolCallbacks + void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr&& upstream, + Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info) override; + void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) override; // Upstream::LoadBalancerContext const Router::MetadataMatchCriteria* metadataMatchCriteria() override; @@ -375,10 +364,10 @@ class Filter : public Network::ReadFilter, Event::TimerPtr idle_timer_; Event::TimerPtr connection_duration_timer_; - std::shared_ptr upstream_handle_; std::shared_ptr upstream_callbacks_; // shared_ptr required for passing as a // read filter. std::unique_ptr upstream_; + std::unique_ptr generic_conn_pool_; RouteConstSharedPtr route_; Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_; Network::TransportSocketOptionsSharedPtr transport_socket_options_; diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 451a277e0865f..76a746819b9bd 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -1,5 +1,7 @@ #include "common/tcp_proxy/upstream.h" +#include "envoy/upstream/cluster_manager.h" + #include "common/http/header_map_impl.h" #include "common/http/headers.h" #include "common/http/utility.h" @@ -152,5 +154,97 @@ void HttpUpstream::doneWriting() { } } +TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, + Upstream::LoadBalancerContext* context, + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) + : upstream_callbacks_(upstream_callbacks) { + conn_pool_ = cluster_manager.tcpConnPoolForCluster(cluster_name, + Upstream::ResourcePriority::Default, context); +} + +TcpConnPool::~TcpConnPool() { + if (upstream_handle_ != nullptr) { + upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess); + } +} + +bool TcpConnPool::valid() const { return conn_pool_ != nullptr; } + +void TcpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { + callbacks_ = callbacks; + // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a + // valid connection handle. If newConnection fails inline it may result in attempting to + // select a new host, and a recursive call to initializeUpstreamConnection. In this case the + // first call to newConnection will return null and the inner call will persist. + Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*this); + if (handle) { + ASSERT(upstream_handle_ == nullptr); + upstream_handle_ = handle; + } +} + +void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) { + upstream_handle_ = nullptr; + callbacks_->onGenericPoolFailure(reason, host); +} +void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, + Upstream::HostDescriptionConstSharedPtr host) { + upstream_handle_ = nullptr; + Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get(); + Network::Connection& connection = conn_data->connection(); + + auto upstream = std::make_unique(std::move(conn_data), upstream_callbacks_); + callbacks_->onGenericPoolReady(&connection.streamInfo(), std::move(upstream), host, + latched_data->connection().localAddress(), + latched_data->connection().streamInfo().downstreamSslConnection()); +} + +HttpConnPool::HttpConnPool(const std::string& cluster_name, + Upstream::ClusterManager& cluster_manager, + Upstream::LoadBalancerContext* context, std::string hostname, + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) + : hostname_(hostname), upstream_callbacks_(upstream_callbacks) { + conn_pool_ = cluster_manager.httpConnPoolForCluster( + cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, context); +} + +HttpConnPool::~HttpConnPool() { + if (upstream_handle_ != nullptr) { + upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); + } +} + +bool HttpConnPool::valid() const { return conn_pool_ != nullptr; } + +void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { + callbacks_ = callbacks; + upstream_ = std::make_unique(upstream_callbacks_, hostname_); + HttpUpstream* http_upstream = static_cast(upstream_.get()); + Tcp::ConnectionPool::Cancellable* handle = + conn_pool_->newStream(http_upstream->responseDecoder(), *this); + if (handle != nullptr) { + upstream_handle_ = handle; + } +} + +void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view, + Upstream::HostDescriptionConstSharedPtr host) { + upstream_handle_ = nullptr; + callbacks_->onGenericPoolFailure(reason, host); +} +void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, + Upstream::HostDescriptionConstSharedPtr host, + const StreamInfo::StreamInfo& info) { + upstream_handle_ = nullptr; + Http::RequestEncoder* latched_encoder = &request_encoder; + HttpUpstream* http_upstream = static_cast(upstream_.get()); + http_upstream->setRequestEncoder(request_encoder, + host->transportSocketFactory().implementsSecureTransport()); + callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, + latched_encoder->getStream().connectionLocalAddress(), + info.downstreamSslConnection()); +} + } // namespace TcpProxy } // namespace Envoy diff --git a/source/common/tcp_proxy/upstream.h b/source/common/tcp_proxy/upstream.h index 8d2a301d71377..950065d88018e 100644 --- a/source/common/tcp_proxy/upstream.h +++ b/source/common/tcp_proxy/upstream.h @@ -3,42 +3,94 @@ #include "envoy/http/conn_pool.h" #include "envoy/network/connection.h" #include "envoy/tcp/conn_pool.h" +#include "envoy/upstream/load_balancer.h" #include "envoy/upstream/upstream.h" namespace Envoy { namespace TcpProxy { -// Interface for a generic ConnectionHandle, which can wrap a TcpConnectionHandle -// or an HttpConnectionHandle -class ConnectionHandle { +class GenericConnectionPoolCallbacks; +class GenericUpstream; + +// An API for wrapping either an HTTP or a TCP connection pool. +class GenericConnPool : public Logger::Loggable { public: - virtual ~ConnectionHandle() = default; - // Cancel the conn pool request and close any excess pending requests. - virtual void cancel() PURE; + virtual ~GenericConnPool() = default; + + // Called to create a new HTTP stream or TCP connection. The implementation + // is then responsible for calling either onPoolReady or onPoolFailure on the + // supplied GenericConnectionPoolCallbacks. + virtual void newStream(GenericConnectionPoolCallbacks* callbacks) PURE; + //// Returns the host for this conn pool. + // virtual Upstream::HostDescriptionConstSharedPtr host() const PURE; + virtual bool valid() const PURE; }; -// An implementation of ConnectionHandle which works with the Tcp::ConnectionPool. -class TcpConnectionHandle : public ConnectionHandle { +class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callbacks { public: - TcpConnectionHandle(Tcp::ConnectionPool::Cancellable* handle) : upstream_handle_(handle) {} + TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, + Upstream::LoadBalancerContext* context, + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks); + ~TcpConnPool() override; + + // GenericConnPool + bool valid() const override; + void newStream(GenericConnectionPoolCallbacks* callbacks) override; - void cancel() override { - upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess); - } + // Tcp::ConnectionPool::Callbacks + void onPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) override; + void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, + Upstream::HostDescriptionConstSharedPtr host) override; private: + Tcp::ConnectionPool::Instance* conn_pool_{}; Tcp::ConnectionPool::Cancellable* upstream_handle_{}; + GenericConnectionPoolCallbacks* callbacks_{}; + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; }; -class HttpConnectionHandle : public ConnectionHandle { +class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callbacks { public: - HttpConnectionHandle(Http::ConnectionPool::Cancellable* handle) : upstream_http_handle_(handle) {} - void cancel() override { - upstream_http_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); - } + HttpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, + Upstream::LoadBalancerContext* context, std::string hostname, + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks); + ~HttpConnPool() override; + + // GenericConnPool + bool valid() const override; + void newStream(GenericConnectionPoolCallbacks* callbacks) override; + + // Http::ConnectionPool::Callbacks, + void onPoolFailure(ConnectionPool::PoolFailureReason reason, + absl::string_view transport_failure_reason, + Upstream::HostDescriptionConstSharedPtr host) override; + void onPoolReady(Http::RequestEncoder& request_encoder, + Upstream::HostDescriptionConstSharedPtr host, + const StreamInfo::StreamInfo& info) override; private: - Http::ConnectionPool::Cancellable* upstream_http_handle_{}; + const std::string hostname_; + Http::ConnectionPool::Instance* conn_pool_{}; + Http::ConnectionPool::Cancellable* upstream_handle_{}; + GenericConnectionPoolCallbacks* callbacks_{}; + Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; + std::unique_ptr upstream_; +}; + +// An API for the UpstreamRequest to get callbacks from either an HTTP or TCP +// connection pool. +class GenericConnectionPoolCallbacks { +public: + virtual ~GenericConnectionPoolCallbacks() = default; + + virtual void onGenericPoolReady(StreamInfo::StreamInfo* info, + std::unique_ptr&& upstream, + Upstream::HostDescriptionConstSharedPtr& host, + const Network::Address::InstanceConstSharedPtr& local_address, + Ssl::ConnectionInfoConstSharedPtr ssl_info) PURE; + virtual void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, + Upstream::HostDescriptionConstSharedPtr host) PURE; }; // Interface for a generic Upstream, which can communicate with a TCP or HTTP From 557ea5fc94126c11904753a9cf8598c51695ee06 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Tue, 6 Oct 2020 09:26:57 -0400 Subject: [PATCH 2/5] reviewer comments Signed-off-by: Alyssa Wilk --- source/common/tcp_proxy/tcp_proxy.h | 5 +++++ source/common/tcp_proxy/upstream.cc | 2 ++ 2 files changed, 7 insertions(+) diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 7f690450514fa..12b482e01690f 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -366,7 +366,12 @@ class Filter : public Network::ReadFilter, std::shared_ptr upstream_callbacks_; // shared_ptr required for passing as a // read filter. + // The upstream handle (either TCP or HTTP). This is set in onGenericPoolReady and should persist + // until either the upstream or downstream connection is terminated. std::unique_ptr upstream_; + // The connection pool used to set up |upstream_|. + // This will be non-null from when an upstream connection is attempted until + // it either succeeds or fails. std::unique_ptr generic_conn_pool_; RouteConstSharedPtr route_; Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_; diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 76a746819b9bd..84f98e2c8a1ed 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -188,6 +188,7 @@ void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, upstream_handle_ = nullptr; callbacks_->onGenericPoolFailure(reason, host); } + void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, Upstream::HostDescriptionConstSharedPtr host) { upstream_handle_ = nullptr; @@ -233,6 +234,7 @@ void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl: upstream_handle_ = nullptr; callbacks_->onGenericPoolFailure(reason, host); } + void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info) { From 9b0e88fdfc52666a70cbb46acaf0399237dbc280 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Tue, 6 Oct 2020 10:30:50 -0400 Subject: [PATCH 3/5] comment cleanup Signed-off-by: Alyssa Wilk --- source/common/tcp_proxy/upstream.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/common/tcp_proxy/upstream.h b/source/common/tcp_proxy/upstream.h index 950065d88018e..b8984c020f90b 100644 --- a/source/common/tcp_proxy/upstream.h +++ b/source/common/tcp_proxy/upstream.h @@ -21,8 +21,6 @@ class GenericConnPool : public Logger::Loggable { // is then responsible for calling either onPoolReady or onPoolFailure on the // supplied GenericConnectionPoolCallbacks. virtual void newStream(GenericConnectionPoolCallbacks* callbacks) PURE; - //// Returns the host for this conn pool. - // virtual Upstream::HostDescriptionConstSharedPtr host() const PURE; virtual bool valid() const PURE; }; From 85c47c0fe86b95c57671ca2bc9e11d58638e54a7 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 12 Oct 2020 09:50:22 -0400 Subject: [PATCH 4/5] reviewer comments Signed-off-by: Alyssa Wilk --- source/common/tcp_proxy/upstream.cc | 12 +++++------- source/common/tcp_proxy/upstream.h | 5 ++++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 84f98e2c8a1ed..3629ca9114300 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -164,7 +164,7 @@ TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManag TcpConnPool::~TcpConnPool() { if (upstream_handle_ != nullptr) { - upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess); + upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess); } } @@ -212,7 +212,7 @@ HttpConnPool::HttpConnPool(const std::string& cluster_name, HttpConnPool::~HttpConnPool() { if (upstream_handle_ != nullptr) { - upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); + upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default); } } @@ -221,9 +221,8 @@ bool HttpConnPool::valid() const { return conn_pool_ != nullptr; } void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { callbacks_ = callbacks; upstream_ = std::make_unique(upstream_callbacks_, hostname_); - HttpUpstream* http_upstream = static_cast(upstream_.get()); Tcp::ConnectionPool::Cancellable* handle = - conn_pool_->newStream(http_upstream->responseDecoder(), *this); + conn_pool_->newStream(upstream_->responseDecoder(), *this); if (handle != nullptr) { upstream_handle_ = handle; } @@ -240,9 +239,8 @@ void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, const StreamInfo::StreamInfo& info) { upstream_handle_ = nullptr; Http::RequestEncoder* latched_encoder = &request_encoder; - HttpUpstream* http_upstream = static_cast(upstream_.get()); - http_upstream->setRequestEncoder(request_encoder, - host->transportSocketFactory().implementsSecureTransport()); + upstream_->setRequestEncoder(request_encoder, + host->transportSocketFactory().implementsSecureTransport()); callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, latched_encoder->getStream().connectionLocalAddress(), info.downstreamSslConnection()); diff --git a/source/common/tcp_proxy/upstream.h b/source/common/tcp_proxy/upstream.h index b8984c020f90b..33943e70b982e 100644 --- a/source/common/tcp_proxy/upstream.h +++ b/source/common/tcp_proxy/upstream.h @@ -21,6 +21,7 @@ class GenericConnPool : public Logger::Loggable { // is then responsible for calling either onPoolReady or onPoolFailure on the // supplied GenericConnectionPoolCallbacks. virtual void newStream(GenericConnectionPoolCallbacks* callbacks) PURE; + // Returns true if there was a valid connection pool, false otherwise. virtual bool valid() const PURE; }; @@ -48,6 +49,8 @@ class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callback Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; }; +class HttpUpstream; + class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callbacks { public: HttpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, @@ -73,7 +76,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba Http::ConnectionPool::Cancellable* upstream_handle_{}; GenericConnectionPoolCallbacks* callbacks_{}; Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_; - std::unique_ptr upstream_; + std::unique_ptr upstream_; }; // An API for the UpstreamRequest to get callbacks from either an HTTP or TCP From 2d2024b785a467ceb53bfb65b4a973fe3d9b91a8 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 12 Oct 2020 15:30:55 -0400 Subject: [PATCH 5/5] comments Signed-off-by: Alyssa Wilk --- source/common/tcp_proxy/upstream.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 3629ca9114300..1da6eb9157979 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -212,6 +212,8 @@ HttpConnPool::HttpConnPool(const std::string& cluster_name, HttpConnPool::~HttpConnPool() { if (upstream_handle_ != nullptr) { + // Because HTTP connections are generally shorter lived and have a higher probability of use + // before going idle, they are closed with Default rather than CloseExcess. upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default); } }