Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c3e5b6e
Enable to create ActiveTcpListener through the socket
soulxu Dec 30, 2021
40c2080
Create UDP listener through the socket
soulxu Dec 30, 2021
84094cf
Add `addresses` field to the API
soulxu May 9, 2022
cf23597
ListenerImpl: parse multiple addresses
soulxu May 9, 2022
0aa830b
ListenerImpl: add socket type as member
soulxu May 10, 2022
091bb27
ListenerImpl: hasCompatibleAddresses to support multiple addresses
soulxu May 11, 2022
37c5260
ListenerImpl: support duplicated address check for multiple addresses
soulxu May 11, 2022
8982d2a
ListenerImpl: support multiple listen socket factories
soulxu May 12, 2022
f08fd45
ListenerImpl: create and clones multiple addresses
soulxu May 13, 2022
6ca527f
ListenerManagerImpl: reuse addresses for drainning listener with mult…
soulxu May 14, 2022
c8d220a
ListenerManagerImpl: initialize socket factories
soulxu May 14, 2022
33d81ca
HotRestart: support multiple addresses
soulxu May 14, 2022
7fa7666
ConnectionHandlerImpl: enable create multiple active listener
soulxu May 14, 2022
415b6dd
ConnectionHandlerImpl: Add test case for multiple addreses
soulxu May 15, 2022
8a1bb30
Ensure connection balancer works with multi addresses
soulxu May 16, 2022
eb464d1
Support multiple addresses for UDP worker router
soulxu May 16, 2022
cab7dcc
Merge branch 'main' into multiple_addresses_for_listener
soulxu May 16, 2022
889b9fc
Add ifndef to dict
soulxu May 16, 2022
2b2f729
enable RemoveListenerDuringRebalance
soulxu May 16, 2022
b913699
Add integration test for multiple addresses
soulxu May 17, 2022
d3587dc
Fix the unitest for ConnectionHandlerImplTest
soulxu May 17, 2022
ccc2c73
Add more integration tests
soulxu May 17, 2022
9d6959e
Admin: return multiple addresses
soulxu May 17, 2022
2563b05
Add integration test for hot restart
soulxu May 17, 2022
96eb8bb
revert no need dict
soulxu May 17, 2022
59594e4
fix clang-tidy
soulxu May 17, 2022
aedc178
fix python format
soulxu May 17, 2022
65b8ed1
fix format again
soulxu May 17, 2022
6841036
fix format again
soulxu May 17, 2022
4f04597
Fix integration test register port
soulxu May 17, 2022
cee07ab
Add integration test for Admin
soulxu May 17, 2022
3dc7635
fix connection handler test
soulxu May 17, 2022
bbb7057
Pass right socket factory for udp
soulxu May 17, 2022
7de085f
fix format
soulxu May 17, 2022
ad3ecee
integration test: making the multiple addresses test works with multi…
soulxu May 18, 2022
0021a9b
quic: Add integration test for multiple addresses
soulxu May 18, 2022
365b584
ConnectionBalancer: back to use getConnectionHandlerByTag since it su…
soulxu May 18, 2022
6faa8f2
ConnectionHandlerImpl::getUdpListenerCallbacks cleanup
soulxu May 18, 2022
a1823c5
Simplify the quic integration test
soulxu May 18, 2022
c5409dd
Merge branch 'main' into multiple_addresses_for_listener
soulxu May 18, 2022
5b5577f
quic integration: running all the clients
soulxu May 18, 2022
0921df1
Fix clang-tidy
soulxu May 18, 2022
871eb36
Add comment
soulxu May 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/envoy/admin/v2alpha/listeners.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message ListenerStatus {
// Name of the listener
string name = 1;

// The actual local address that the listener is listening on. If a listener was configured
// The actual local addresses that the listener is listening on. If a listener was configured
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this v2 should not be modified.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, I modify the wrong file and then forget to revert back.

// to listen on port 0, then this address has the port that was allocated by the OS.
api.v2.core.Address local_address = 2;
}
4 changes: 2 additions & 2 deletions api/envoy/admin/v3/listeners.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ message ListenerStatus {
// Name of the listener
string name = 1;

// The actual local address that the listener is listening on. If a listener was configured
// The actual local addresses that the listener is listening on. If a listener was configured
// to listen on port 0, then this address has the port that was allocated by the OS.
config.core.v3.Address local_address = 2;
repeated config.core.v3.Address local_addresses = 2;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have backward-compatibility rule for the admin API?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is a breaking change, and will not be approved.

}
9 changes: 8 additions & 1 deletion api/envoy/config/listener/v3/listener.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ message ListenerCollection {
repeated xds.core.v3.CollectionEntry entries = 1;
}

