From a020e6d4bff19f5f478697098d79012b3a8567db Mon Sep 17 00:00:00 2001
From: Otto van der Schaaf
Date: Mon, 20 Apr 2020 16:53:57 +0200
Subject: [PATCH 1/5] Log exception in ProcessImpl::run()

Related to https://github.com/envoyproxy/nighthawk/issues/317

Log any exceptions and rethrow. This is to assist with diagnosis once it happens again in the CI env.

Signed-off-by: Otto van der Schaaf
---
 source/client/process_impl.cc | 133 ++++++++++++++++++----------
 1 file changed, 70 insertions(+), 63 deletions(-)

diff --git a/source/client/process_impl.cc b/source/client/process_impl.cc
index 32adb91af..3c2cb369f 100644
--- a/source/client/process_impl.cc
+++ b/source/client/process_impl.cc
@@ -414,75 +414,82 @@ bool ProcessImpl::run(OutputCollector& collector) {
   int number_of_workers = determineConcurrency();
   shutdown_ = false;
-  const std::vector& workers = createWorkers(number_of_workers);
-  tls_.registerThread(*dispatcher_, true);
-  store_root_.initializeThreading(*dispatcher_, tls_);
-  runtime_singleton_ = std::make_unique(
-      Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
-          *dispatcher_, tls_, {}, *local_info_, init_manager_, store_root_, generator_,
-          Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
-  ssl_context_manager_ =
-      std::make_unique(time_system_);
-  cluster_manager_factory_ = std::make_unique(
-      admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
-      dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, *local_info_,
-      secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
-      access_log_manager_, *singleton_manager_);
-  cluster_manager_factory_->setConnectionReuseStrategy(
-      options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
-          ? Http1PoolImpl::ConnectionReuseStrategy::LRU
-          : Http1PoolImpl::ConnectionReuseStrategy::MRU);
-  cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
-  if (options_.h2UseMultipleConnections()) {
-    cluster_manager_factory_->enableMultiConnectionH2Pool();
-  }
-  envoy::config::bootstrap::v3::Bootstrap bootstrap;
-  createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
-  if (tracing_uri != nullptr) {
-    setupTracingImplementation(bootstrap, *tracing_uri);
-    addTracingCluster(bootstrap, *tracing_uri);
-  }
-  ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
-  cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
-  maybeCreateTracingDriver(bootstrap.tracing());
-  cluster_manager_->setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); });
+  try {
+    const std::vector& workers = createWorkers(number_of_workers);
+    tls_.registerThread(*dispatcher_, true);
+    store_root_.initializeThreading(*dispatcher_, tls_);
+    runtime_singleton_ = std::make_unique(
+        Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
+            *dispatcher_, tls_, {}, *local_info_, init_manager_, store_root_, generator_,
+            Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
+    ssl_context_manager_ =
+        std::make_unique(time_system_);
+    cluster_manager_factory_ = std::make_unique(
+        admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
+        dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_,
+        *local_info_, secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
+        access_log_manager_, *singleton_manager_);
+    cluster_manager_factory_->setConnectionReuseStrategy(
+        options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
+            ? Http1PoolImpl::ConnectionReuseStrategy::LRU
+            : Http1PoolImpl::ConnectionReuseStrategy::MRU);
+    cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
+    if (options_.h2UseMultipleConnections()) {
+      cluster_manager_factory_->enableMultiConnectionH2Pool();
+    }
+    envoy::config::bootstrap::v3::Bootstrap bootstrap;
+    createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
+    if (tracing_uri != nullptr) {
+      setupTracingImplementation(bootstrap, *tracing_uri);
+      addTracingCluster(bootstrap, *tracing_uri);
+    }
+    ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
+    cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
+    maybeCreateTracingDriver(bootstrap.tracing());
+    cluster_manager_->setInitializedCb(
+        [this]() -> void { init_manager_.initialize(init_watcher_); });
 
-  Runtime::LoaderSingleton::get().initialize(*cluster_manager_);
+    Runtime::LoaderSingleton::get().initialize(*cluster_manager_);
 
-  for (auto& w : workers_) {
-    w->start();
-  }
+    for (auto& w : workers_) {
+      w->start();
+    }
 
-  for (auto& w : workers_) {
-    w->waitForCompletion();
-  }
+    for (auto& w : workers_) {
+      w->waitForCompletion();
+    }
 
-  int i = 0;
-  std::chrono::nanoseconds total_execution_duration = 0ns;
-  for (auto& worker : workers_) {
-    auto sequencer_execution_duration = worker->phase().sequencer().executionDuration();
-    // We don't write per-worker results if we only have a single worker, because the global results
-    // will be precisely the same.
-    if (workers_.size() > 1) {
-      StatisticFactoryImpl statistic_factory(options_);
-      collector.addResult(fmt::format("worker_{}", i),
-                          vectorizeStatisticPtrMap(worker->statistics()),
-                          worker->threadLocalCounterValues(), sequencer_execution_duration);
+    int i = 0;
+    std::chrono::nanoseconds total_execution_duration = 0ns;
+    for (auto& worker : workers_) {
+      auto sequencer_execution_duration = worker->phase().sequencer().executionDuration();
+      // We don't write per-worker results if we only have a single worker, because the global
+      // results will be precisely the same.
+      if (workers_.size() > 1) {
+        StatisticFactoryImpl statistic_factory(options_);
+        collector.addResult(fmt::format("worker_{}", i),
+                            vectorizeStatisticPtrMap(worker->statistics()),
+                            worker->threadLocalCounterValues(), sequencer_execution_duration);
+      }
+      total_execution_duration += sequencer_execution_duration;
+      i++;
     }
-    total_execution_duration += sequencer_execution_duration;
-    i++;
-  }
 
-  // Note that above we use use counter values snapshotted by the workers right after its execution
-  // completes. Here we query the live counters to get to the global numbers. To make sure the
-  // global aggregated numbers line up, we must take care not to shut down the benchmark client
-  // before we do this, as that will increment certain counters like connections closed, etc.
-  const auto& counters = Utility().mapCountersFromStore(
-      store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
-  StatisticFactoryImpl statistic_factory(options_);
-  collector.addResult("global", mergeWorkerStatistics(workers), counters,
-                      total_execution_duration / workers_.size());
-  return counters.find("sequencer.failed_terminations") == counters.end();
+    // Note that above we use use counter values snapshotted by the workers right after its
+    // execution completes. Here we query the live counters to get to the global numbers. To make
+    // sure the global aggregated numbers line up, we must take care not to shut down the benchmark
+    // client before we do this, as that will increment certain counters like connections closed,
+    // etc.
+    const auto& counters = Utility().mapCountersFromStore(
+        store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
+    StatisticFactoryImpl statistic_factory(options_);
+    collector.addResult("global", mergeWorkerStatistics(workers), counters,
+                        total_execution_duration / workers_.size());
+    return counters.find("sequencer.failed_terminations") == counters.end();
+  } catch (Envoy::EnvoyException& ex) {
+    ENVOY_LOG(error, "Fatal exception: {}", ex.what());
+    throw;
+  }
 }
 
 void ProcessImpl::setupForHRTimers() {

From 9c89e74453724491aec1fbfe479a871545db6553 Mon Sep 17 00:00:00 2001
From: Otto van der Schaaf
Date: Tue, 21 Apr 2020 18:52:24 +0200
Subject: [PATCH 2/5] Extract ProcessImpl::runInternal()

Signed-off-by: Otto van der Schaaf
---
 source/client/process_impl.cc | 150 +++++++++++++++++-----------
 source/client/process_impl.h  |   3 +
 2 files changed, 80 insertions(+), 73 deletions(-)

diff --git a/source/client/process_impl.cc b/source/client/process_impl.cc
index 3c2cb369f..bdd8a0f9b 100644
--- a/source/client/process_impl.cc
+++ b/source/client/process_impl.cc
@@ -377,6 +377,82 @@ void ProcessImpl::addRequestSourceCluster(
   socket_address->set_port_value(uri.port());
 }
 
+bool ProcessImpl::runInternal(OutputCollector& collector, const std::vector& uris,
+                              const UriPtr& request_source_uri, const UriPtr& tracing_uri) {
+  int number_of_workers = determineConcurrency();
+  shutdown_ = false;
+  const std::vector& workers = createWorkers(number_of_workers);
+  tls_.registerThread(*dispatcher_, true);
+  store_root_.initializeThreading(*dispatcher_, tls_);
+  runtime_singleton_ = std::make_unique(
+      Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
+          *dispatcher_, tls_, {}, *local_info_, init_manager_, store_root_, generator_,
+          Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
+  ssl_context_manager_ =
+      std::make_unique(time_system_);
+  cluster_manager_factory_ = std::make_unique(
+      admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
+      dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_, *local_info_,
+      secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
+      access_log_manager_, *singleton_manager_);
+  cluster_manager_factory_->setConnectionReuseStrategy(
+      options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
+          ? Http1PoolImpl::ConnectionReuseStrategy::LRU
+          : Http1PoolImpl::ConnectionReuseStrategy::MRU);
+  cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
+  if (options_.h2UseMultipleConnections()) {
+    cluster_manager_factory_->enableMultiConnectionH2Pool();
+  }
+  envoy::config::bootstrap::v3::Bootstrap bootstrap;
+  createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
+  if (tracing_uri != nullptr) {
+    setupTracingImplementation(bootstrap, *tracing_uri);
+    addTracingCluster(bootstrap, *tracing_uri);
+  }
+  ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
+  cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
+  maybeCreateTracingDriver(bootstrap.tracing());
+  cluster_manager_->setInitializedCb([this]() -> void { init_manager_.initialize(init_watcher_); });
+
+  Runtime::LoaderSingleton::get().initialize(*cluster_manager_);
+
+  for (auto& w : workers_) {
+    w->start();
+  }
+
+  for (auto& w : workers_) {
+    w->waitForCompletion();
+  }
+
+  int i = 0;
+  std::chrono::nanoseconds total_execution_duration = 0ns;
+  for (auto& worker : workers_) {
+    auto sequencer_execution_duration = worker->phase().sequencer().executionDuration();
+    // We don't write per-worker results if we only have a single worker, because the global
+    // results will be precisely the same.
+    if (workers_.size() > 1) {
+      StatisticFactoryImpl statistic_factory(options_);
+      collector.addResult(fmt::format("worker_{}", i),
+                          vectorizeStatisticPtrMap(worker->statistics()),
+                          worker->threadLocalCounterValues(), sequencer_execution_duration);
+    }
+    total_execution_duration += sequencer_execution_duration;
+    i++;
+  }
+
+  // Note that above we use use counter values snapshotted by the workers right after its
+  // execution completes. Here we query the live counters to get to the global numbers. To make
+  // sure the global aggregated numbers line up, we must take care not to shut down the benchmark
+  // client before we do this, as that will increment certain counters like connections closed,
+  // etc.
+  const auto& counters = Utility().mapCountersFromStore(
+      store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
+  StatisticFactoryImpl statistic_factory(options_);
+  collector.addResult("global", mergeWorkerStatistics(workers), counters,
+                      total_execution_duration / workers_.size());
+  return counters.find("sequencer.failed_terminations") == counters.end();
+}
+
 bool ProcessImpl::run(OutputCollector& collector) {
   std::vector uris;
   UriPtr request_source_uri;
@@ -412,80 +488,8 @@ bool ProcessImpl::run(OutputCollector& collector) {
       return false;
     }
 
-  int number_of_workers = determineConcurrency();
-  shutdown_ = false;
   try {
-    const std::vector& workers = createWorkers(number_of_workers);
-    tls_.registerThread(*dispatcher_, true);
-    store_root_.initializeThreading(*dispatcher_, tls_);
-    runtime_singleton_ = std::make_unique(
-        Envoy::Runtime::LoaderPtr{new Envoy::Runtime::LoaderImpl(
-            *dispatcher_, tls_, {}, *local_info_, init_manager_, store_root_, generator_,
-            Envoy::ProtobufMessage::getStrictValidationVisitor(), *api_)});
-    ssl_context_manager_ =
-        std::make_unique(time_system_);
-    cluster_manager_factory_ = std::make_unique(
-        admin_, Envoy::Runtime::LoaderSingleton::get(), store_root_, tls_, generator_,
-        dispatcher_->createDnsResolver({}, false), *ssl_context_manager_, *dispatcher_,
-        *local_info_, secret_manager_, validation_context_, *api_, http_context_, grpc_context_,
-        access_log_manager_, *singleton_manager_);
-    cluster_manager_factory_->setConnectionReuseStrategy(
-        options_.h1ConnectionReuseStrategy() == nighthawk::client::H1ConnectionReuseStrategy::LRU
-            ? Http1PoolImpl::ConnectionReuseStrategy::LRU
-            : Http1PoolImpl::ConnectionReuseStrategy::MRU);
-    cluster_manager_factory_->setPrefetchConnections(options_.prefetchConnections());
-    if (options_.h2UseMultipleConnections()) {
-      cluster_manager_factory_->enableMultiConnectionH2Pool();
-    }
-    envoy::config::bootstrap::v3::Bootstrap bootstrap;
-    createBootstrapConfiguration(bootstrap, uris, request_source_uri, number_of_workers);
-    if (tracing_uri != nullptr) {
-      setupTracingImplementation(bootstrap, *tracing_uri);
-      addTracingCluster(bootstrap, *tracing_uri);
-    }
-    ENVOY_LOG(debug, "Computed configuration: {}", bootstrap.DebugString());
-    cluster_manager_ = cluster_manager_factory_->clusterManagerFromProto(bootstrap);
-    maybeCreateTracingDriver(bootstrap.tracing());
-    cluster_manager_->setInitializedCb(
-        [this]() -> void { init_manager_.initialize(init_watcher_); });
-
-    Runtime::LoaderSingleton::get().initialize(*cluster_manager_);
-
-    for (auto& w : workers_) {
-      w->start();
-    }
-
-    for (auto& w : workers_) {
-      w->waitForCompletion();
-    }
-
-    int i = 0;
-    std::chrono::nanoseconds total_execution_duration = 0ns;
-    for (auto& worker : workers_) {
-      auto sequencer_execution_duration = worker->phase().sequencer().executionDuration();
-      // We don't write per-worker results if we only have a single worker, because the global
-      // results will be precisely the same.
-      if (workers_.size() > 1) {
-        StatisticFactoryImpl statistic_factory(options_);
-        collector.addResult(fmt::format("worker_{}", i),
-                            vectorizeStatisticPtrMap(worker->statistics()),
-                            worker->threadLocalCounterValues(), sequencer_execution_duration);
-      }
-      total_execution_duration += sequencer_execution_duration;
-      i++;
-    }
-
-    // Note that above we use use counter values snapshotted by the workers right after its
-    // execution completes. Here we query the live counters to get to the global numbers. To make
-    // sure the global aggregated numbers line up, we must take care not to shut down the benchmark
-    // client before we do this, as that will increment certain counters like connections closed,
-    // etc.
-    const auto& counters = Utility().mapCountersFromStore(
-        store_root_, [](absl::string_view, uint64_t value) { return value > 0; });
-    StatisticFactoryImpl statistic_factory(options_);
-    collector.addResult("global", mergeWorkerStatistics(workers), counters,
-                        total_execution_duration / workers_.size());
-    return counters.find("sequencer.failed_terminations") == counters.end();
+    return runInternal(collector, uris, request_source_uri, tracing_uri);
   } catch (Envoy::EnvoyException& ex) {
     ENVOY_LOG(error, "Fatal exception: {}", ex.what());
     throw;
diff --git a/source/client/process_impl.h b/source/client/process_impl.h
index c5226ef99..967ee7c59 100644
--- a/source/client/process_impl.h
+++ b/source/client/process_impl.h
@@ -93,6 +93,9 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable
   mergeWorkerStatistics(const std::vector& workers) const;
   void setupForHRTimers();
 
+  bool runInternal(OutputCollector& collector, const std::vector& uris,
+                   const UriPtr& request_source_uri, const UriPtr& tracing_uri);
+
   Envoy::ProcessWide process_wide_;
   Envoy::PlatformImpl platform_impl_;
   Envoy::Event::TimeSystem& time_system_;

From e28e1851c168a3e2db7f7d5a4b4bfc4a299afc28 Mon Sep 17 00:00:00 2001
From: Otto van der Schaaf
Date: Tue, 21 Apr 2020 23:50:27 +0200
Subject: [PATCH 3/5] Clean up client_worker_test.cc

Signed-off-by: Otto van der Schaaf
---
 test/BUILD                 |  1 +
 test/client_worker_test.cc | 14 ++++----------
 2 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/test/BUILD b/test/BUILD
index 2d3294744..2c9db7a40 100644
--- a/test/BUILD
+++ b/test/BUILD
@@ -59,6 +59,7 @@ envoy_cc_test(
         "@envoy//test/mocks/local_info:local_info_mocks",
         "@envoy//test/mocks/protobuf:protobuf_mocks",
         "@envoy//test/mocks/thread_local:thread_local_mocks",
+        "@envoy//test/test_common:simulated_time_system_lib",
     ],
 )
 
diff --git a/test/client_worker_test.cc b/test/client_worker_test.cc
index 21240f234..5da4167bb 100644
--- a/test/client_worker_test.cc
+++ b/test/client_worker_test.cc
@@ -9,6 +9,7 @@
 #include "external/envoy/test/mocks/local_info/mocks.h"
 #include "external/envoy/test/mocks/protobuf/mocks.h"
 #include "external/envoy/test/mocks/thread_local/mocks.h"
+#include "external/envoy/test/test_common/simulated_time_system.h"
 
 #include "common/statistic_impl.h"
 
@@ -74,11 +75,6 @@ class ClientWorkerTest : public Test {
   TerminationPredicatePtr createMockTerminationPredicate() {
     auto predicate = std::make_unique>();
     ON_CALL(*predicate, appendToChain(_)).WillByDefault(ReturnRef(*predicate));
-    EXPECT_CALL(*predicate, evaluateChain())
-        .Times(AtLeast(0))
-        .WillOnce(Return(TerminationPredicate::Status::PROCEED))
-        .WillOnce(Return(TerminationPredicate::Status::TERMINATE));
-
     return predicate;
   }
 
@@ -91,7 +87,7 @@ class ClientWorkerTest : public Test {
   MockRequestSourceFactory request_generator_factory_;
   Envoy::Stats::IsolatedStoreImpl store_;
   NiceMock tls_;
-  Envoy::Event::TestRealTimeSystem time_system_;
+  Envoy::Event::SimulatedTimeSystem time_system_;
   MockBenchmarkClient* benchmark_client_;
   MockSequencer* sequencer_;
   MockRequestSource* request_generator_;
@@ -112,8 +108,7 @@ TEST_F(ClientWorkerTest, BasicTest) {
   InSequence dummy;
   EXPECT_CALL(*benchmark_client_, setShouldMeasureLatencies(false)).Times(1);
   EXPECT_CALL(*benchmark_client_, tryStartRequest(_))
-      .Times(1)
-      .WillRepeatedly(Invoke(this, &ClientWorkerTest::CheckThreadChanged));
+      .WillOnce(Invoke(this, &ClientWorkerTest::CheckThreadChanged));
   EXPECT_CALL(*benchmark_client_, setShouldMeasureLatencies(true)).Times(1);
   EXPECT_CALL(*sequencer_, start).Times(1);
   EXPECT_CALL(*sequencer_, waitForCompletion).Times(1);
@@ -124,8 +119,7 @@
   auto worker = std::make_unique(
       *api_, tls_, cluster_manager_ptr_, benchmark_client_factory_, termination_predicate_factory_,
       sequencer_factory_, request_generator_factory_, store_, worker_number,
-      time_system_.monotonicTime() + 10ms, http_tracer_,
-      ClientWorkerImpl::HardCodedWarmupStyle::ON);
+      time_system_.monotonicTime(), http_tracer_, ClientWorkerImpl::HardCodedWarmupStyle::ON);
 
   worker->start();
   worker->waitForCompletion();

From b6fb6d57b011758ead9389145441e2fc954731c2 Mon Sep 17 00:00:00 2001
From: Otto van der Schaaf
Date: Wed, 22 Apr 2020 13:45:18 +0200
Subject: [PATCH 4/5] CI/tsan: tweaks to avoid timing out

Signed-off-by: Otto van der Schaaf
---
 test/integration/integration_test.py        |  2 +-
 test/integration/test_integration_basics.py | 12 ++++--------
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/test/integration/integration_test.py b/test/integration/integration_test.py
index 2309521a7..951d56b32 100644
--- a/test/integration/integration_test.py
+++ b/test/integration/integration_test.py
@@ -24,7 +24,7 @@
           "-x",
           path,
           "-n",
-          "2" if isSanitizerRun() else "20"  # Number of tests to run in parallel
+          "4" if isSanitizerRun() else "20"  # Number of tests to run in parallel
       ],
       plugins=["xdist"])
   exit(r)
diff --git a/test/integration/test_integration_basics.py b/test/integration/test_integration_basics.py
index 8a0153714..4d1d2f17b 100644
--- a/test/integration/test_integration_basics.py
+++ b/test/integration/test_integration_basics.py
@@ -554,19 +554,15 @@ def test_https_h1_sni(sni_test_server_fixture):
 
   # Verify failure when we set no host (will get plain http)
   parsed_json, _ = sni_test_server_fixture.runNighthawkClient(
-      [sni_test_server_fixture.getTestServerRootUri(), "--rps", "100", "--duration", "100"],
+      [sni_test_server_fixture.getTestServerRootUri(), "--rps", "20", "--duration", "100"],
       expect_failure=True)
 
   # Verify success when we use plain http and don't request the sni host
-  parsed_json, _ = sni_test_server_fixture.runNighthawkClient(
-      [sni_test_server_fixture.getTestServerRootUri(), "--rps", "100", "--duration", "100"],
-      expect_failure=True)
-
   parsed_json, _ = sni_test_server_fixture.runNighthawkClient([
       sni_test_server_fixture.getTestServerRootUri().replace("https://", "http://"), "--rps", "100",
-      "--duration", "100", "--termination-predicate", "benchmark.http_2xx:2"
-  ],
-                                                              expect_failure=False)
+      "--duration", "20", "--termination-predicate", "benchmark.http_2xx:2"
+  ], expect_failure=False)
+
   counters = sni_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json)
   assertCounterGreaterEqual(counters, "benchmark.http_2xx", 1)
   assertCounterGreaterEqual(counters, "upstream_cx_http1_total", 1)

From a72948adc258f5736d8330f2fac7da8ba6945951 Mon Sep 17 00:00:00 2001
From: Otto van der Schaaf
Date: Wed, 22 Apr 2020 14:42:44 +0200
Subject: [PATCH 5/5] Fix format

Signed-off-by: Otto van der Schaaf
---
 test/integration/test_integration_basics.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/test/integration/test_integration_basics.py b/test/integration/test_integration_basics.py
index 4d1d2f17b..95619bfbf 100644
--- a/test/integration/test_integration_basics.py
+++ b/test/integration/test_integration_basics.py
@@ -561,7 +561,8 @@ def test_https_h1_sni(sni_test_server_fixture):
   parsed_json, _ = sni_test_server_fixture.runNighthawkClient([
       sni_test_server_fixture.getTestServerRootUri().replace("https://", "http://"), "--rps", "100",
       "--duration", "20", "--termination-predicate", "benchmark.http_2xx:2"
-  ], expect_failure=False)
+  ],
+                                                              expect_failure=False)
 
   counters = sni_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json)
   assertCounterGreaterEqual(counters, "benchmark.http_2xx", 1)