Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ enum class ConnectionCloseType {
/**
* An abstract raw connection. Free the connection or call close() to disconnect.
*/
class Connection : public Event::DeferredDeletable {
class Connection : public Event::DeferredDeletable, public FilterManager {
public:
enum class State { Open, Closing, Closed };

Expand All @@ -71,24 +71,6 @@ class Connection : public Event::DeferredDeletable {
*/
virtual void addConnectionCallbacks(ConnectionCallbacks& cb) PURE;

/**
* Add a write filter to the connection. Filters are invoked in LIFO order (the last added
* filter is called first).
*/
virtual void addWriteFilter(WriteFilterPtr filter) PURE;

/**
* Add a combination filter to the connection. Equivalent to calling both addWriteFilter()
* and addReadFilter() with the same filter instance.
*/
virtual void addFilter(FilterPtr filter) PURE;

/**
* Add a read filter to the connection. Filters are invoked in FIFO order (the filter added
* first is called first).
*/
virtual void addReadFilter(ReadFilterPtr filter) PURE;

/**
* Close the connection.
*/
Expand Down
26 changes: 26 additions & 0 deletions include/envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,32 @@ typedef std::shared_ptr<ReadFilter> ReadFilterPtr;
class Filter : public WriteFilter, public ReadFilter {};
typedef std::shared_ptr<Filter> FilterPtr;

/**
* Interface for adding individual network filters to a manager.
*/
class FilterManager {
public:
virtual ~FilterManager() {}

/**
* Add a write filter to the connection. Filters are invoked in LIFO order (the last added
* filter is called first).
*/
virtual void addWriteFilter(WriteFilterPtr filter) PURE;

/**
* Add a combination filter to the connection. Equivalent to calling both addWriteFilter()
* and addReadFilter() with the same filter instance.
*/
virtual void addFilter(FilterPtr filter) PURE;

/**
* Add a read filter to the connection. Filters are invoked in FIFO order (the filter added
* first is called first).
*/
virtual void addReadFilter(ReadFilterPtr filter) PURE;
};

/**
* Creates a chain of network filters for a new connection.
*/
Expand Down
2 changes: 1 addition & 1 deletion source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ add_library(
mongo/utility.cc
network/connection_impl.cc
network/dns_impl.cc
network/filter_manager.cc
network/filter_manager_impl.cc
network/listener_impl.cc
network/listen_socket_impl.cc
network/proxy_protocol.cc
Expand Down
11 changes: 0 additions & 11 deletions source/common/filter/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,6 @@ void TcpProxy::onConnectTimeout() {
}

Network::FilterStatus TcpProxy::onData(Buffer::Instance& data) {
if (!upstream_connection_) {
// TODO: This is done here vs. the constructor because currently the filter manager is not built
// to handle the downstream connection being closed during construction. The better long
// term solution is to have an initialize() method that is passed the relevant data and
// can also cause the filter manager to stop execution.
initializeUpstreamConnection();
if (!upstream_connection_) {
return Network::FilterStatus::StopIteration;
}
}

conn_log_trace("received {} bytes", read_callbacks_->connection(), data.length());
upstream_connection_->write(data);
ASSERT(0 == data.length());
Expand Down
1 change: 1 addition & 0 deletions source/common/filter/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter
read_callbacks_ = &callbacks;
conn_log_info("new tcp proxy session", read_callbacks_->connection());
read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_);
initializeUpstreamConnection();
}

private:
Expand Down
10 changes: 6 additions & 4 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "filter_manager.h"
#include "filter_manager_impl.h"

#include "envoy/network/connection.h"

Expand All @@ -27,11 +27,13 @@ class ConnectionImpl : public virtual Connection,
const std::string& remote_address);
~ConnectionImpl();

// Network::Connection
void addConnectionCallbacks(ConnectionCallbacks& cb) override;
// Network::FilterManager
void addWriteFilter(WriteFilterPtr filter) override;
void addFilter(FilterPtr filter) override;
void addReadFilter(ReadFilterPtr filter) override;

// Network::Connection
void addConnectionCallbacks(ConnectionCallbacks& cb) override;
void close(ConnectionCloseType type) override;
Event::Dispatcher& dispatcher() override;
uint64_t id() override;
Expand Down Expand Up @@ -98,7 +100,7 @@ class ConnectionImpl : public virtual Connection,
Event::Libevent::BufferEventPtr bev_;
const std::string remote_address_;
const uint64_t id_;
FilterManager filter_manager_;
FilterManagerImpl filter_manager_;
std::list<ConnectionCallbacks*> callbacks_;
Event::TimerPtr redispatch_read_event_;
bool read_enabled_;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
#include "filter_manager.h"
#include "filter_manager_impl.h"

#include "envoy/network/connection.h"

#include "common/common/assert.h"

namespace Network {

void FilterManager::addWriteFilter(WriteFilterPtr filter) {
void FilterManagerImpl::addWriteFilter(WriteFilterPtr filter) {
ASSERT(connection_.state() == Connection::State::Open);
downstream_filters_.emplace_back(filter);
}

void FilterManager::addFilter(FilterPtr filter) {
void FilterManagerImpl::addFilter(FilterPtr filter) {
addReadFilter(filter);
addWriteFilter(filter);
}

void FilterManager::addReadFilter(ReadFilterPtr filter) {
void FilterManagerImpl::addReadFilter(ReadFilterPtr filter) {
ASSERT(connection_.state() == Connection::State::Open);
ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter});
filter->initializeReadFilterCallbacks(*new_filter);
new_filter->moveIntoListBack(std::move(new_filter), upstream_filters_);
}

void FilterManager::destroyFilters() {
void FilterManagerImpl::destroyFilters() {
upstream_filters_.clear();
downstream_filters_.clear();
}

void FilterManager::onContinueReading(ActiveReadFilter* filter) {
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) {
std::list<ActiveReadFilterPtr>::iterator entry;
if (!filter) {
entry = upstream_filters_.begin();
Expand All @@ -40,12 +44,12 @@ void FilterManager::onContinueReading(ActiveReadFilter* filter) {
}
}

void FilterManager::onRead() {
void FilterManagerImpl::onRead() {
ASSERT(!upstream_filters_.empty());
onContinueReading(nullptr);
}

FilterStatus FilterManager::onWrite() {
FilterStatus FilterManagerImpl::onWrite() {
for (const WriteFilterPtr& filter : downstream_filters_) {
FilterStatus status = filter->onWrite(buffer_source_.getWriteBuffer());
if (status == FilterStatus::StopIteration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class BufferSource {
/**
* This is a filter manager for TCP (L4) filters. It is split out for ease of testing.
*/
class FilterManager {
class FilterManagerImpl {
public:
FilterManager(Connection& connection, BufferSource& buffer_source)
FilterManagerImpl(Connection& connection, BufferSource& buffer_source)
: connection_(connection), buffer_source_(buffer_source) {}

void addWriteFilter(WriteFilterPtr filter);
Expand All @@ -41,7 +41,7 @@ class FilterManager {

private:
struct ActiveReadFilter : public ReadFilterCallbacks, LinkedObject<ActiveReadFilter> {
ActiveReadFilter(FilterManager& parent, ReadFilterPtr filter)
ActiveReadFilter(FilterManagerImpl& parent, ReadFilterPtr filter)
: parent_(parent), filter_(filter) {}

Connection& connection() override { return parent_.connection_; }
Expand All @@ -51,7 +51,7 @@ class FilterManager {
parent_.host_description_ = host;
}

FilterManager& parent_;
FilterManagerImpl& parent_;
ReadFilterPtr filter_;
};

Expand Down
4 changes: 2 additions & 2 deletions source/server/config/network/client_ssl_auth.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class ClientSslAuthConfigFactory : public NetworkFilterConfigFactory {
Filter::Auth::ClientSsl::ConfigPtr config(new Filter::Auth::ClientSsl::Config(
json_config, server.threadLocal(), server.clusterManager(), server.dispatcher(),
server.stats(), server.runtime()));
return [config](Network::Connection& connection) -> void {
connection.addReadFilter(
return [config](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(
Network::ReadFilterPtr{new Filter::Auth::ClientSsl::Instance(config)});
};
}
Expand Down
4 changes: 2 additions & 2 deletions source/server/config/network/echo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class EchoConfigFactory : public NetworkFilterConfigFactory {
return nullptr;
}

return [](Network::Connection& connection)
-> void { connection.addReadFilter(Network::ReadFilterPtr{new Filter::Echo()}); };
return [](Network::FilterManager& filter_manager)
-> void { filter_manager.addReadFilter(Network::ReadFilterPtr{new Filter::Echo()}); };
}
};

Expand Down
4 changes: 2 additions & 2 deletions source/server/config/network/http_connection_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class HttpConnectionManagerFilterConfigFactory : Logger::Loggable<Logger::Id::co

std::shared_ptr<HttpConnectionManagerConfig> http_config(
new HttpConnectionManagerConfig(config, server));
return [http_config, &server](Network::Connection& connection) mutable -> void {
connection.addReadFilter(Network::ReadFilterPtr{
return [http_config, &server](Network::FilterManager& filter_manager) mutable -> void {
filter_manager.addReadFilter(Network::ReadFilterPtr{
new Http::ConnectionManagerImpl(*http_config, server.drainManager(), server.random(),
server.httpTracer(), server.runtime())});
};
Expand Down
4 changes: 2 additions & 2 deletions source/server/config/network/mongo_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class MongoProxyFilterConfigFactory : public NetworkFilterConfigFactory {
server.accessLogManager().registerAccessLog(access_log);
}

return [stat_prefix, &server, access_log](Network::Connection& connection) -> void {
connection.addFilter(Network::FilterPtr{
return [stat_prefix, &server, access_log](Network::FilterManager& filter_manager) -> void {
filter_manager.addFilter(Network::FilterPtr{
new Mongo::ProdProxyFilter(stat_prefix, server.stats(), server.runtime(), access_log)});
};
}
Expand Down
4 changes: 2 additions & 2 deletions source/server/config/network/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class RateLimitConfigFactory : public NetworkFilterConfigFactory {

RateLimit::TcpFilter::ConfigPtr config(
new RateLimit::TcpFilter::Config(json_config, server.stats(), server.runtime()));
return [config, &server](Network::Connection& connection) -> void {
connection.addReadFilter(Network::ReadFilterPtr{new RateLimit::TcpFilter::Instance(
return [config, &server](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(Network::ReadFilterPtr{new RateLimit::TcpFilter::Instance(
config, server.rateLimitClient(Optional<std::chrono::milliseconds>()))});
};
}
Expand Down
4 changes: 2 additions & 2 deletions source/server/config/network/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class TcpProxyConfigFactory : public NetworkFilterConfigFactory {

Filter::TcpProxyConfigPtr filter_config(
new Filter::TcpProxyConfig(config, server.clusterManager(), server.stats()));
return [filter_config, &server](Network::Connection& connection) -> void {
connection.addReadFilter(
return [filter_config, &server](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(
Network::ReadFilterPtr{new Filter::TcpProxy(filter_config, server.clusterManager())});
};
}
Expand Down
17 changes: 14 additions & 3 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@
namespace Server {
namespace Configuration {

void FilterChainUtility::buildFilterChain(Network::Connection& connection,
const std::list<NetworkFilterFactoryCb>& factories) {
for (const NetworkFilterFactoryCb& factory : factories) {
// It's possible for a connection to be closed immediately in the middle of chain creation.
// If this happened, do not instantiate any more filters.
if (connection.state() != Network::Connection::State::Open) {
break;
}

factory(connection);
}
}

MainImpl::MainImpl(Server::Instance& server) : server_(server) {}

void MainImpl::initialize(const std::string& file_path) {
Expand Down Expand Up @@ -158,9 +171,7 @@ MainImpl::ListenerConfig::ListenerConfig(MainImpl& parent, Json::Object& json)
}

void MainImpl::ListenerConfig::createFilterChain(Network::Connection& connection) {
for (const NetworkFilterFactoryCb& factory : filter_factories_) {
factory(connection);
}
FilterChainUtility::buildFilterChain(connection, filter_factories_);
}

InitialImpl::InitialImpl(const std::string& file_path) {
Expand Down
15 changes: 14 additions & 1 deletion source/server/configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ enum class NetworkFilterType { Read, Write, Both };
* come in. Filter factories create the lambda at configuration initialization time, and then they
* are used at runtime. This maps JSON -> runtime configuration.
*/
typedef std::function<void(Network::Connection&)> NetworkFilterFactoryCb;
typedef std::function<void(Network::FilterManager& filter_manager)> NetworkFilterFactoryCb;

/**
* Implemented by each network filter and registered via registerNetworkFilterConfigFactory() or
Expand All @@ -34,6 +34,19 @@ class NetworkFilterConfigFactory {
Server::Instance& server) PURE;
};

/**
* Utilities for creating a filter chain for a network connection.
*/
class FilterChainUtility {
public:
/**
* Given a connection and a list of factories, create a new filter chain. Chain creation will
* exit early if any filters immediately close the connection.
*/
static void buildFilterChain(Network::Connection& connection,
const std::list<NetworkFilterFactoryCb>& factories);
};

/**
* Implementation of Server::Configuration::Main that reads a configuration from a JSON file.
*/
Expand Down
18 changes: 10 additions & 8 deletions source/server/connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
#include "envoy/event/timer.h"
#include "envoy/network/filter.h"

#include "common/api/api_impl.h"

ConnectionHandler::ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger,
std::chrono::milliseconds file_flush_interval_msec)
: stats_store_(stats_store), logger_(logger), api_(new Api::Impl(file_flush_interval_msec)),
Api::ApiPtr&& api)
: stats_store_(stats_store), logger_(logger), api_(std::move(api)),
dispatcher_(api_->allocateDispatcher()),
watchdog_miss_counter_(stats_store.counter("server.watchdog_miss")),
watchdog_mega_miss_counter_(stats_store.counter("server.watchdog_mega_miss")) {}
Expand Down Expand Up @@ -75,10 +73,14 @@ ConnectionHandler::SslActiveListener::SslActiveListener(ConnectionHandler& paren
void ConnectionHandler::ActiveListener::onNewConnection(Network::ConnectionPtr&& new_connection) {
conn_log(parent_.logger_, info, "new connection", *new_connection);
factory_.createFilterChain(*new_connection);
ActiveConnectionPtr active_connection(
new ActiveConnection(parent_, std::move(new_connection), stats_));
active_connection->moveIntoList(std::move(active_connection), parent_.connections_);
parent_.num_connections_++;

// If the connection is already closed, we can just let this connection immediately die.
if (new_connection->state() != Network::Connection::State::Closed) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also rule out State::Closing here? (do only for State::Open)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that was on purpose, to allow for write flushing if needed.

ActiveConnectionPtr active_connection(
new ActiveConnection(parent_, std::move(new_connection), stats_));
active_connection->moveIntoList(std::move(active_connection), parent_.connections_);
parent_.num_connections_++;
}
}

ConnectionHandler::ActiveConnection::ActiveConnection(ConnectionHandler& parent,
Expand Down
3 changes: 1 addition & 2 deletions source/server/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ struct ListenerStats {
*/
class ConnectionHandler final : NonCopyable {
public:
ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger,
std::chrono::milliseconds file_flush_interval_msec);
ConnectionHandler(Stats::Store& stats_store, spdlog::logger& logger, Api::ApiPtr&& api);
~ConnectionHandler();

Api::Api& api() { return *api_; }
Expand Down
Loading