// [#next-free-field: 33]
// [#next-free-field: 34]
message Listener {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.Listener";

Expand Down Expand Up @@ -107,8 +107,15 @@ message Listener {
// that is governed by the bind rules of the OS. E.g., multiple listeners can listen on port 0 on
// Linux as the actual port will be allocated by the OS.
// Required unless *api_listener* or *listener_specifier* is populated.
// TODO (soulxu): deprecated the `address` field after `addresses` implemented.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't mark the old field as deprecated now since it will required to modify a lot of example config and test to not use the depcreated field, that will increase this PR size again and making the review harder. But let me know if we have better procedure for this case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest splitting this PR, to a few PRs.
You can start by adding the addresses field, mark in not-implemented-hide, and gradually add additional parts of the implementation.
At the end, there should be a well-defined behavior of what happens when both fields (address and addresses) are provided.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! got it. I will mark it as not-implemented-hide, and separate the API change out.

For other parts, also want to hear others' opinions if they are happy to separate the PR, since before people may want to see the full picture. I'm ok with both ways, if we want to separate the whole thing, I will work on it.

@mattklein123 @ggreenway

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that literally every envoy configuration uses these fields, we'll want to support both the old and the new forever. And I'd adopt the semantic that it's a config load error if both of them are set. So it logically turns into a oneof between the two, but without modifying the old proto field.

As far as separating the PR or doing it all at once, I'm fine either way. I like seeing how the config will all be put together in one shot, but if the PR gets too big for that I'm fine splitting it. It may be worth defining the entire proto, but just not writing all the code yet, which should be fine if it's not-implemented-hide.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that literally every envoy configuration uses these fields, we'll want to support both the old and the new forever. And I'd adopt the semantic that it's a config load error if both of them are set. So it logically turns into a oneof between the two, but without modifying the old proto field.

I tend to agree with this, but from previous experience, when a configuration server supports both "old" and "new" Envoy clients, and sends them the same config, then the "new" clients will reject it.
One thing we can do is have a runtime guard against the rejection of configs that include both fields, and eventually remove that check.
If oneof is the desired goal, consider adding oneof_promotion tag.

As far as separating the PR or doing it all at once, I'm fine either way. I like seeing how the config will all be put together in one shot, but if the PR gets too big for that I'm fine splitting it. It may be worth defining the entire proto, but just not writing all the code yet, which should be fine if it's not-implemented-hide.

If possible, I think that it makes sense to break this to multiple smaller PRs (not only API vs non-API) to make it more reviewable. That said, if the PR includes mostly new tests, then it should be ok.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree with this, but from previous experience, when a configuration server supports both "old" and "new" Envoy clients, and sends them the same config, then the "new" clients will reject it.

In this case, I think it could be as simple as if there is only one address, config server populates the old field. If there are multiple addresses, config server populates the new field. If config server doesn't know if the client supports the new field, it can do the old behavior of completely duplicate listeners (filter chains etc) except with a different address.

Copy link
Copy Markdown
Member Author

@soulxu soulxu May 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree with this, but from previous experience, when a configuration server supports both "old" and "new" Envoy clients, and sends them the same config, then the "new" clients will reject it. One thing we can do is have a runtime guard against the rejection of configs that include both fields, and eventually remove that check. If oneof is the desired goal, consider adding oneof_promotion tag.

I also have thing about keeping the old address field and adding a new one called additional_addresses. I haven't too much difference with one_of way. Only one thing is if the user doesn't specify the stat_prefix, then we fallback to use address field as stat_prefix. For the oneof case, if only the addresses specified, then choose the first one as the stat_prefix. The additional_addresses way has a little better sematic here.

As far as separating the PR or doing it all at once, I'm fine either way. I like seeing how the config will all be put together in one shot, but if the PR gets too big for that I'm fine splitting it. It may be worth defining the entire proto, but just not writing all the code yet, which should be fine if it's not-implemented-hide.

If possible, I think that it makes sense to break this to multiple smaller PRs (not only API vs non-API) to make it more reviewable. That said, if the PR includes mostly new tests, then it should be ok.

Let me see what the separate PR looks like. I found I can't change ListenerImpl and ListenerManagerImpl separately, since our unit-tests test them together. And we can live this PR here for people who want to see the whole picture.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me see what the separate PR looks like. I found I can't change ListenerImpl and ListenerManagerImpl separately, since our unit-tests test them together. And we can live this PR here for people who want to see the whole picture.

Just to clarify, breaking to multiple PRs is not a must, and in some cases it may make sense to have everything in one PR.

Copy link
Copy Markdown
Member Author

@soulxu soulxu May 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should be able to separate the PR as the commits have in the PR (maybe merge a few of them together) https://github.com/envoyproxy/envoy/pull/19367/commits, I checked each of major commit has unittest covered. I will merge those cleanup and format change commits in the bottom to the top commits. The last PR should be for the integration tests. Let me know if you guys happy with that.

core.v3.Address address = 2;

// A list of addresses that the listener should listen on. In general, the address must be unique, though
// that is governed by the bind rules of the OS. E.g., multiple listeners can listen on port 0 on
// Linux as the actual port will be allocated by the OS. For multiple addresses in single listener,
// all addresses are the same protocol, and multiple internal addresses don't support now.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// all addresses are the same protocol, and multiple internal addresses don't support now.
// all addresses use the same protocol, and multiple internal addresses are not yet supported.

repeated core.v3.Address addresses = 33;

// Optional prefix to use on listener stats. If empty, the stats will be rooted at
// `listener.<address as string>.`. If non-empty, stats will be rooted at
// `listener.<stat_prefix>.`.
Expand Down
17 changes: 12 additions & 5 deletions envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include "source/common/common/interval_value.h"

#include "address.h"

namespace Envoy {
namespace Network {

Expand Down Expand Up @@ -178,10 +180,12 @@ class TcpConnectionHandler : public virtual ConnectionHandler {
/**
* Obtain the rebalancer of the tcp listener.
* @param listener_tag supplies the tag of the tcp listener that was passed to addListener().
* @param address is used to query the address specific handler.
* @return BalancedConnectionHandlerOptRef the balancer attached to the listener. `nullopt` if
* listener doesn't exist or rebalancer doesn't exist.
*/
virtual BalancedConnectionHandlerOptRef getBalancedHandlerByTag(uint64_t listener_tag) PURE;
virtual BalancedConnectionHandlerOptRef
getBalancedHandlerByTag(uint64_t listener_tag, const Network::Address::Instance& address) PURE;

/**
* Obtain the rebalancer of the tcp listener.
Expand All @@ -199,11 +203,12 @@ class TcpConnectionHandler : public virtual ConnectionHandler {
class UdpConnectionHandler : public virtual ConnectionHandler {
public:
/**
* Get the ``UdpListenerCallbacks`` associated with ``listener_tag``. This will be
* Get the ``UdpListenerCallbacks`` associated with ``listener_tag`` and ``address``. This will be
* absl::nullopt for non-UDP listeners and for ``listener_tag`` values that have already been
* removed.
*/
virtual UdpListenerCallbacksOptRef getUdpListenerCallbacks(uint64_t listener_tag) PURE;
virtual UdpListenerCallbacksOptRef
getUdpListenerCallbacks(uint64_t listener_tag, const Network::Address::Instance& address) PURE;
};

/**
Expand All @@ -219,15 +224,17 @@ class ActiveUdpListenerFactory {
* @param runtime the runtime for this server.
* @param worker_index The index of the worker this listener is being created on.
* @param parent is the owner of the created ActiveListener objects.
* @param listen_socket_ptr is the UDP socket.
* @param dispatcher is used to create actual UDP listener.
* @param config provides information needed to create ActiveUdpListener and
* UdpListener objects.
* @return the ActiveUdpListener created.
*/
virtual ConnectionHandler::ActiveUdpListenerPtr
createActiveUdpListener(Runtime::Loader& runtime, uint32_t worker_index,
UdpConnectionHandler& parent, Event::Dispatcher& dispatcher,
Network::ListenerConfig& config) PURE;
UdpConnectionHandler& parent,
Network::SocketSharedPtr&& listen_socket_ptr,
Event::Dispatcher& dispatcher, Network::ListenerConfig& config) PURE;

/**
* @return true if the UDP passing through listener doesn't form stateful connections.
Expand Down
13 changes: 9 additions & 4 deletions envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "source/common/common/interval_value.h"

#include "address.h"

namespace Envoy {
namespace Network {

Expand Down Expand Up @@ -95,9 +97,11 @@ class UdpListenerConfig {
virtual UdpPacketWriterFactory& packetWriterFactory() PURE;

/**
* @param address is used to query the address specific router.
* @return the UdpListenerWorkerRouter for this listener.
*/
virtual UdpListenerWorkerRouter& listenerWorkerRouter() PURE;
virtual UdpListenerWorkerRouter&
listenerWorkerRouter(const Network::Address::Instance& address) PURE;

/**
* @return the configuration for the listener.
Expand Down Expand Up @@ -148,14 +152,14 @@ class ListenerConfig {
/**
* @return ListenSocketFactory& the factory to create listen socket.
*/
virtual ListenSocketFactory& listenSocketFactory() PURE;
virtual const std::vector<ListenSocketFactoryPtr>& listenSocketFactories() PURE;

/**
* @return bool specifies whether the listener should actually listen on the port.
* A listener that doesn't listen on a port can only receive connections
* redirected from other listeners.
*/
virtual bool bindToPort() PURE;
virtual bool bindToPort() const PURE;

/**
* @return bool if a connection should be handed off to another Listener after the original
Expand Down Expand Up @@ -215,10 +219,11 @@ class ListenerConfig {
virtual envoy::config::core::v3::TrafficDirection direction() const PURE;

/**
* @param address is used for query the address specific connection balancer.
* @return the connection balancer for this listener. All listeners have a connection balancer,
* though the implementation may be a NOP balancer.
*/
virtual ConnectionBalancer& connectionBalancer() PURE;
virtual ConnectionBalancer& connectionBalancer(const Network::Address::Instance& address) PURE;

/**
* Open connection resources for this listener.
Expand Down
9 changes: 9 additions & 0 deletions source/common/network/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,5 +418,14 @@ class Utility {
static absl::uint128 flipOrder(const absl::uint128& input);
};

/**
* Log formatter for a list of addresses.
*/
struct AddressStrFormatter {
void operator()(std::string* out, const Network::Address::InstanceConstSharedPtr& instance) {
out->append(instance->asString());
}
};

} // namespace Network
} // namespace Envoy
23 changes: 5 additions & 18 deletions source/common/quic/active_quic_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,7 @@ bool ActiveQuicListenerFactory::disable_kernel_bpf_packet_routing_for_test_ = fa
ActiveQuicListener::ActiveQuicListener(
Runtime::Loader& runtime, uint32_t worker_index, uint32_t concurrency,
Event::Dispatcher& dispatcher, Network::UdpConnectionHandler& parent,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
QuicStatNames& quic_stat_names, uint32_t packets_received_to_connection_count_ratio,
EnvoyQuicCryptoServerStreamFactoryInterface& crypto_server_stream_factory,
EnvoyQuicProofSourceFactoryInterface& proof_source_factory)
: ActiveQuicListener(runtime, worker_index, concurrency, dispatcher, parent,
listener_config.listenSocketFactory().getListenSocket(worker_index),
listener_config, quic_config, kernel_worker_routing, enabled,
quic_stat_names, packets_received_to_connection_count_ratio,
crypto_server_stream_factory, proof_source_factory) {}

ActiveQuicListener::ActiveQuicListener(
Runtime::Loader& runtime, uint32_t worker_index, uint32_t concurrency,
Event::Dispatcher& dispatcher, Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr listen_socket, Network::ListenerConfig& listener_config,
Network::SocketSharedPtr&& listen_socket, Network::ListenerConfig& listener_config,
const quic::QuicConfig& quic_config, bool kernel_worker_routing,
const envoy::config::core::v3::RuntimeFeatureFlag& enabled, QuicStatNames& quic_stat_names,
uint32_t packets_to_read_to_connection_count_ratio,
Expand Down Expand Up @@ -338,11 +324,12 @@ ActiveQuicListenerFactory::ActiveQuicListenerFactory(

Network::ConnectionHandler::ActiveUdpListenerPtr ActiveQuicListenerFactory::createActiveUdpListener(
Runtime::Loader& runtime, uint32_t worker_index, Network::UdpConnectionHandler& parent,
Event::Dispatcher& disptacher, Network::ListenerConfig& config) {
Network::SocketSharedPtr&& listen_socket_ptr, Event::Dispatcher& disptacher,
Network::ListenerConfig& config) {
ASSERT(crypto_server_stream_factory_.has_value());
return std::make_unique<ActiveQuicListener>(
runtime, worker_index, concurrency_, disptacher, parent, config, quic_config_,
kernel_worker_routing_, enabled_, quic_stat_names_,
runtime, worker_index, concurrency_, disptacher, parent, std::move(listen_socket_ptr), config,
quic_config_, kernel_worker_routing_, enabled_, quic_stat_names_,
packets_to_read_to_connection_count_ratio_, crypto_server_stream_factory_.value(),
proof_source_factory_.value());
}
Expand Down
17 changes: 4 additions & 13 deletions source/common/quic/active_quic_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,7 @@ class ActiveQuicListener : public Envoy::Server::ActiveUdpListenerBase,

ActiveQuicListener(Runtime::Loader& runtime, uint32_t worker_index, uint32_t concurrency,
Event::Dispatcher& dispatcher, Network::UdpConnectionHandler& parent,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
bool kernel_worker_routing,
const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
QuicStatNames& quic_stat_names,
uint32_t packets_to_read_to_connection_count_ratio,
EnvoyQuicCryptoServerStreamFactoryInterface& crypto_server_stream_factory,
EnvoyQuicProofSourceFactoryInterface& proof_source_factory);

ActiveQuicListener(Runtime::Loader& runtime, uint32_t worker_index, uint32_t concurrency,
Event::Dispatcher& dispatcher, Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr listen_socket,
Network::SocketSharedPtr&& listen_socket,
Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config,
bool kernel_worker_routing,
const envoy::config::core::v3::RuntimeFeatureFlag& enabled,
Expand Down Expand Up @@ -108,8 +98,9 @@ class ActiveQuicListenerFactory : public Network::ActiveUdpListenerFactory,
// Network::ActiveUdpListenerFactory.
Network::ConnectionHandler::ActiveUdpListenerPtr
createActiveUdpListener(Runtime::Loader& runtime, uint32_t worker_index,
Network::UdpConnectionHandler& parent, Event::Dispatcher& disptacher,
Network::ListenerConfig& config) override;
Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr&& listen_socket_ptr,
Event::Dispatcher& disptacher, Network::ListenerConfig& config) override;
bool isTransportConnectionless() const override { return false; }
const Network::Socket::OptionsSharedPtr& socketOptions() const override { return options_; }

Expand Down
5 changes: 3 additions & 2 deletions source/server/active_raw_udp_listener_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ ActiveRawUdpListenerFactory::ActiveRawUdpListenerFactory(uint32_t concurrency)
Network::ConnectionHandler::ActiveUdpListenerPtr
ActiveRawUdpListenerFactory::createActiveUdpListener(Runtime::Loader&, uint32_t worker_index,
Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr&& listen_socket_ptr,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config) {
return std::make_unique<ActiveRawUdpListener>(worker_index, concurrency_, parent, dispatcher,
config);
return std::make_unique<ActiveRawUdpListener>(worker_index, concurrency_, parent,
std::move(listen_socket_ptr), dispatcher, config);
}

} // namespace Server
Expand Down
5 changes: 3 additions & 2 deletions source/server/active_raw_udp_listener_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ class ActiveRawUdpListenerFactory : public Network::ActiveUdpListenerFactory {

Network::ConnectionHandler::ActiveUdpListenerPtr
createActiveUdpListener(Runtime::Loader&, uint32_t worker_index,
Network::UdpConnectionHandler& parent, Event::Dispatcher& disptacher,
Network::ListenerConfig& config) override;
Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr&& listen_socket_ptr,
Event::Dispatcher& disptacher, Network::ListenerConfig& config) override;
bool isTransportConnectionless() const override { return true; }
const Network::Socket::OptionsSharedPtr& socketOptions() const override { return options_; }

Expand Down
38 changes: 21 additions & 17 deletions source/server/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,32 @@ namespace Server {

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerConfig& config, Runtime::Loader& runtime,
uint32_t worker_index)
: OwnedActiveStreamListenerBase(parent, parent.dispatcher(),
parent.dispatcher().createListener(
config.listenSocketFactory().getListenSocket(worker_index),
*this, runtime, config.bindToPort(),
config.ignoreGlobalConnLimit()),
config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
Network::SocketSharedPtr&& socket,
Network::Address::InstanceConstSharedPtr& address,
Network::ConnectionBalancer& connection_balancer)
: OwnedActiveStreamListenerBase(
parent, parent.dispatcher(),
parent.dispatcher().createListener(std::move(socket), *this, runtime, config.bindToPort(),
config.ignoreGlobalConnLimit()),
config),
tcp_conn_handler_(parent), connection_balancer_(connection_balancer), address_(address) {
connection_balancer_.registerHandler(*this);
}

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerPtr&& listener,
Network::ListenerConfig& config, Runtime::Loader&)
Network::Address::InstanceConstSharedPtr& address,
Network::ListenerConfig& config,
Network::ConnectionBalancer& connection_balancer,
Runtime::Loader&)
: OwnedActiveStreamListenerBase(parent, parent.dispatcher(), std::move(listener), config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
tcp_conn_handler_(parent), connection_balancer_(connection_balancer), address_(address) {
connection_balancer_.registerHandler(*this);
}

ActiveTcpListener::~ActiveTcpListener() {
is_deleting_ = true;
config_->connectionBalancer().unregisterHandler(*this);
connection_balancer_.unregisterHandler(*this);

// Purge sockets that have not progressed to connections. This should only happen when
// a listener filter stops iteration and never resumes.
Expand Down Expand Up @@ -65,7 +69,6 @@ ActiveTcpListener::~ActiveTcpListener() {

void ActiveTcpListener::updateListenerConfig(Network::ListenerConfig& config) {
ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag());
ASSERT(&config_->connectionBalancer() == &config.connectionBalancer());
config_ = &config;
}

Expand Down Expand Up @@ -98,7 +101,7 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
bool rebalanced) {
if (!rebalanced) {
Network::BalancedConnectionHandler& target_handler =
config_->connectionBalancer().pickTargetHandler(*this);
connection_balancer_.pickTargetHandler(*this);
if (&target_handler != this) {
target_handler.post(std::move(socket));
return;
Expand Down Expand Up @@ -153,10 +156,11 @@ void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) {
RebalancedSocketSharedPtr socket_to_rebalance = std::make_shared<RebalancedSocket>();
socket_to_rebalance->socket = std::move(socket);

dispatcher().post([socket_to_rebalance, tag = config_->listenerTag(),
dispatcher().post([socket_to_rebalance, address = address_, tag = config_->listenerTag(),
&tcp_conn_handler = tcp_conn_handler_,
handoff = config_->handOffRestoredDestinationConnections()]() {
auto balanced_handler = tcp_conn_handler.getBalancedHandlerByTag(tag);
// `getBalancedHandlerByAddress` only support the IP address but there can be unix socket also.
auto balanced_handler = tcp_conn_handler.getBalancedHandlerByTag(tag, *address);
if (balanced_handler.has_value()) {
balanced_handler->get().onAcceptWorker(std::move(socket_to_rebalance->socket), handoff, true);
return;
Expand Down
11 changes: 9 additions & 2 deletions source/server/active_tcp_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks,
public Network::BalancedConnectionHandler {
public:
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config,
Runtime::Loader& runtime, uint32_t worker_index);
Runtime::Loader& runtime, Network::SocketSharedPtr&& socket,
Network::Address::InstanceConstSharedPtr& address,
Network::ConnectionBalancer& connection_balancer);
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,
Network::ListenerConfig& config, Runtime::Loader& runtime);
Network::Address::InstanceConstSharedPtr& address,
Network::ListenerConfig& config,
Network::ConnectionBalancer& connection_balancer, Runtime::Loader& runtime);
~ActiveTcpListener() override;

bool listenerConnectionLimitReached() const {
Expand Down Expand Up @@ -81,6 +85,9 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks,
// The number of connections currently active on this listener. This is typically used for
// connection balancing across per-handler listeners.
std::atomic<uint64_t> num_listener_connections_{};

Network::ConnectionBalancer& connection_balancer_;
Network::Address::InstanceConstSharedPtr address_;
};

using ActiveTcpListenerOptRef = absl::optional<std::reference_wrapper<ActiveTcpListener>>;
Expand Down
Loading