From 80ceaeca46bd726435bc891d010cc4f0eac8c411 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Wed, 26 Oct 2016 11:18:12 -0700 Subject: [PATCH 1/2] tcp proxy: connect to upstream when downstream connections The previous behavior was to connect only when data is received on the downstream connection. Unfortunately this breaks protocols which connect and the server is the first half to send data (e.g., mysql). This required a refactor to handle the case where we immediately disconnect downstream during network filter chain creation. This commit also cleans up the filter factory functions so that they take a filter manager interface vs. a connection interface to force filters to operate via the filter callbacks. --- include/envoy/network/connection.h | 20 +-------- include/envoy/network/filter.h | 26 +++++++++++ source/common/CMakeLists.txt | 2 +- source/common/filter/tcp_proxy.cc | 11 ----- source/common/filter/tcp_proxy.h | 1 + source/common/network/connection_impl.h | 10 +++-- ...lter_manager.cc => filter_manager_impl.cc} | 20 +++++---- ...filter_manager.h => filter_manager_impl.h} | 8 ++-- .../server/config/network/client_ssl_auth.cc | 4 +- source/server/config/network/echo.cc | 4 +- .../config/network/http_connection_manager.cc | 4 +- source/server/config/network/mongo_proxy.cc | 4 +- source/server/config/network/ratelimit.cc | 4 +- source/server/config/network/tcp_proxy.cc | 4 +- source/server/configuration_impl.cc | 17 ++++++-- source/server/configuration_impl.h | 15 ++++++- source/server/connection_handler.cc | 18 ++++---- source/server/connection_handler.h | 3 +- source/server/server.cc | 4 +- source/server/worker.cc | 4 +- test/CMakeLists.txt | 4 +- test/common/filter/tcp_proxy_test.cc | 14 +++--- ...er_test.cc => filter_manager_impl_test.cc} | 6 +-- test/integration/fake_upstream.cc | 4 +- test/mocks/network/mocks.cc | 9 ++++ test/mocks/network/mocks.h | 24 +++++++++++ test/server/configuration_impl_test.cc | 29 +++++++++++++ test/server/connection_handler_test.cc | 43 +++++++++++++++++++ 28 files changed, 226 insertions(+), 90 deletions(-) rename source/common/network/{filter_manager.cc => filter_manager_impl.cc} (66%) rename source/common/network/{filter_manager.h => filter_manager_impl.h} (89%) rename test/common/network/{filter_manager_test.cc => filter_manager_impl_test.cc} (94%) create mode 100644 test/server/configuration_impl_test.cc create mode 100644 test/server/connection_handler_test.cc diff --git a/include/envoy/network/connection.h b/include/envoy/network/connection.h index a81fe52cb8d37..3c3aa3dc736dd 100644 --- a/include/envoy/network/connection.h +++ b/include/envoy/network/connection.h @@ -60,7 +60,7 @@ enum class ConnectionCloseType { /** * An abstract raw connection. Free the connection or call close() to disconnect. */ -class Connection : public Event::DeferredDeletable { +class Connection : public Event::DeferredDeletable, public FilterManager { public: enum class State { Open, Closing, Closed }; @@ -71,24 +71,6 @@ class Connection : public Event::DeferredDeletable { */ virtual void addConnectionCallbacks(ConnectionCallbacks& cb) PURE; - /** - * Add a write filter to the connection. Filters are invoked in LIFO order (the last added - * filter is called first). - */ - virtual void addWriteFilter(WriteFilterPtr filter) PURE; - - /** - * Add a combination filter to the connection. Equivalent to calling both addWriteFilter() - * and addReadFilter() with the same filter instance. - */ - virtual void addFilter(FilterPtr filter) PURE; - - /** - * Add a read filter to the connection. Filters are invoked in FIFO order (the filter added - * first is called first). - */ - virtual void addReadFilter(ReadFilterPtr filter) PURE; - /** * Close the connection. */ diff --git a/include/envoy/network/filter.h b/include/envoy/network/filter.h index 4e50c03c89fdc..e34880cc05dee 100644 --- a/include/envoy/network/filter.h +++ b/include/envoy/network/filter.h @@ -99,6 +99,32 @@ typedef std::shared_ptr ReadFilterPtr; class Filter : public WriteFilter, public ReadFilter {}; typedef std::shared_ptr FilterPtr; +/** + * Interface for adding individual network filters to a manager. + */ +class FilterManager { +public: + virtual ~FilterManager() {} + + /** + * Add a write filter to the connection. Filters are invoked in LIFO order (the last added + * filter is called first). + */ + virtual void addWriteFilter(WriteFilterPtr filter) PURE; + + /** + * Add a combination filter to the connection. Equivalent to calling both addWriteFilter() + * and addReadFilter() with the same filter instance. + */ + virtual void addFilter(FilterPtr filter) PURE; + + /** + * Add a read filter to the connection. Filters are invoked in FIFO order (the filter added + * first is called first). + */ + virtual void addReadFilter(ReadFilterPtr filter) PURE; +}; + /** * Creates a chain of network filters for a new connection. */ diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 3bc7775129601..3b0b2eb7fcfa2 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -74,7 +74,7 @@ add_library( mongo/utility.cc network/connection_impl.cc network/dns_impl.cc - network/filter_manager.cc + network/filter_manager_impl.cc network/listener_impl.cc network/listen_socket_impl.cc network/proxy_protocol.cc diff --git a/source/common/filter/tcp_proxy.cc b/source/common/filter/tcp_proxy.cc index e768082734779..95c9f26340d20 100644 --- a/source/common/filter/tcp_proxy.cc +++ b/source/common/filter/tcp_proxy.cc @@ -94,17 +94,6 @@ void TcpProxy::onConnectTimeout() { } Network::FilterStatus TcpProxy::onData(Buffer::Instance& data) { - if (!upstream_connection_) { - // TODO: This is done here vs. the constructor because currently the filter manager is not built - // to handle the downstream connection being closed during construction. The better long - // term solution is to have an initialize() method that is passed the relevant data and - // can also cause the filter manager to stop execution. - initializeUpstreamConnection(); - if (!upstream_connection_) { - return Network::FilterStatus::StopIteration; - } - } - conn_log_trace("received {} bytes", read_callbacks_->connection(), data.length()); upstream_connection_->write(data); ASSERT(0 == data.length()); diff --git a/source/common/filter/tcp_proxy.h b/source/common/filter/tcp_proxy.h index c516d46a17d52..10c16bf3a984b 100644 --- a/source/common/filter/tcp_proxy.h +++ b/source/common/filter/tcp_proxy.h @@ -64,6 +64,7 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggableconnection()); read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_); + initializeUpstreamConnection(); } private: diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 16cdc0145d391..88994597b305d 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -1,6 +1,6 @@ #pragma once -#include "filter_manager.h" +#include "filter_manager_impl.h" #include "envoy/network/connection.h" @@ -27,11 +27,13 @@ class ConnectionImpl : public virtual Connection, const std::string& remote_address); ~ConnectionImpl(); - // Network::Connection - void addConnectionCallbacks(ConnectionCallbacks& cb) override; + // Network::FilterManager void addWriteFilter(WriteFilterPtr filter) override; void addFilter(FilterPtr filter) override; void addReadFilter(ReadFilterPtr filter) override; + + // Network::Connection + void addConnectionCallbacks(ConnectionCallbacks& cb) override; void close(ConnectionCloseType type) override; Event::Dispatcher& dispatcher() override; uint64_t id() override; @@ -98,7 +100,7 @@ class ConnectionImpl : public virtual Connection, Event::Libevent::BufferEventPtr bev_; const std::string remote_address_; const uint64_t id_; - FilterManager filter_manager_; + FilterManagerImpl filter_manager_; std::list callbacks_; Event::TimerPtr redispatch_read_event_; bool read_enabled_; diff --git a/source/common/network/filter_manager.cc b/source/common/network/filter_manager_impl.cc similarity index 66% rename from source/common/network/filter_manager.cc rename to source/common/network/filter_manager_impl.cc index 6fcd070bd37d3..15ae1fa797265 100644 --- a/source/common/network/filter_manager.cc +++ b/source/common/network/filter_manager_impl.cc @@ -1,30 +1,34 @@ -#include "filter_manager.h" +#include "filter_manager_impl.h" + +#include "envoy/network/connection.h" #include "common/common/assert.h" namespace Network { -void FilterManager::addWriteFilter(WriteFilterPtr filter) { +void FilterManagerImpl::addWriteFilter(WriteFilterPtr filter) { + ASSERT(connection_.state() != Connection::State::Closed); downstream_filters_.emplace_back(filter); } -void FilterManager::addFilter(FilterPtr filter) { +void FilterManagerImpl::addFilter(FilterPtr filter) { addReadFilter(filter); addWriteFilter(filter); } -void FilterManager::addReadFilter(ReadFilterPtr filter) { +void FilterManagerImpl::addReadFilter(ReadFilterPtr filter) { + ASSERT(connection_.state() != Connection::State::Closed); ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter}); filter->initializeReadFilterCallbacks(*new_filter); new_filter->moveIntoListBack(std::move(new_filter), upstream_filters_); } -void FilterManager::destroyFilters() { +void FilterManagerImpl::destroyFilters() { upstream_filters_.clear(); downstream_filters_.clear(); } -void FilterManager::onContinueReading(ActiveReadFilter* filter) { +void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) { std::list::iterator entry; if (!filter) { entry = upstream_filters_.begin(); @@ -40,12 +44,12 @@ void FilterManager::onContinueReading(ActiveReadFilter* filter) { } } -void FilterManager::onRead() { +void FilterManagerImpl::onRead() { ASSERT(!upstream_filters_.empty()); onContinueReading(nullptr); } -FilterStatus FilterManager::onWrite() { +FilterStatus FilterManagerImpl::onWrite() { for (const WriteFilterPtr& filter : downstream_filters_) { FilterStatus status = filter->onWrite(buffer_source_.getWriteBuffer()); if (status == FilterStatus::StopIteration) { diff --git a/source/common/network/filter_manager.h b/source/common/network/filter_manager_impl.h similarity index 89% rename from source/common/network/filter_manager.h rename to source/common/network/filter_manager_impl.h index 295f10d036b74..2731f07d5bc1f 100644 --- a/source/common/network/filter_manager.h +++ b/source/common/network/filter_manager_impl.h @@ -27,9 +27,9 @@ class BufferSource { /** * This is a filter manager for TCP (L4) filters. It is split out for ease of testing. */ -class FilterManager { +class FilterManagerImpl { public: - FilterManager(Connection& connection, BufferSource& buffer_source) + FilterManagerImpl(Connection& connection, BufferSource& buffer_source) : connection_(connection), buffer_source_(buffer_source) {} void addWriteFilter(WriteFilterPtr filter); @@ -41,7 +41,7 @@ class FilterManager { private: struct ActiveReadFilter : public ReadFilterCallbacks, LinkedObject { - ActiveReadFilter(FilterManager& parent, ReadFilterPtr filter) + ActiveReadFilter(FilterManagerImpl& parent, ReadFilterPtr filter) : parent_(parent), filter_(filter) {} Connection& connection() override { return parent_.connection_; } @@ -51,7 +51,7 @@ class FilterManager { parent_.host_description_ = host; } - FilterManager& parent_; + FilterManagerImpl& parent_; ReadFilterPtr filter_; }; diff --git a/source/server/config/network/client_ssl_auth.cc b/source/server/config/network/client_ssl_auth.cc index 9a2957c3a0a39..a62a435777297 100644 --- a/source/server/config/network/client_ssl_auth.cc +++ b/source/server/config/network/client_ssl_auth.cc @@ -23,8 +23,8 @@ class ClientSslAuthConfigFactory : public NetworkFilterConfigFactory { Filter::Auth::ClientSsl::ConfigPtr config(new Filter::Auth::ClientSsl::Config( json_config, server.threadLocal(), server.clusterManager(), server.dispatcher(), server.stats(), server.runtime())); - return [config](Network::Connection& connection) -> void { - connection.addReadFilter( + return [config](Network::FilterManager& filter_manager) -> void { + filter_manager.addReadFilter( Network::ReadFilterPtr{new Filter::Auth::ClientSsl::Instance(config)}); }; } diff --git a/source/server/config/network/echo.cc b/source/server/config/network/echo.cc index 1feb909e890d1..6d8a1d57fecc3 100644 --- a/source/server/config/network/echo.cc +++ b/source/server/config/network/echo.cc @@ -18,8 +18,8 @@ class EchoConfigFactory : public NetworkFilterConfigFactory { return nullptr; } - return [](Network::Connection& connection) - -> void { connection.addReadFilter(Network::ReadFilterPtr{new Filter::Echo()}); }; + return [](Network::FilterManager& filter_manager) + -> void { filter_manager.addReadFilter(Network::ReadFilterPtr{new Filter::Echo()}); }; } }; diff --git a/source/server/config/network/http_connection_manager.cc b/source/server/config/network/http_connection_manager.cc index feb6ec9547f8e..7eeae34a00351 100644 --- a/source/server/config/network/http_connection_manager.cc +++ b/source/server/config/network/http_connection_manager.cc @@ -35,8 +35,8 @@ class HttpConnectionManagerFilterConfigFactory : Logger::Loggable http_config( new HttpConnectionManagerConfig(config, server)); - return [http_config, &server](Network::Connection& connection) mutable -> void { - connection.addReadFilter(Network::ReadFilterPtr{ + return [http_config, &server](Network::FilterManager& filter_manager) mutable -> void { + filter_manager.addReadFilter(Network::ReadFilterPtr{ new Http::ConnectionManagerImpl(*http_config, server.drainManager(), server.random(), server.httpTracer(), server.runtime())}); }; diff --git a/source/server/config/network/mongo_proxy.cc b/source/server/config/network/mongo_proxy.cc index c4e310385989d..dd4f983111125 100644 --- a/source/server/config/network/mongo_proxy.cc +++ b/source/server/config/network/mongo_proxy.cc @@ -30,8 +30,8 @@ class MongoProxyFilterConfigFactory : public NetworkFilterConfigFactory { server.accessLogManager().registerAccessLog(access_log); } - return [stat_prefix, &server, access_log](Network::Connection& connection) -> void { - connection.addFilter(Network::FilterPtr{ + return [stat_prefix, &server, access_log](Network::FilterManager& filter_manager) -> void { + filter_manager.addFilter(Network::FilterPtr{ new Mongo::ProdProxyFilter(stat_prefix, server.stats(), server.runtime(), access_log)}); }; } diff --git a/source/server/config/network/ratelimit.cc b/source/server/config/network/ratelimit.cc index e06b77446921d..10070700ef053 100644 --- a/source/server/config/network/ratelimit.cc +++ b/source/server/config/network/ratelimit.cc @@ -21,8 +21,8 @@ class RateLimitConfigFactory : public NetworkFilterConfigFactory { RateLimit::TcpFilter::ConfigPtr config( new RateLimit::TcpFilter::Config(json_config, server.stats(), server.runtime())); - return [config, &server](Network::Connection& connection) -> void { - connection.addReadFilter(Network::ReadFilterPtr{new RateLimit::TcpFilter::Instance( + return [config, &server](Network::FilterManager& filter_manager) -> void { + filter_manager.addReadFilter(Network::ReadFilterPtr{new RateLimit::TcpFilter::Instance( config, server.rateLimitClient(Optional()))}); }; } diff --git a/source/server/config/network/tcp_proxy.cc b/source/server/config/network/tcp_proxy.cc index 0d11c97d3f12d..0c09ac9b226df 100644 --- a/source/server/config/network/tcp_proxy.cc +++ b/source/server/config/network/tcp_proxy.cc @@ -22,8 +22,8 @@ class TcpProxyConfigFactory : public NetworkFilterConfigFactory { Filter::TcpProxyConfigPtr filter_config( new Filter::TcpProxyConfig(config, server.clusterManager(), server.stats())); - return [filter_config, &server](Network::Connection& connection) -> void { - connection.addReadFilter( + return [filter_config, &server](Network::FilterManager& filter_manager) -> void { + filter_manager.addReadFilter( Network::ReadFilterPtr{new Filter::TcpProxy(filter_config, server.clusterManager())}); }; } diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index d043d1cc14292..3cae4491e64b0 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -14,6 +14,19 @@ namespace Server { namespace Configuration { +void FilterChainUtility::buildFilterChain(Network::Connection& connection, + const std::list& factories) { + for (const NetworkFilterFactoryCb& factory : factories) { + // It's possible for a connection to be closed immediately in the middle of chain creation. + // If this happened, do not instantiate any more filters. + if (connection.state() != Network::Connection::State::Open) { + break; + } + + factory(connection); + } +} + MainImpl::MainImpl(Server::Instance& server) : server_(server) {} void MainImpl::initialize(const std::string& file_path) { @@ -158,9 +171,7 @@ MainImpl::ListenerConfig::ListenerConfig(MainImpl& parent, Json::Object& json) } void MainImpl::ListenerConfig::createFilterChain(Network::Connection& connection) { - for (const NetworkFilterFactoryCb& factory : filter_factories_) { - factory(connection); - } + FilterChainUtility::buildFilterChain(connection, filter_factories_); } InitialImpl::InitialImpl(const std::string& file_path) { diff --git a/source/server/configuration_impl.h b/source/server/configuration_impl.h index 12e6d14215b53..37158d80f14de 100644 --- a/source/server/configuration_impl.h +++ b/source/server/configuration_impl.h @@ -18,7 +18,7 @@ enum class NetworkFilterType { Read, Write, Both }; * come in. Filter factories create the lambda at configuration initialization time, and then they * are used at runtime. This maps JSON -> runtime configuration. */ -typedef std::function NetworkFilterFactoryCb; +typedef std::function NetworkFilterFactoryCb; /** * Implemented by each network filter and registered via registerNetworkFilterConfigFactory() or @@ -34,6 +34,19 @@ class NetworkFilterConfigFactory { Server::Instance& server) PURE; }; +/** + * Utilities for creating a filter chain for a network connection. + */ +class FilterChainUtility { +public: + /** + * Given a connection and a list of factories, create a new filter chain. Chain creation will + * exit early if any filters immediately close the connection. + */ + static void buildFilterChain(Network::Connection& connection, + const std::list& factories); +}; + /** * Implementation of Server::Configuration::Main that reads a configuration from a JSON file. */ diff --git a/source/server/connection_handler.cc b/source/server/connection_handler.cc index 73c8e7c491cea..a6f52a454de21 100644 --- a/source/server/connection_handler.cc +++ b/source/server/connection_handler.cc @@ -4,11 +4,9 @@ #include "envoy/event/timer.h" #include "envoy/network/filter.h" -#include "common/api/api_impl.h" - ConnectionHandler::ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger, - std::chrono::milliseconds file_flush_interval_msec) - : stats_store_(stats_store), logger_(logger), api_(new Api::Impl(file_flush_interval_msec)), + Api::ApiPtr&& api) + : stats_store_(stats_store), logger_(logger), api_(std::move(api)), dispatcher_(api_->allocateDispatcher()), watchdog_miss_counter_(stats_store.counter("server.watchdog_miss")), watchdog_mega_miss_counter_(stats_store.counter("server.watchdog_mega_miss")) {} @@ -75,10 +73,14 @@ ConnectionHandler::SslActiveListener::SslActiveListener(ConnectionHandler& paren void ConnectionHandler::ActiveListener::onNewConnection(Network::ConnectionPtr&& new_connection) { conn_log(parent_.logger_, info, "new connection", *new_connection); factory_.createFilterChain(*new_connection); - ActiveConnectionPtr active_connection( - new ActiveConnection(parent_, std::move(new_connection), stats_)); - active_connection->moveIntoList(std::move(active_connection), parent_.connections_); - parent_.num_connections_++; + + // If the connection is already closed, we can just let this connection immediately die. + if (new_connection->state() != Network::Connection::State::Closed) { + ActiveConnectionPtr active_connection( + new ActiveConnection(parent_, std::move(new_connection), stats_)); + active_connection->moveIntoList(std::move(active_connection), parent_.connections_); + parent_.num_connections_++; + } } ConnectionHandler::ActiveConnection::ActiveConnection(ConnectionHandler& parent, diff --git a/source/server/connection_handler.h b/source/server/connection_handler.h index d87b6d9d79fe6..20d0f62e94d8e 100644 --- a/source/server/connection_handler.h +++ b/source/server/connection_handler.h @@ -33,8 +33,7 @@ struct ListenerStats { */ class ConnectionHandler final : NonCopyable { public: - ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger, - std::chrono::milliseconds file_flush_interval_msec); + ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger, Api::ApiPtr&& api); ~ConnectionHandler(); Api::Api& api() { return *api_; } diff --git a/source/server/server.cc b/source/server/server.cc index bcda57fc9d353..ae56110cd9f40 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -3,7 +3,6 @@ #include "test_hooks.h" #include "worker.h" -#include "envoy/api/api.h" #include "envoy/event/dispatcher.h" #include "envoy/event/signal.h" #include "envoy/event/timer.h" @@ -12,6 +11,7 @@ #include "envoy/upstream/cluster_manager.h" #include "common/access_log/access_log_manager.h" +#include "common/api/api_impl.h" #include "common/common/version.h" #include "common/memory/stats.h" #include "common/network/utility.h" @@ -27,7 +27,7 @@ InstanceImpl::InstanceImpl(Options& options, TestHooks& hooks, HotRestart& resta : options_(options), restarter_(restarter), start_time_(time(nullptr)), original_start_time_(start_time_), stats_store_(store), access_log_lock_(access_log_lock), server_stats_{ALL_SERVER_STATS(POOL_GAUGE_PREFIX(stats_store_, "server."))}, - handler_(stats_store_, log(), options.fileFlushIntervalMsec()), + handler_(stats_store_, log(), Api::ApiPtr{new Api::Impl(options.fileFlushIntervalMsec())}), dns_resolver_(handler_.dispatcher().createDnsResolver()), local_address_(Network::Utility::getLocalAddress()) { diff --git a/source/server/worker.cc b/source/server/worker.cc index ffa7ae1f52d53..b514d78af2cf4 100644 --- a/source/server/worker.cc +++ b/source/server/worker.cc @@ -5,11 +5,13 @@ #include "envoy/server/configuration.h" #include "envoy/thread_local/thread_local.h" +#include "common/api/api_impl.h" #include "common/common/thread.h" Worker::Worker(Stats::Store& stats_store, ThreadLocal::Instance& tls, std::chrono::milliseconds file_flush_interval_msec) - : tls_(tls), handler_(new ConnectionHandler(stats_store, log(), file_flush_interval_msec)) { + : tls_(tls), handler_(new ConnectionHandler( + stats_store, log(), Api::ApiPtr{new Api::Impl(file_flush_interval_msec)})) { tls_.registerThread(handler_->dispatcher(), false); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e15cbd642d464..eab19a0298156 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -66,7 +66,7 @@ add_executable(envoy-test common/mongo/utility_test.cc common/network/connection_impl_test.cc common/network/dns_impl_test.cc - common/network/filter_manager_test.cc + common/network/filter_manager_impl_test.cc common/network/listen_socket_impl_test.cc common/network/proxy_protocol_test.cc common/network/utility_test.cc @@ -123,6 +123,8 @@ add_executable(envoy-test mocks/tracing/mocks.cc mocks/upstream/mocks.cc server/config/network/http_connection_manager_test.cc + server/configuration_impl_test.cc + server/connection_handler_test.cc server/drain_manager_impl_test.cc server/http/admin_test.cc server/http/health_check_test.cc diff --git a/test/common/filter/tcp_proxy_test.cc b/test/common/filter/tcp_proxy_test.cc index 01789890c2a2b..f257b4a2e3bd5 100644 --- a/test/common/filter/tcp_proxy_test.cc +++ b/test/common/filter/tcp_proxy_test.cc @@ -130,11 +130,8 @@ TEST_F(TcpProxyTest, UpstreamConnectTimeout) { } TEST_F(TcpProxyTest, NoHost) { - setup(false); - EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); - Buffer::OwnedImpl buffer("hello"); - filter_->onData(buffer); + setup(false); } TEST_F(TcpProxyTest, DisconnectBeforeData) { @@ -160,16 +157,15 @@ TEST_F(TcpProxyTest, UpstreamConnectFailure) { } TEST_F(TcpProxyTest, UpstreamConnectionLimit) { - // setup sets up expectation for tcpConnForCluster but this test is expected to NOT call that - filter_.reset(new TcpProxy(config_, cluster_manager_)); - filter_->initializeReadFilterCallbacks(filter_callbacks_); cluster_manager_.cluster_.resource_manager_.reset( new Upstream::ResourceManagerImpl(runtime_, "fake_key", 0, 0, 0, 0)); - Buffer::OwnedImpl buffer("hello"); + // setup sets up expectation for tcpConnForCluster but this test is expected to NOT call that + filter_.reset(new TcpProxy(config_, cluster_manager_)); // The downstream connection closes if the proxy can't make an upstream connection. EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush)); - ASSERT_EQ(Network::FilterStatus::StopIteration, filter_->onData(buffer)); + filter_->initializeReadFilterCallbacks(filter_callbacks_); + EXPECT_EQ( 1U, cluster_manager_.cluster_.stats_store_.counter("cluster.fake_cluster.upstream_cx_overflow") diff --git a/test/common/network/filter_manager_test.cc b/test/common/network/filter_manager_impl_test.cc similarity index 94% rename from test/common/network/filter_manager_test.cc rename to test/common/network/filter_manager_impl_test.cc index a0c46db25fb02..6493b36542a48 100644 --- a/test/common/network/filter_manager_test.cc +++ b/test/common/network/filter_manager_impl_test.cc @@ -1,5 +1,5 @@ #include "common/buffer/buffer_impl.h" -#include "common/network/filter_manager.h" +#include "common/network/filter_manager_impl.h" #include "test/mocks/buffer/mocks.h" #include "test/mocks/network/mocks.h" @@ -40,8 +40,8 @@ TEST_F(NetworkFilterManagerTest, All) { MockWriteFilter* write_filter(new MockWriteFilter()); MockFilter* filter(new LocalMockFilter(host_description)); - MockConnection connection; - FilterManager manager(connection, *this); + NiceMock connection; + FilterManagerImpl manager(connection, *this); manager.addReadFilter(ReadFilterPtr{read_filter}); manager.addWriteFilter(WriteFilterPtr{write_filter}); manager.addFilter(FilterPtr{filter}); diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index ac9f97a22d7b3..8f25023897ea6 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -2,6 +2,7 @@ #include "envoy/event/dispatcher.h" +#include "common/api/api_impl.h" #include "common/buffer/buffer_impl.h" #include "common/http/header_map_impl.h" #include "common/http/http1/codec_impl.h" @@ -189,7 +190,8 @@ FakeUpstream::FakeUpstream(Ssl::ServerContext* ssl_ctx, uint32_t port, FakeUpstream::FakeUpstream(Ssl::ServerContext* ssl_ctx, Network::ListenSocketPtr&& listen_socket, FakeHttpConnection::Type type) : ssl_ctx_(ssl_ctx), socket_(std::move(listen_socket)), - handler_(stats_store_, log(), std::chrono::milliseconds(10000)), http_type_(type) { + handler_(stats_store_, log(), Api::ApiPtr{new Api::Impl(std::chrono::milliseconds(10000))}), + http_type_(type) { thread_.reset(new Thread::Thread([this]() -> void { threadRoutine(); })); server_initialized_.waitReady(); } diff --git a/test/mocks/network/mocks.cc b/test/mocks/network/mocks.cc index 7a4bcb23abe57..3db324babb20e 100644 --- a/test/mocks/network/mocks.cc +++ b/test/mocks/network/mocks.cc @@ -101,4 +101,13 @@ MockListenerCallbacks::~MockListenerCallbacks() {} MockDrainDecision::MockDrainDecision() {} MockDrainDecision::~MockDrainDecision() {} +MockFilterChainFactory::MockFilterChainFactory() {} +MockFilterChainFactory::~MockFilterChainFactory() {} + +MockListenSocket::MockListenSocket() {} +MockListenSocket::~MockListenSocket() {} + +MockListener::MockListener() {} +MockListener::~MockListener() {} + } // Network diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 329d6fc692823..36341f9ab3a1c 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -163,4 +163,28 @@ class MockDrainDecision : public DrainDecision { MOCK_METHOD0(drainClose, bool()); }; +class MockFilterChainFactory : public FilterChainFactory { +public: + MockFilterChainFactory(); + ~MockFilterChainFactory(); + + MOCK_METHOD1(createFilterChain, void(Connection& connection)); +}; + +class MockListenSocket : public ListenSocket { +public: + MockListenSocket(); + ~MockListenSocket(); + + MOCK_METHOD0(name, const std::string()); + MOCK_METHOD0(fd, int()); + MOCK_METHOD0(close, void()); +}; + +class MockListener : public Listener { +public: + MockListener(); + ~MockListener(); +}; + } // Network diff --git a/test/server/configuration_impl_test.cc b/test/server/configuration_impl_test.cc new file mode 100644 index 0000000000000..e024529e7c949 --- /dev/null +++ b/test/server/configuration_impl_test.cc @@ -0,0 +1,29 @@ +#include "server/configuration_impl.h" + +#include "test/mocks/common.h" +#include "test/mocks/network/mocks.h" + +using testing::InSequence; +using testing::Return; + +namespace Server { +namespace Configuration { + +TEST(FilterChainUtility, buildFilterChain) { + Network::MockConnection connection; + std::list factories; + ReadyWatcher watcher; + NetworkFilterFactoryCb factory = [&](Network::FilterManager&) -> void { watcher.ready(); }; + factories.push_back(factory); + factories.push_back(factory); + + // Make sure we short circuit if needed. + InSequence s; + EXPECT_CALL(connection, state()).WillOnce(Return(Network::Connection::State::Open)); + EXPECT_CALL(watcher, ready()); + EXPECT_CALL(connection, state()).WillOnce(Return(Network::Connection::State::Closing)); + FilterChainUtility::buildFilterChain(connection, factories); +} + +} // Configuration +} // Server diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc new file mode 100644 index 0000000000000..923fcd112e247 --- /dev/null +++ b/test/server/connection_handler_test.cc @@ -0,0 +1,43 @@ +#include "common/stats/stats_impl.h" +#include "server/connection_handler.h" + +#include "test/mocks/api/mocks.h" +#include "test/mocks/network/mocks.h" + +using testing::_; +using testing::InSequence; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; + +class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable {}; + +TEST_F(ConnectionHandlerTest, CloseDuringFilterChainCreate) { + InSequence s; + + Stats::IsolatedStoreImpl stats_store; + Api::MockApi* api = new Api::MockApi(); + Event::MockDispatcher* dispatcher = new NiceMock(); + EXPECT_CALL(*api, allocateDispatcher_()).WillOnce(Return(dispatcher)); + ConnectionHandler handler(stats_store, log(), Api::ApiPtr{api}); + Network::MockFilterChainFactory factory; + NiceMock socket; + + Network::Listener* listener = new Network::MockListener(); + Network::ListenerCallbacks* listener_callbacks; + EXPECT_CALL(*dispatcher, createListener_(_, _, _, _)) + .WillOnce(Invoke([&](Network::ListenSocket&, Network::ListenerCallbacks& cb, Stats::Store&, + bool) -> Network::Listener* { + listener_callbacks = &cb; + return listener; + + })); + handler.addListener(factory, socket, false); + + Network::MockConnection* connection = new NiceMock(); + EXPECT_CALL(factory, createFilterChain(_)); + EXPECT_CALL(*connection, state()).WillOnce(Return(Network::Connection::State::Closed)); + EXPECT_CALL(*connection, addConnectionCallbacks(_)).Times(0); + listener_callbacks->onNewConnection(Network::ConnectionPtr{connection}); + EXPECT_EQ(0UL, handler.numConnections()); +} From 541c743c3ffc7a59b98596234b41f43ab6c84b2c Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Wed, 26 Oct 2016 14:48:34 -0700 Subject: [PATCH 2/2] fix --- source/common/network/filter_manager_impl.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/network/filter_manager_impl.cc b/source/common/network/filter_manager_impl.cc index 15ae1fa797265..acb5c002f2f77 100644 --- a/source/common/network/filter_manager_impl.cc +++ b/source/common/network/filter_manager_impl.cc @@ -7,7 +7,7 @@ namespace Network { void FilterManagerImpl::addWriteFilter(WriteFilterPtr filter) { - ASSERT(connection_.state() != Connection::State::Closed); + ASSERT(connection_.state() == Connection::State::Open); downstream_filters_.emplace_back(filter); } @@ -17,7 +17,7 @@ void FilterManagerImpl::addFilter(FilterPtr filter) { } void FilterManagerImpl::addReadFilter(ReadFilterPtr filter) { - ASSERT(connection_.state() != Connection::State::Closed); + ASSERT(connection_.state() == Connection::State::Open); ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter}); filter->initializeReadFilterCallbacks(*new_filter); new_filter->moveIntoListBack(std::move(new_filter), upstream_filters_);