From 7aad8ffead28bf9eb86712f3df8e6adef6a395cc Mon Sep 17 00:00:00 2001 From: Inseok Lee Date: Fri, 13 Sep 2024 10:23:06 +0900 Subject: [PATCH 01/11] redis: Support eval_ro, evalsha_ro --- .../network/common/redis/supported_commands.h | 3 +- .../clusters/redis/redis_cluster_lb_test.cc | 25 +++++++++++ .../redis_proxy/command_splitter_impl_test.cc | 44 +++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/source/extensions/filters/network/common/redis/supported_commands.h b/source/extensions/filters/network/common/redis/supported_commands.h index e1869627a9f14..b8d9d3d6e2904 100644 --- a/source/extensions/filters/network/common/redis/supported_commands.h +++ b/source/extensions/filters/network/common/redis/supported_commands.h @@ -49,7 +49,8 @@ struct SupportedCommands { * @return commands which hash on the fourth argument */ static const absl::flat_hash_set& evalCommands() { - CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set, "eval", "evalsha"); + CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set, "eval", "evalsha", "eval_ro", + "evalsha_ro"); } /** diff --git a/test/extensions/clusters/redis/redis_cluster_lb_test.cc b/test/extensions/clusters/redis/redis_cluster_lb_test.cc index 3bd635f646d2a..f46f43cec953c 100644 --- a/test/extensions/clusters/redis/redis_cluster_lb_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_lb_test.cc @@ -604,6 +604,31 @@ TEST_F(RedisLoadBalancerContextImplTest, EnforceHashTag) { EXPECT_EQ(NetworkFilters::Common::Redis::Client::ReadPolicy::Primary, context2.readPolicy()); } +TEST_F(RedisLoadBalancerContextImplTest, ReadOnlyCommand) { + std::vector eval_ro_foo(4); + eval_ro_foo[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + eval_ro_foo[0].asString() = "eval_ro"; + eval_ro_foo[1].type(NetworkFilters::Common::Redis::RespType::BulkString); + eval_ro_foo[1].asString() = "return {KEYS[1]}"; + eval_ro_foo[2].type(NetworkFilters::Common::Redis::RespType::BulkString); + eval_ro_foo[2].asString() = "foo"; + 
eval_ro_foo[3].type(NetworkFilters::Common::Redis::RespType::BulkString); + eval_ro_foo[3].asString() = "0"; + + NetworkFilters::Common::Redis::RespValue eval_ro_request; + eval_ro_request.type(NetworkFilters::Common::Redis::RespType::Array); + eval_ro_request.asArray().swap(eval_ro_foo); + + RedisLoadBalancerContextImpl context1( + "foo", true, true, eval_ro_request, + NetworkFilters::Common::Redis::Client::ReadPolicy::PreferReplica); + + EXPECT_EQ(absl::optional(44950), context1.computeHashKey()); + EXPECT_EQ(true, context1.isReadCommand()); + EXPECT_EQ(NetworkFilters::Common::Redis::Client::ReadPolicy::PreferReplica, + context1.readPolicy()); +} + } // namespace Redis } // namespace Clusters } // namespace Extensions diff --git a/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc b/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc index f18a7d7c1b78b..65dfedc6acb54 100644 --- a/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc @@ -529,6 +529,50 @@ TEST_F(RedisSingleServerRequestTest, EvalShaSuccess) { store_.counter(fmt::format("redis.foo.command.{}.success", lower_command)).value()); }; +TEST_F(RedisSingleServerRequestTest, EvalRoSuccess) { + InSequence s; + + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + makeBulkStringArray(*request, {"eval_ro", "return {ARGV[1]}", "1", "key", "arg"}); + makeRequest("key", std::move(request)); + EXPECT_NE(nullptr, handle_); + + std::string lower_command = absl::AsciiStrToLower("eval_ro"); + + time_system_.setMonotonicTime(std::chrono::milliseconds(10)); + EXPECT_CALL(store_, deliverHistogramToSinks( + Property(&Stats::Metric::name, + fmt::format("redis.foo.command.{}.latency", lower_command)), + 10)); + respond(); + + EXPECT_EQ(1UL, store_.counter(fmt::format("redis.foo.command.{}.total", lower_command)).value()); + EXPECT_EQ(1UL, + 
store_.counter(fmt::format("redis.foo.command.{}.success", lower_command)).value()); +}; + +TEST_F(RedisSingleServerRequestTest, EvalShaRoSuccess) { + InSequence s; + + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + makeBulkStringArray(*request, {"EVALSHA_RO", "return {ARGV[1]}", "1", "keykey", "arg"}); + makeRequest("keykey", std::move(request)); + EXPECT_NE(nullptr, handle_); + + std::string lower_command = absl::AsciiStrToLower("evalsha_ro"); + + time_system_.setMonotonicTime(std::chrono::milliseconds(10)); + EXPECT_CALL(store_, deliverHistogramToSinks( + Property(&Stats::Metric::name, + fmt::format("redis.foo.command.{}.latency", lower_command)), + 10)); + respond(); + + EXPECT_EQ(1UL, store_.counter(fmt::format("redis.foo.command.{}.total", lower_command)).value()); + EXPECT_EQ(1UL, + store_.counter(fmt::format("redis.foo.command.{}.success", lower_command)).value()); +}; + TEST_F(RedisSingleServerRequestTest, EvalWrongNumberOfArgs) { InSequence s; From 5f269a3d5382fb320ee76104720a74887383e680 Mon Sep 17 00:00:00 2001 From: Doogie Min Date: Sat, 2 Nov 2024 02:32:31 +0900 Subject: [PATCH 02/11] use custom header for tracing Include code formatting improvements for consistent style in trace test files. 
--- .../opentelemetry/span_context_extractor.h | 4 +- .../tracers/opentelemetry/tracer.cc | 4 +- .../always_on_sampler_integration_test.cc | 25 +++++++----- .../dynatrace_sampler_integration_test.cc | 25 +++++++----- .../span_context_extractor_test.cc | 38 +++++++++++-------- 5 files changed, 56 insertions(+), 40 deletions(-) diff --git a/source/extensions/tracers/opentelemetry/span_context_extractor.h b/source/extensions/tracers/opentelemetry/span_context_extractor.h index dffeb6218c921..517a9700bafaf 100644 --- a/source/extensions/tracers/opentelemetry/span_context_extractor.h +++ b/source/extensions/tracers/opentelemetry/span_context_extractor.h @@ -15,8 +15,8 @@ namespace OpenTelemetry { class OpenTelemetryConstantValues { public: - const Tracing::TraceContextHandler TRACE_PARENT{"traceparent"}; - const Tracing::TraceContextHandler TRACE_STATE{"tracestate"}; + const Tracing::TraceContextHandler TRACE_PARENT{"x-sendbird-traceparent"}; + const Tracing::TraceContextHandler TRACE_STATE{"x-sendbird-tracestate"}; }; using OpenTelemetryConstants = ConstSingleton; diff --git a/source/extensions/tracers/opentelemetry/tracer.cc b/source/extensions/tracers/opentelemetry/tracer.cc index 9dd7e9a05dd40..6158ee66819e6 100644 --- a/source/extensions/tracers/opentelemetry/tracer.cc +++ b/source/extensions/tracers/opentelemetry/tracer.cc @@ -27,11 +27,11 @@ using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; namespace { const Tracing::TraceContextHandler& traceParentHeader() { - CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "traceparent"); + CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "x-sendbird-traceparent"); } const Tracing::TraceContextHandler& traceStateHeader() { - CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "tracestate"); + CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "x-sendbird-tracestate"); } void callSampler(SamplerSharedPtr sampler, const StreamInfo::StreamInfo& stream_info, diff --git 
a/test/extensions/tracers/opentelemetry/samplers/always_on/always_on_sampler_integration_test.cc b/test/extensions/tracers/opentelemetry/samplers/always_on/always_on_sampler_integration_test.cc index 0fe665b9c9c80..5f5efbeddf1bd 100644 --- a/test/extensions/tracers/opentelemetry/samplers/always_on/always_on_sampler_integration_test.cc +++ b/test/extensions/tracers/opentelemetry/samplers/always_on/always_on_sampler_integration_test.cc @@ -59,9 +59,12 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, AlwaysOnSamplerIntegrationTest, // Sends a request with traceparent and tracestate header. TEST_P(AlwaysOnSamplerIntegrationTest, TestWithTraceparentAndTracestate) { - Http::TestRequestHeaderMapImpl request_headers{ - {":method", "GET"}, {":path", "/test/long/url"}, {":scheme", "http"}, - {":authority", "host"}, {"tracestate", "key=value"}, {"traceparent", TRACEPARENT_VALUE}}; + Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-sendbird-tracestate", "key=value"}, + {"x-sendbird-traceparent", TRACEPARENT_VALUE}}; auto response = sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0); @@ -71,14 +74,14 @@ TEST_P(AlwaysOnSamplerIntegrationTest, TestWithTraceparentAndTracestate) { // traceparent should be set: traceid should be re-used, span id should be different absl::string_view traceparent_value = upstream_request_->headers() - .get(Http::LowerCaseString("traceparent"))[0] + .get(Http::LowerCaseString("x-sendbird-traceparent"))[0] ->value() .getStringView(); EXPECT_TRUE(absl::StartsWith(traceparent_value, TRACEPARENT_VALUE_START)); EXPECT_NE(TRACEPARENT_VALUE, traceparent_value); // tracestate should be forwarded absl::string_view tracestate_value = upstream_request_->headers() - .get(Http::LowerCaseString("tracestate"))[0] + .get(Http::LowerCaseString("x-sendbird-tracestate"))[0] ->value() .getStringView(); EXPECT_EQ("key=value", tracestate_value); @@ 
-90,7 +93,7 @@ TEST_P(AlwaysOnSamplerIntegrationTest, TestWithTraceparentOnly) { {":path", "/test/long/url"}, {":scheme", "http"}, {":authority", "host"}, - {"traceparent", TRACEPARENT_VALUE}}; + {"x-sendbird-traceparent", TRACEPARENT_VALUE}}; auto response = sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0); ASSERT_TRUE(response->waitForEndStream()); @@ -99,14 +102,14 @@ TEST_P(AlwaysOnSamplerIntegrationTest, TestWithTraceparentOnly) { // traceparent should be set: traceid should be re-used, span id should be different absl::string_view traceparent_value = upstream_request_->headers() - .get(Http::LowerCaseString("traceparent"))[0] + .get(Http::LowerCaseString("x-sendbird-traceparent"))[0] ->value() .getStringView(); EXPECT_TRUE(absl::StartsWith(traceparent_value, TRACEPARENT_VALUE_START)); EXPECT_NE(TRACEPARENT_VALUE, traceparent_value); // OTLP tracer adds an empty tracestate absl::string_view tracestate_value = upstream_request_->headers() - .get(Http::LowerCaseString("tracestate"))[0] + .get(Http::LowerCaseString("x-sendbird-tracestate"))[0] ->value() .getStringView(); EXPECT_EQ("", tracestate_value); @@ -125,11 +128,13 @@ TEST_P(AlwaysOnSamplerIntegrationTest, TestWithoutTraceparentAndTracestate) { // traceparent will be added, trace_id and span_id will be generated, so there is nothing we can // assert - EXPECT_EQ(upstream_request_->headers().get(::Envoy::Http::LowerCaseString("traceparent")).size(), + EXPECT_EQ(upstream_request_->headers() + .get(::Envoy::Http::LowerCaseString("x-sendbird-traceparent")) + .size(), 1); // OTLP tracer adds an empty tracestate absl::string_view tracestate_value = upstream_request_->headers() - .get(Http::LowerCaseString("tracestate"))[0] + .get(Http::LowerCaseString("x-sendbird-tracestate"))[0] ->value() .getStringView(); EXPECT_EQ("", tracestate_value); diff --git a/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_integration_test.cc 
b/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_integration_test.cc index a3887eaf5ab4e..c2dee1f01ffad 100644 --- a/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_integration_test.cc +++ b/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_integration_test.cc @@ -61,9 +61,12 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, DynatraceSamplerIntegrationTest, // Sends a request with traceparent and tracestate header. TEST_P(DynatraceSamplerIntegrationTest, TestWithTraceparentAndTracestate) { // tracestate does not contain a Dynatrace tag - Http::TestRequestHeaderMapImpl request_headers{ - {":method", "GET"}, {":path", "/test/long/url"}, {":scheme", "http"}, - {":authority", "host"}, {"tracestate", "key=value"}, {"traceparent", TRACEPARENT_VALUE}}; + Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-sendbird-tracestate", "key=value"}, + {"x-sendbird-traceparent", TRACEPARENT_VALUE}}; auto response = sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0); @@ -73,14 +76,14 @@ TEST_P(DynatraceSamplerIntegrationTest, TestWithTraceparentAndTracestate) { // traceparent should be set: traceid should be re-used, span id should be different absl::string_view traceparent_value = upstream_request_->headers() - .get(Http::LowerCaseString("traceparent"))[0] + .get(Http::LowerCaseString("x-sendbird-traceparent"))[0] ->value() .getStringView(); EXPECT_TRUE(absl::StartsWith(traceparent_value, TRACEPARENT_VALUE_START)); EXPECT_NE(TRACEPARENT_VALUE, traceparent_value); // Dynatrace tracestate should be added to existing tracestate absl::string_view tracestate_value = upstream_request_->headers() - .get(Http::LowerCaseString("tracestate"))[0] + .get(Http::LowerCaseString("x-sendbird-tracestate"))[0] ->value() .getStringView(); // use StartsWith because path-info (last element in trace state) 
contains a random value @@ -96,7 +99,7 @@ TEST_P(DynatraceSamplerIntegrationTest, TestWithTraceparentOnly) { {":path", "/test/long/url"}, {":scheme", "http"}, {":authority", "host"}, - {"traceparent", TRACEPARENT_VALUE}}; + {"x-sendbird-traceparent", TRACEPARENT_VALUE}}; auto response = sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0); ASSERT_TRUE(response->waitForEndStream()); @@ -105,14 +108,14 @@ TEST_P(DynatraceSamplerIntegrationTest, TestWithTraceparentOnly) { // traceparent should be set: traceid should be re-used, span id should be different absl::string_view traceparent_value = upstream_request_->headers() - .get(Http::LowerCaseString("traceparent"))[0] + .get(Http::LowerCaseString("x-sendbird-traceparent"))[0] ->value() .getStringView(); EXPECT_TRUE(absl::StartsWith(traceparent_value, TRACEPARENT_VALUE_START)); EXPECT_NE(TRACEPARENT_VALUE, traceparent_value); // Dynatrace tag should be added to tracestate absl::string_view tracestate_value = upstream_request_->headers() - .get(Http::LowerCaseString("tracestate"))[0] + .get(Http::LowerCaseString("x-sendbird-tracestate"))[0] ->value() .getStringView(); // use StartsWith because path-info (last element in trace state contains a random value) @@ -133,11 +136,13 @@ TEST_P(DynatraceSamplerIntegrationTest, TestWithoutTraceparentAndTracestate) { // traceparent will be added, trace_id and span_id will be generated, so there is nothing we can // assert - EXPECT_EQ(upstream_request_->headers().get(::Envoy::Http::LowerCaseString("traceparent")).size(), + EXPECT_EQ(upstream_request_->headers() + .get(::Envoy::Http::LowerCaseString("x-sendbird-traceparent")) + .size(), 1); // Dynatrace tag should be added to tracestate absl::string_view tracestate_value = upstream_request_->headers() - .get(Http::LowerCaseString("tracestate"))[0] + .get(Http::LowerCaseString("x-sendbird-tracestate"))[0] ->value() .getStringView(); EXPECT_TRUE(absl::StartsWith(tracestate_value, 
"5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;")) diff --git a/test/extensions/tracers/opentelemetry/span_context_extractor_test.cc b/test/extensions/tracers/opentelemetry/span_context_extractor_test.cc index b87f984768ebe..ae13d846c7f8c 100644 --- a/test/extensions/tracers/opentelemetry/span_context_extractor_test.cc +++ b/test/extensions/tracers/opentelemetry/span_context_extractor_test.cc @@ -23,7 +23,8 @@ constexpr absl::string_view trace_flags = "01"; TEST(SpanContextExtractorTest, ExtractSpanContext) { Tracing::TestTraceContextImpl request_headers{ - {"traceparent", fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}}; + {"x-sendbird-traceparent", + fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); @@ -38,7 +39,7 @@ TEST(SpanContextExtractorTest, ExtractSpanContext) { TEST(SpanContextExtractorTest, ExtractSpanContextNotSampled) { const std::string trace_flags_unsampled{"00"}; Tracing::TestTraceContextImpl request_headers{ - {"traceparent", + {"x-sendbird-traceparent", fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags_unsampled)}}; SpanContextExtractor span_context_extractor(request_headers); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); @@ -62,7 +63,8 @@ TEST(SpanContextExtractorTest, ThrowsExceptionWithoutHeader) { TEST(SpanContextExtractorTest, ThrowsExceptionWithTooLongHeader) { Tracing::TestTraceContextImpl request_headers{ - {"traceparent", fmt::format("000{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}}; + {"x-sendbird-traceparent", + fmt::format("000{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); @@ -73,7 +75,7 @@ TEST(SpanContextExtractorTest, ThrowsExceptionWithTooLongHeader) { 
TEST(SpanContextExtractorTest, ThrowsExceptionWithTooShortHeader) { Tracing::TestTraceContextImpl request_headers{ - {"traceparent", fmt::format("{}-{}-{}", trace_id, parent_id, trace_flags)}}; + {"x-sendbird-traceparent", fmt::format("{}-{}-{}", trace_id, parent_id, trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); @@ -84,7 +86,8 @@ TEST(SpanContextExtractorTest, ThrowsExceptionWithTooShortHeader) { TEST(SpanContextExtractorTest, ThrowsExceptionWithInvalidHyphenation) { Tracing::TestTraceContextImpl request_headers{ - {"traceparent", fmt::format("{}{}-{}-{}", version, trace_id, parent_id, trace_flags)}}; + {"x-sendbird-traceparent", + fmt::format("{}{}-{}-{}", version, trace_id, parent_id, trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); @@ -97,7 +100,7 @@ TEST(SpanContextExtractorTest, ThrowsExceptionWithInvalidSizes) { const std::string invalid_version{"0"}; const std::string invalid_trace_flags{"001"}; Tracing::TestTraceContextImpl request_headers{ - {"traceparent", + {"x-sendbird-traceparent", fmt::format("{}-{}-{}-{}", invalid_version, trace_id, parent_id, invalid_trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); @@ -110,7 +113,7 @@ TEST(SpanContextExtractorTest, ThrowsExceptionWithInvalidSizes) { TEST(SpanContextExtractorTest, ThrowsExceptionWithInvalidHex) { const std::string invalid_version{"ZZ"}; Tracing::TestTraceContextImpl request_headers{ - {"traceparent", + {"x-sendbird-traceparent", fmt::format("{}-{}-{}-{}", invalid_version, trace_id, parent_id, trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); @@ -123,7 +126,7 @@ TEST(SpanContextExtractorTest, ThrowsExceptionWithInvalidHex) { TEST(SpanContextExtractorTest, ThrowsExceptionWithAllZeroTraceId) { const std::string 
invalid_trace_id{"00000000000000000000000000000000"}; Tracing::TestTraceContextImpl request_headers{ - {"traceparent", + {"x-sendbird-traceparent", fmt::format("{}-{}-{}-{}", version, invalid_trace_id, parent_id, trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); @@ -136,7 +139,7 @@ TEST(SpanContextExtractorTest, ThrowsExceptionWithAllZeroTraceId) { TEST(SpanContextExtractorTest, ThrowsExceptionWithAllZeroParentId) { const std::string invalid_parent_id{"0000000000000000"}; Tracing::TestTraceContextImpl request_headers{ - {"traceparent", + {"x-sendbird-traceparent", fmt::format("{}-{}-{}-{}", version, trace_id, invalid_parent_id, trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); @@ -148,7 +151,8 @@ TEST(SpanContextExtractorTest, ThrowsExceptionWithAllZeroParentId) { TEST(SpanContextExtractorTest, ExtractSpanContextWithEmptyTracestate) { Tracing::TestTraceContextImpl request_headers{ - {"traceparent", fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}}; + {"x-sendbird-traceparent", + fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}}; SpanContextExtractor span_context_extractor(request_headers); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); @@ -158,8 +162,9 @@ TEST(SpanContextExtractorTest, ExtractSpanContextWithEmptyTracestate) { TEST(SpanContextExtractorTest, ExtractSpanContextWithTracestate) { Tracing::TestTraceContextImpl request_headers{ - {"traceparent", fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}, - {"tracestate", "sample-tracestate"}}; + {"x-sendbird-traceparent", + fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}, + {"x-sendbird-tracestate", "sample-tracestate"}}; SpanContextExtractor span_context_extractor(request_headers); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); @@ -168,7 +173,7 @@ TEST(SpanContextExtractorTest, ExtractSpanContextWithTracestate) { } 
TEST(SpanContextExtractorTest, IgnoreTracestateWithoutTraceparent) { - Tracing::TestTraceContextImpl request_headers{{"tracestate", "sample-tracestate"}}; + Tracing::TestTraceContextImpl request_headers{{"x-sendbird-tracestate", "sample-tracestate"}}; SpanContextExtractor span_context_extractor(request_headers); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); @@ -178,9 +183,10 @@ TEST(SpanContextExtractorTest, IgnoreTracestateWithoutTraceparent) { TEST(SpanContextExtractorTest, ExtractSpanContextWithMultipleTracestateEntries) { Http::TestRequestHeaderMapImpl request_headers{ - {"traceparent", fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}, - {"tracestate", "sample-tracestate"}, - {"tracestate", "sample-tracestate-2"}}; + {"x-sendbird-traceparent", + fmt::format("{}-{}-{}-{}", version, trace_id, parent_id, trace_flags)}, + {"x-sendbird-tracestate", "sample-tracestate"}, + {"x-sendbird-tracestate", "sample-tracestate-2"}}; Tracing::HttpTraceContext trace_context(request_headers); SpanContextExtractor span_context_extractor(trace_context); absl::StatusOr span_context = span_context_extractor.extractSpanContext(); From 5a4945cdecb97cfc49a13bec3bb71d9eee9afbbf Mon Sep 17 00:00:00 2001 From: Gavin Jeong Date: Wed, 30 Jul 2025 21:49:48 +0900 Subject: [PATCH 03/11] redis: fix segfault at cluster removing Signed-off-by: Chanhun Jeong --- .../clusters/redis/redis_cluster.cc | 62 +++++++++++++++---- .../extensions/clusters/redis/redis_cluster.h | 6 +- test/extensions/clusters/redis/BUILD | 1 + .../clusters/redis/redis_cluster_test.cc | 39 ++++++++++++ 4 files changed, 94 insertions(+), 14 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 7be1cbb6ea5b0..acb3f3fdb8d31 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -69,16 +69,13 @@ RedisCluster::RedisCluster( info(), 
context.serverFactoryContext().api())), auth_password_(NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authPassword( info(), context.serverFactoryContext().api())), - cluster_name_(cluster.name()), refresh_manager_(Common::Redis::getClusterRefreshManager( - context.serverFactoryContext().singletonManager(), - context.serverFactoryContext().mainThreadDispatcher(), - context.serverFactoryContext().clusterManager(), - context.serverFactoryContext().api().timeSource())), - registration_handle_(refresh_manager_->registerCluster( - cluster_name_, redirect_refresh_interval_, redirect_refresh_threshold_, - failure_refresh_threshold_, host_degraded_refresh_threshold_, [&]() { - redis_discovery_session_->resolve_timer_->enableTimer(std::chrono::milliseconds(0)); - })) { + cluster_name_(cluster.name()), + refresh_manager_(Common::Redis::getClusterRefreshManager( + context.serverFactoryContext().singletonManager(), + context.serverFactoryContext().mainThreadDispatcher(), + context.serverFactoryContext().clusterManager(), + context.serverFactoryContext().api().timeSource())), + registration_handle_(nullptr) { const auto& locality_lb_endpoints = load_assignment_.endpoints(); for (const auto& locality_lb_endpoint : locality_lb_endpoints) { for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { @@ -87,6 +84,33 @@ RedisCluster::RedisCluster( *this, host.socket_address().address(), host.socket_address().port_value())); } } + + // Register the cluster callback using weak_ptr to avoid use-after-free + std::weak_ptr weak_session = redis_discovery_session_; + registration_handle_ = refresh_manager_->registerCluster( + cluster_name_, redirect_refresh_interval_, redirect_refresh_threshold_, + failure_refresh_threshold_, host_degraded_refresh_threshold_, + [weak_session]() { + // Try to lock the weak pointer to ensure the session is still alive + auto session = weak_session.lock(); + if (session && session->resolve_timer_) { + 
session->resolve_timer_->enableTimer(std::chrono::milliseconds(0)); + } + }); +} + +RedisCluster::~RedisCluster() { + // Set flag to prevent any callbacks from executing during destruction + is_destroying_.store(true); + + // Reset redis_discovery_session_ before other members are destroyed + // to ensure any pending callbacks from refresh_manager_ don't access it. + // This matches the approach in PR #39625. + redis_discovery_session_.reset(); + + // Also clear DNS discovery targets to prevent their callbacks from + // accessing the destroyed cluster. + dns_discovery_resolve_targets_.clear(); } void RedisCluster::startPreInit() { @@ -198,7 +222,7 @@ RedisCluster::DnsDiscoveryResolveTarget::~DnsDiscoveryResolveTarget() { active_query_->cancel(Network::ActiveDnsQuery::CancelReason::QueryAbandoned); } // Disable timer for mock tests. - if (resolve_timer_) { + if (resolve_timer_ && resolve_timer_->enabled()) { resolve_timer_->disableTimer(); } } @@ -221,7 +245,13 @@ void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() { if (!resolve_timer_) { resolve_timer_ = - parent_.dispatcher_.createTimer([this]() -> void { startResolveDns(); }); + parent_.dispatcher_.createTimer([this]() -> void { + // Check if the parent cluster is being destroyed + if (parent_.is_destroying_.load()) { + return; + } + startResolveDns(); + }); } // if the initial dns resolved to empty, we'll skip the redis discovery phase and // treat it as an empty cluster. 
@@ -244,7 +274,13 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession( Envoy::Extensions::Clusters::Redis::RedisCluster& parent, NetworkFilters::Common::Redis::Client::ClientFactory& client_factory) : parent_(parent), dispatcher_(parent.dispatcher_), - resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { startResolveRedis(); })), + resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { + // Check if the parent cluster is being destroyed + if (parent_.is_destroying_.load()) { + return; + } + startResolveRedis(); + })), client_factory_(client_factory), buffer_timeout_(0), redis_command_stats_( NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats( diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 2aa97218e5501..1b12f347ddf88 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -90,6 +90,7 @@ namespace Redis { class RedisCluster : public Upstream::BaseDynamicClusterImpl { public: + ~RedisCluster(); static absl::StatusOr> create(const envoy::config::cluster::v3::Cluster& cluster, const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster, @@ -302,7 +303,10 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { const std::string auth_password_; const std::string cluster_name_; const Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager_; - const Common::Redis::ClusterRefreshManager::HandlePtr registration_handle_; + Common::Redis::ClusterRefreshManager::HandlePtr registration_handle_; + + // Flag to prevent callbacks during destruction + std::atomic is_destroying_{false}; }; class RedisClusterFactory : public Upstream::ConfigurableClusterFactoryBase< diff --git a/test/extensions/clusters/redis/BUILD b/test/extensions/clusters/redis/BUILD index 6f0a607172659..32dfc5229938f 100644 --- a/test/extensions/clusters/redis/BUILD +++ 
b/test/extensions/clusters/redis/BUILD @@ -31,6 +31,7 @@ envoy_extension_cc_test( "//source/server:transport_socket_config_lib", "//test/common/upstream:utility_lib", "//test/extensions/clusters/redis:redis_cluster_mocks", + "//test/extensions/common/redis:mocks_lib", "//test/extensions/filters/network/common/redis:redis_mocks", "//test/extensions/filters/network/common/redis:test_utils_lib", "//test/extensions/filters/network/redis_proxy:redis_mocks", diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index ae331cf2fa171..41332b870cc20 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -18,6 +18,7 @@ #include "test/common/upstream/utility.h" #include "test/extensions/clusters/redis/mocks.h" +#include "test/extensions/common/redis/mocks.h" #include "test/extensions/filters/network/common/redis/mocks.h" #include "test/mocks/common.h" #include "test/mocks/protobuf/mocks.h" @@ -1487,6 +1488,44 @@ TEST_F(RedisClusterTest, HostRemovalAfterHcFail) { */ } +// Test that verifies cluster destruction does not cause segfault when refresh manager +// triggers callback after cluster is destroyed. This reproduces the issue from #38585. +TEST_F(RedisClusterTest, NoSegfaultOnClusterDestructionWithPendingCallback) { + // This test verifies that destroying the cluster properly cleans up resources + // and doesn't cause a segfault. The key protection is in the destructor that + // sets is_destroying_ flag and cleans up the redis_discovery_session_. 
+ + // Create the cluster with basic configuration + setupFromV3Yaml(BasicConfig); + const std::list resolved_addresses{"127.0.0.1"}; + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + expectRedisResolve(true); + + cluster_->initialize([&]() { + initialized_.ready(); + return absl::OkStatus(); + }); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + std::bitset single_slot_primary(0xfff); + std::bitset no_replica(0); + expectClusterSlotResponse(createResponse(single_slot_primary, no_replica)); + expectHealthyHosts(std::list({"127.0.0.1:22120"})); + + // Now destroy the cluster. With the fix in place (destructor setting is_destroying_ + // and resetting redis_discovery_session_), this should not crash. + // Without the fix, accessing resolve_timer_ after destruction would segfault. + cluster_.reset(); + + // If we reach here without crashing, the test passes. + // The fix ensures that: + // 1. The destructor sets is_destroying_ = true + // 2. The destructor resets redis_discovery_session_ + // 3. Timer callbacks check is_destroying_ before accessing cluster members +} + } // namespace Redis } // namespace Clusters } // namespace Extensions From 28207b2d3a5794ec10b7aa4cf7665b6b9bbd51f9 Mon Sep 17 00:00:00 2001 From: Gavin Jeong Date: Wed, 3 Sep 2025 01:17:19 +0900 Subject: [PATCH 04/11] Add QUIC Keylog Support with SSLKEYLOGFILE and TLS Context Integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces QUIC/HTTP3 keylog functionality in Envoy, enabling generation of NSS Key Log Format files for Wireshark and other debugging tools. 
- Keylog callback registration in OnNewSslCtx() - Implementation of EnvoyQuicProofSource::setupQuicKeylogCallback() and quicKeylogCallback() - TLS context–based keylog configuration with per–filter chain caching and thread safety - Address filtering via local/remote IP lists - Fallback to SSLKEYLOGFILE environment variable for compatibility with existing workflows - QuicKeylogBridge integration with Envoy’s existing TLS keylog infrastructure - RawBufferSocket fallback fix in QuicServerTransportSocketFactory::createDownstreamTransportSocket() - Comprehensive unit tests including edge cases Signed-off-by: Chanhun Jeong --- source/common/quic/BUILD | 1 + source/common/quic/envoy_quic_proof_source.cc | 177 +++++++++++++++++- source/common/quic/envoy_quic_proof_source.h | 48 ++++- .../quic_server_transport_socket_factory.h | 10 +- .../quic/envoy_quic_proof_source_test.cc | 176 +++++++++++++++++ 5 files changed, 407 insertions(+), 5 deletions(-) diff --git a/source/common/quic/BUILD b/source/common/quic/BUILD index c44d4a489aa80..8c1a4b54eae07 100644 --- a/source/common/quic/BUILD +++ b/source/common/quic/BUILD @@ -512,6 +512,7 @@ envoy_cc_library( "//envoy/server:transport_socket_config_interface", "//envoy/ssl:context_config_interface", "//source/common/common:assert_lib", + "//source/common/network:raw_buffer_socket_lib", "//source/common/network:transport_socket_options_lib", "//source/common/tls:server_context_config_lib", "//source/common/tls:server_context_lib", diff --git a/source/common/quic/envoy_quic_proof_source.cc b/source/common/quic/envoy_quic_proof_source.cc index 04be05c68f311..bb166c52e7a99 100644 --- a/source/common/quic/envoy_quic_proof_source.cc +++ b/source/common/quic/envoy_quic_proof_source.cc @@ -2,6 +2,9 @@ #include +#include +#include + #include "envoy/ssl/tls_certificate_config.h" #include "source/common/quic/cert_compression.h" @@ -9,6 +12,8 @@ #include "source/common/quic/quic_io_handle_wrapper.h" #include 
"source/common/runtime/runtime_features.h" #include "source/common/stream_info/stream_info_impl.h" +#include "source/common/tls/context_config_impl.h" +#include "source/common/network/utility.h" #include "openssl/bytestring.h" #include "quiche/quic/core/crypto/certificate_view.h" @@ -29,7 +34,7 @@ EnvoyQuicProofSource::GetCertChain(const quic::QuicSocketAddress& server_address return nullptr; } - return getTlsCertAndFilterChain(*res, hostname, cert_matched_sni).cert_; + return getTlsCertAndFilterChain(*res, hostname, cert_matched_sni, server_address, client_address).cert_; } void EnvoyQuicProofSource::signPayload( @@ -44,7 +49,7 @@ void EnvoyQuicProofSource::signPayload( } CertWithFilterChain res = - getTlsCertAndFilterChain(*data, hostname, nullptr /* cert_matched_sni */); + getTlsCertAndFilterChain(*data, hostname, nullptr /* cert_matched_sni */, server_address, client_address); if (res.private_key_ == nullptr) { ENVOY_LOG(warn, "No matching filter chain found for handshake."); callback->Run(false, "", nullptr); @@ -74,13 +79,26 @@ void EnvoyQuicProofSource::signPayload( EnvoyQuicProofSource::CertWithFilterChain EnvoyQuicProofSource::getTlsCertAndFilterChain(const TransportSocketFactoryWithFilterChain& data, const std::string& hostname, - bool* cert_matched_sni) { + bool* cert_matched_sni, + const quic::QuicSocketAddress& server_address, + const quic::QuicSocketAddress& client_address) { auto [cert, key] = data.transport_socket_factory_.getTlsCertificateAndKey(hostname, cert_matched_sni); if (cert == nullptr || key == nullptr) { ENVOY_LOG(warn, "No certificate is configured in transport socket config."); return {}; } + + // Cache the keylog configuration and connection info for this filter chain + try { + const auto& context_config = data.transport_socket_factory_.getContextConfig(); + storeKeylogInfo(data.filter_chain_, + std::shared_ptr(&context_config, [](const Ssl::ContextConfig*){}), + server_address, client_address); + } catch (const std::exception& e) { + 
ENVOY_LOG(debug, "Failed to cache keylog info for filter chain: {}", e.what()); + } + return {std::move(cert), std::move(key), data.filter_chain_}; } @@ -117,6 +135,159 @@ void EnvoyQuicProofSource::updateFilterChainManager( void EnvoyQuicProofSource::OnNewSslCtx(SSL_CTX* ssl_ctx) { CertCompression::registerSslContext(ssl_ctx); + + // Try to set up keylog callback for QUIC SSL contexts + setupQuicKeylogCallback(ssl_ctx); +} + +void EnvoyQuicProofSource::setupQuicKeylogCallback(SSL_CTX* ssl_ctx) { + // Store reference to this proof source in SSL_CTX for use in keylog callback + SSL_CTX_set_app_data(ssl_ctx, this); + + // Set up the keylog callback - the actual keylog configuration will be + // determined per-connection in the callback based on the filter chain + SSL_CTX_set_keylog_callback(ssl_ctx, quicKeylogCallback); +} + +// Helper function to convert Envoy address to QUICHE address +quic::QuicSocketAddress envoyAddressToQuicAddress(const Network::Address::Instance& envoy_addr) { + if (envoy_addr.type() == Network::Address::Type::Ip) { + const auto& ip_addr = *envoy_addr.ip(); + quiche::QuicheIpAddress quiche_addr; + if (quiche_addr.FromString(ip_addr.addressAsString())) { + return quic::QuicSocketAddress(quic::QuicIpAddress(quiche_addr), ip_addr.port()); + } + } + // Return any address for non-IP addresses + return quic::QuicSocketAddress(); +} + +// Static keylog callback for QUIC SSL contexts +void EnvoyQuicProofSource::quicKeylogCallback(const SSL* ssl, const char* line) { + ASSERT(ssl != nullptr); + + // Get the proof source instance from SSL_CTX + auto* proof_source = + static_cast(SSL_CTX_get_app_data(SSL_get_SSL_CTX(ssl))); + ASSERT(proof_source != nullptr); + + ENVOY_LOG(debug, "QUIC keylog callback invoked for line: {}", line); + + // Try to find keylog configuration from cached filter chain information + // We iterate through all cached filter chains to find one with keylog configuration + bool keylog_written = false; + { + absl::MutexLock 
lock(&proof_source->keylog_cache_mutex_); + for (const auto& entry : proof_source->keylog_config_cache_) { + const auto& keylog_info = entry.second; + if (keylog_info.config) { + try { + // Convert QUIC addresses back to Envoy addresses for the bridge + std::string server_addr_str = absl::StrCat( + keylog_info.server_address.host().ToString(), ":", + keylog_info.server_address.port()); + std::string client_addr_str = absl::StrCat( + keylog_info.client_address.host().ToString(), ":", + keylog_info.client_address.port()); + + Network::Address::InstanceConstSharedPtr local_addr = + Network::Utility::parseInternetAddressAndPortNoThrow(server_addr_str); + Network::Address::InstanceConstSharedPtr remote_addr = + Network::Utility::parseInternetAddressAndPortNoThrow(client_addr_str); + + if (local_addr && remote_addr) { + QuicKeylogBridge::writeKeylog(*keylog_info.config, *local_addr, *remote_addr, line); + keylog_written = true; + ENVOY_LOG(debug, "QUIC keylog written using cached configuration"); + break; // Successfully handled by built-in system + } + } catch (const std::exception& e) { + ENVOY_LOG(debug, "Failed to write keylog using cached config: {}", e.what()); + } + } + } + } + + if (keylog_written) { + return; + } + + // Fallback: Use environment variable for backward compatibility + const char* keylog_path = std::getenv("SSLKEYLOGFILE"); + if (keylog_path != nullptr) { + std::ofstream keylog_file(keylog_path, std::ios::app); + if (keylog_file.is_open()) { + keylog_file << line << "\n"; + keylog_file.close(); + ENVOY_LOG(debug, "QUIC keylog written to {}: {}", keylog_path, line); + } + } +} + +void EnvoyQuicProofSource::QuicKeylogBridge::writeKeylog( + const Ssl::ContextConfig& config, + const Network::Address::Instance& local_addr, + const Network::Address::Instance& remote_addr, + const char* line) { + + const std::string& keylog_path = config.tlsKeyLogPath(); + if (keylog_path.empty()) { + return; + } + + // Check address filtering + const auto& local_ip_list 
= config.tlsKeyLogLocal(); + const auto& remote_ip_list = config.tlsKeyLogRemote(); + + bool local_match = (local_ip_list.getIpListSize() == 0 || local_ip_list.contains(local_addr)); + bool remote_match = (remote_ip_list.getIpListSize() == 0 || remote_ip_list.contains(remote_addr)); + + if (!local_match || !remote_match) { + ENVOY_LOG(debug, "QUIC keylog filtered out by address match (local={}, remote={})", + local_match, remote_match); + return; + } + + // Use access log manager to write keylog + try { + auto& access_log_manager = config.accessLogManager(); + auto file_or_error = access_log_manager.createAccessLog( + Filesystem::FilePathAndType{Filesystem::DestinationType::File, keylog_path}); + + if (file_or_error.ok()) { + auto keylog_file = file_or_error.value(); + keylog_file->write(absl::StrCat(line, "\n")); + ENVOY_LOG(debug, "QUIC keylog written via bridge to {}: {}", keylog_path, line); + } else { + ENVOY_LOG(warn, "Failed to create keylog file {}: {}", keylog_path, + file_or_error.status().message()); + } + } catch (const std::exception& e) { + ENVOY_LOG(warn, "Failed to write QUIC keylog: {}", e.what()); + } +} + +// Get SSL socket index for storing transport socket callbacks +int EnvoyQuicProofSource::sslSocketIndex() { + static int ssl_socket_index = SSL_get_ex_new_index(0, nullptr, nullptr, nullptr, nullptr); + return ssl_socket_index; +} + +void EnvoyQuicProofSource::storeKeylogInfo(const Network::FilterChain& filter_chain, + std::shared_ptr config, + const quic::QuicSocketAddress& server_address, + const quic::QuicSocketAddress& client_address) const { + absl::MutexLock lock(&keylog_cache_mutex_); + keylog_config_cache_[&filter_chain] = KeylogInfo{std::move(config), server_address, client_address}; +} + +absl::optional EnvoyQuicProofSource::getKeylogInfo(const Network::FilterChain& filter_chain) const { + absl::MutexLock lock(&keylog_cache_mutex_); + auto it = keylog_config_cache_.find(&filter_chain); + if (it != keylog_config_cache_.end()) { + 
return it->second; + } + return absl::nullopt; } } // namespace Quic diff --git a/source/common/quic/envoy_quic_proof_source.h b/source/common/quic/envoy_quic_proof_source.h index 6a9bb62ee255f..a521953c13682 100644 --- a/source/common/quic/envoy_quic_proof_source.h +++ b/source/common/quic/envoy_quic_proof_source.h @@ -1,15 +1,30 @@ #pragma once +#include + +#include "envoy/ssl/context_config.h" + +#include "source/common/common/thread.h" #include "source/common/quic/envoy_quic_proof_source_base.h" #include "source/common/quic/quic_server_transport_socket_factory.h" #include "source/server/listener_stats.h" +#include "absl/synchronization/mutex.h" +#include "absl/types/optional.h" +#include "quiche/quic/platform/api/quic_socket_address.h" + namespace Envoy { namespace Quic { // A ProofSource implementation which supplies a proof instance with certs from filter chain. class EnvoyQuicProofSource : public EnvoyQuicProofSourceBase { public: + // Cache for keylog configurations by filter chain + struct KeylogInfo { + std::shared_ptr config; + quic::QuicSocketAddress server_address; + quic::QuicSocketAddress client_address; + }; EnvoyQuicProofSource(Network::Socket& listen_socket, Network::FilterChainManager& filter_chain_manager, Server::ListenerStats& listener_stats, TimeSource& time_source) @@ -27,6 +42,15 @@ class EnvoyQuicProofSource : public EnvoyQuicProofSourceBase { void updateFilterChainManager(Network::FilterChainManager& filter_chain_manager); + // Bridge interface for QUIC-TLS keylog integration + class QuicKeylogBridge { + public: + static void writeKeylog(const Ssl::ContextConfig& config, + const Network::Address::Instance& local_addr, + const Network::Address::Instance& remote_addr, + const char* line); + }; + protected: // quic::ProofSource void signPayload(const quic::QuicSocketAddress& server_address, @@ -47,17 +71,39 @@ class EnvoyQuicProofSource : public EnvoyQuicProofSourceBase { }; CertWithFilterChain getTlsCertAndFilterChain(const 
TransportSocketFactoryWithFilterChain& data, - const std::string& hostname, bool* cert_matched_sni); + const std::string& hostname, bool* cert_matched_sni, + const quic::QuicSocketAddress& server_address, + const quic::QuicSocketAddress& client_address); absl::optional getTransportSocketAndFilterChain(const quic::QuicSocketAddress& server_address, const quic::QuicSocketAddress& client_address, const std::string& hostname); + void setupQuicKeylogCallback(SSL_CTX* ssl_ctx); + + // Static callback function for QUIC keylog + static void quicKeylogCallback(const SSL* ssl, const char* line); + + // Get SSL socket index for storing transport socket callbacks + static int sslSocketIndex(); + + // Store keylog configuration and connection info for a filter chain + void storeKeylogInfo(const Network::FilterChain& filter_chain, + std::shared_ptr config, + const quic::QuicSocketAddress& server_address, + const quic::QuicSocketAddress& client_address) const; + + // Get cached keylog information for a filter chain + absl::optional getKeylogInfo(const Network::FilterChain& filter_chain) const; + Network::Socket& listen_socket_; Network::FilterChainManager* filter_chain_manager_{nullptr}; Server::ListenerStats& listener_stats_; TimeSource& time_source_; + + mutable absl::Mutex keylog_cache_mutex_; + mutable std::unordered_map keylog_config_cache_ ABSL_GUARDED_BY(keylog_cache_mutex_); }; } // namespace Quic diff --git a/source/common/quic/quic_server_transport_socket_factory.h b/source/common/quic/quic_server_transport_socket_factory.h index 85aaf45a7e5d5..48f5b365e09f9 100644 --- a/source/common/quic/quic_server_transport_socket_factory.h +++ b/source/common/quic/quic_server_transport_socket_factory.h @@ -7,6 +7,7 @@ #include "envoy/ssl/handshaker.h" #include "source/common/common/assert.h" +#include "source/common/network/raw_buffer_socket.h" #include "source/common/network/transport_socket_options_impl.h" #include "source/common/quic/quic_transport_socket_factory.h" #include 
"source/common/tls/server_ssl_socket.h" @@ -25,8 +26,12 @@ class QuicServerTransportSocketFactory : public Network::DownstreamTransportSock ~QuicServerTransportSocketFactory() override; // Network::DownstreamTransportSocketFactory + // QUIC uses a different transport socket mechanism, but some code paths may call this + // Return a raw buffer socket as a safe fallback Network::TransportSocketPtr createDownstreamTransportSocket() const override { - PANIC("not implemented"); + ENVOY_LOG(warn, "createDownstreamTransportSocket called on QUIC transport socket factory. " + "This should not happen in normal QUIC operation."); + return std::make_unique(); } bool implementsSecureTransport() const override { return true; } @@ -38,6 +43,9 @@ class QuicServerTransportSocketFactory : public Network::DownstreamTransportSock bool earlyDataEnabled() const { return enable_early_data_; } + // Access the TLS context configuration (for keylog integration) + const Ssl::ServerContextConfig& getContextConfig() const { return *config_; } + protected: QuicServerTransportSocketFactory(bool enable_early_data, Stats::Scope& store, Ssl::ServerContextConfigPtr config, diff --git a/test/common/quic/envoy_quic_proof_source_test.cc b/test/common/quic/envoy_quic_proof_source_test.cc index e3b0b9e831a19..a3107036f830a 100644 --- a/test/common/quic/envoy_quic_proof_source_test.cc +++ b/test/common/quic/envoy_quic_proof_source_test.cc @@ -1,3 +1,7 @@ +#include + +#include +#include #include #include #include @@ -13,6 +17,7 @@ #include "test/mocks/network/mocks.h" #include "test/mocks/server/server_factory_context.h" #include "test/mocks/ssl/mocks.h" +#include "test/test_common/network_utility.h" #include "test/test_common/test_runtime.h" #include "gmock/gmock.h" @@ -347,5 +352,176 @@ TEST_F(EnvoyQuicProofSourceTest, ComputeSignatureFailNoFilterChain) { std::make_unique(false, filter_chain_, signature)); } +// Test keylog functionality +TEST_F(EnvoyQuicProofSourceTest, TestKeylogFunctionality) { + // 
Test that OnNewSslCtx sets up keylog callback correctly + bssl::UniquePtr ssl_ctx(SSL_CTX_new(TLS_method())); + ASSERT_NE(ssl_ctx, nullptr); + + // Call OnNewSslCtx which should set up the keylog callback + proof_source_.OnNewSslCtx(ssl_ctx.get()); + + // Verify that the proof source was stored in SSL_CTX app data + void* app_data = SSL_CTX_get_app_data(ssl_ctx.get()); + EXPECT_EQ(app_data, static_cast(&proof_source_)); + + // Verify that keylog callback was set + void (*callback)(const SSL*, const char*) = SSL_CTX_get_keylog_callback(ssl_ctx.get()); + EXPECT_NE(callback, nullptr); +} + +// Test keylog callback registration +TEST_F(EnvoyQuicProofSourceTest, TestKeylogCallbackRegistration) { + // Create SSL_CTX and setup keylog + bssl::UniquePtr ssl_ctx(SSL_CTX_new(TLS_method())); + proof_source_.OnNewSslCtx(ssl_ctx.get()); + + // Verify that keylog callback is registered + void (*callback)(const SSL*, const char*) = SSL_CTX_get_keylog_callback(ssl_ctx.get()); + EXPECT_NE(callback, nullptr); + + // Verify that app data points to our proof source + void* app_data = SSL_CTX_get_app_data(ssl_ctx.get()); + EXPECT_EQ(app_data, static_cast(&proof_source_)); +} + +// Test keylog file writing with environment variable +TEST_F(EnvoyQuicProofSourceTest, TestKeylogFileWriting) { + // Create a temporary file for keylog output + std::string temp_file = "/tmp/test_keylog_" + std::to_string(getpid()) + ".txt"; + + // Set SSLKEYLOGFILE environment variable + setenv("SSLKEYLOGFILE", temp_file.c_str(), 1); + + // Create SSL_CTX and setup keylog + bssl::UniquePtr ssl_ctx(SSL_CTX_new(TLS_method())); + proof_source_.OnNewSslCtx(ssl_ctx.get()); + + // Create SSL connection + bssl::UniquePtr ssl(SSL_new(ssl_ctx.get())); + + // Get the keylog callback and call it to test functionality + void (*callback)(const SSL*, const char*) = SSL_CTX_get_keylog_callback(ssl_ctx.get()); + ASSERT_NE(callback, nullptr); + + // Call the callback with test data + const char* test_line = "CLIENT_RANDOM 
0123456789abcdef test_key_material"; + callback(ssl.get(), test_line); + + // Verify the keylog was written to file + std::ifstream keylog_file(temp_file); + ASSERT_TRUE(keylog_file.is_open()); + std::string line; + ASSERT_TRUE(std::getline(keylog_file, line)); + EXPECT_EQ(line, test_line); + keylog_file.close(); + + // Clean up + unlink(temp_file.c_str()); + unsetenv("SSLKEYLOGFILE"); +} + +// Test keylog callback without environment variable +TEST_F(EnvoyQuicProofSourceTest, TestKeylogCallbackWithoutEnvironmentVariable) { + // Ensure SSLKEYLOGFILE is not set + unsetenv("SSLKEYLOGFILE"); + + // Create SSL_CTX and setup keylog + bssl::UniquePtr ssl_ctx(SSL_CTX_new(TLS_method())); + proof_source_.OnNewSslCtx(ssl_ctx.get()); + + // Verify that keylog callback is still registered (even without env var) + void (*callback)(const SSL*, const char*) = SSL_CTX_get_keylog_callback(ssl_ctx.get()); + EXPECT_NE(callback, nullptr); + + // Create SSL connection and test that callback doesn't crash without env var + bssl::UniquePtr ssl(SSL_new(ssl_ctx.get())); + + // Call the callback - it should not crash even without SSLKEYLOGFILE set + const char* test_line = "CLIENT_RANDOM 0123456789abcdef test_key_material"; + EXPECT_NO_THROW(callback(ssl.get(), test_line)); +} + +// Test QUIC keylog bridge functionality +TEST_F(EnvoyQuicProofSourceTest, TestQuicKeylogBridge) { + // Create a mock context config with keylog configuration + NiceMock mock_config; + NiceMock mock_access_log_manager; + auto mock_access_log_file = std::make_shared>(); + + std::string keylog_path = "/tmp/test_bridge_keylog_" + std::to_string(getpid()) + ".txt"; + + // Setup mock expectations + EXPECT_CALL(mock_config, tlsKeyLogPath()) + .WillRepeatedly(ReturnRef(keylog_path)); + + Network::Address::IpList empty_ip_list; + EXPECT_CALL(mock_config, tlsKeyLogLocal()) + .WillRepeatedly(ReturnRef(empty_ip_list)); + EXPECT_CALL(mock_config, tlsKeyLogRemote()) + .WillRepeatedly(ReturnRef(empty_ip_list)); + + 
EXPECT_CALL(mock_config, accessLogManager()) + .WillRepeatedly(ReturnRef(mock_access_log_manager)); + + EXPECT_CALL(mock_access_log_manager, createAccessLog(_)) + .WillOnce(Return(absl::StatusOr(mock_access_log_file))); + + EXPECT_CALL(*mock_access_log_file, write(_)) + .Times(1); + + // Create test addresses + auto local_addr = Network::Test::getCanonicalLoopbackAddress(Network::Address::IpVersion::v4); + auto remote_addr = Network::Test::getCanonicalLoopbackAddress(Network::Address::IpVersion::v4); + + // Test the bridge functionality + const char* test_line = "CLIENT_RANDOM 123456789 ABCDEF"; + EnvoyQuicProofSource::QuicKeylogBridge::writeKeylog(mock_config, *local_addr, *remote_addr, test_line); +} + +// Test the complete keylog callback flow including SSL context setup +TEST_F(EnvoyQuicProofSourceTest, TestKeylogCallbackWithSslContext) { + // Create an SSL context to test the callback registration + bssl::UniquePtr ssl_ctx(SSL_CTX_new(TLS_method())); + ASSERT_NE(ssl_ctx, nullptr); + + // Use OnNewSslCtx which calls setupQuicKeylogCallback internally + proof_source_.OnNewSslCtx(ssl_ctx.get()); + + // Create an SSL connection + bssl::UniquePtr ssl(SSL_new(ssl_ctx.get())); + ASSERT_NE(ssl, nullptr); + + // Verify that the keylog callback is set + auto callback = SSL_CTX_get_keylog_callback(ssl_ctx.get()); + EXPECT_NE(callback, nullptr); + + // Verify that the proof source is stored as app data + auto stored_proof_source = SSL_CTX_get_app_data(ssl_ctx.get()); + EXPECT_EQ(stored_proof_source, &proof_source_); + + // Test calling the callback - it should handle the case where transport socket callbacks are not available + const char* test_line = "CLIENT_RANDOM 0123456789abcdef test_key_material"; + + // Set up environment variable for fallback test + std::string keylog_path = "/tmp/test_callback_keylog_" + std::to_string(getpid()) + ".txt"; + setenv("SSLKEYLOGFILE", keylog_path.c_str(), 1); + + EXPECT_NO_THROW(callback(ssl.get(), test_line)); + + // Check that the 
keylog was written via environment variable fallback + std::ifstream keylog_file(keylog_path); + EXPECT_TRUE(keylog_file.good()); + if (keylog_file.good()) { + std::string line; + std::getline(keylog_file, line); + EXPECT_EQ(line, test_line); + } + + // Clean up + unsetenv("SSLKEYLOGFILE"); + unlink(keylog_path.c_str()); +} + } // namespace Quic } // namespace Envoy From 02969058b4490610279e1d6a46c55646945519f7 Mon Sep 17 00:00:00 2001 From: Chanhun Jeong Date: Wed, 1 Oct 2025 09:52:29 +0900 Subject: [PATCH 05/11] redis: fix race conditions in cluster destruction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Protect all async callbacks from accessing deallocated cluster members during destruction by adding is_destroying_ atomic flag checks. Affected callbacks: - ClusterRefreshManager callbacks - DNS resolution callbacks - Connection event callbacks - Timer callbacks - Redis client response callbacks (onResponse, onFailure, onUnexpectedResponse) - Hostname resolution callbacks The race condition occurred when callbacks were already queued in the event loop when cluster destruction began, causing use-after-free access to parent cluster members like info_, redis_discovery_session_, and resolve_timer_. All callbacks now check is_destroying_ with memory_order_acquire before accessing any parent members, ensuring safe termination during destruction. Fixes segfaults that occurred when removing Redis service entries. 
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../clusters/redis/redis_cluster.cc | 60 +++++++++++++++++-- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index acb3f3fdb8d31..abe7412923715 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -86,11 +86,17 @@ RedisCluster::RedisCluster( } // Register the cluster callback using weak_ptr to avoid use-after-free + // Also capture a pointer to is_destroying_ to check destruction state std::weak_ptr weak_session = redis_discovery_session_; + std::atomic* is_destroying_ptr = &is_destroying_; registration_handle_ = refresh_manager_->registerCluster( cluster_name_, redirect_refresh_interval_, redirect_refresh_threshold_, failure_refresh_threshold_, host_degraded_refresh_threshold_, - [weak_session]() { + [weak_session, is_destroying_ptr]() { + // Check if cluster is being destroyed first + if (is_destroying_ptr->load(std::memory_order_acquire)) { + return; + } // Try to lock the weak pointer to ensure the session is still alive auto session = weak_session.lock(); if (session && session->resolve_timer_) { @@ -101,7 +107,8 @@ RedisCluster::RedisCluster( RedisCluster::~RedisCluster() { // Set flag to prevent any callbacks from executing during destruction - is_destroying_.store(true); + // Use memory_order_release to ensure this write is visible to callbacks + is_destroying_.store(true, std::memory_order_release); // Reset redis_discovery_session_ before other members are destroyed // to ensure any pending callbacks from refresh_manager_ don't access it. @@ -111,6 +118,11 @@ RedisCluster::~RedisCluster() { // Also clear DNS discovery targets to prevent their callbacks from // accessing the destroyed cluster. 
dns_discovery_resolve_targets_.clear(); + + // Reset the registration handle LAST to ensure no new callbacks are scheduled + // while we're cleaning up. Any callbacks already scheduled will check is_destroying_ + // and return early. + registration_handle_.reset(); } void RedisCluster::startPreInit() { @@ -258,6 +270,10 @@ void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() { parent_.onPreInitComplete(); resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); } else { + // Check if the parent cluster is being destroyed + if (parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } // Once the DNS resolve the initial set of addresses, call startResolveRedis on // the RedisDiscoverySession. The RedisDiscoverySession will using the "cluster // slots" command for service discovery and slot allocation. All subsequent @@ -276,7 +292,7 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession( : parent_(parent), dispatcher_(parent.dispatcher_), resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { // Check if the parent cluster is being destroyed - if (parent_.is_destroying_.load()) { + if (parent_.is_destroying_.load(std::memory_order_acquire)) { return; } startResolveRedis(); @@ -313,6 +329,10 @@ RedisCluster::RedisDiscoverySession::~RedisDiscoverySession() { void RedisCluster::RedisDiscoveryClient::onEvent(Network::ConnectionEvent event) { if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { + // Check if the parent cluster is being destroyed + if (parent_.parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } auto client_to_delete = parent_.client_map_.find(host_); ASSERT(client_to_delete != parent_.client_map_.end()); parent_.dispatcher_.deferredDelete(std::move(client_to_delete->second->client_)); @@ -333,6 +353,11 @@ void RedisCluster::RedisDiscoverySession::registerDiscoveryAddress( } void RedisCluster::RedisDiscoverySession::startResolveRedis() { + // 
Check if the parent cluster is being destroyed before accessing any parent members + if (parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } + parent_.info_->configUpdateStats().update_attempt_.inc(); // If a resolution is currently in progress, skip it. if (current_request_) { @@ -407,6 +432,11 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( [this, slot_idx, slots, hostname_resolution_required_cnt]( Network::DnsResolver::ResolutionStatus status, absl::string_view, std::list&& response) -> void { + // Check if the parent cluster is being destroyed before accessing any parent members + if (parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } + auto& slot = (*slots)[slot_idx]; ENVOY_LOG( debug, @@ -470,6 +500,11 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas( [this, index, slots, replica_idx, hostname_resolution_required_cnt]( Network::DnsResolver::ResolutionStatus status, absl::string_view, std::list&& response) -> void { + // Check if the parent cluster is being destroyed before accessing any parent members + if (parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } + auto& slot = (*slots)[index]; auto& replica = slot.replicas_to_resolve_[replica_idx]; ENVOY_LOG(debug, "async DNS resolution complete for replica address {}", replica.first); @@ -504,6 +539,12 @@ void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution( void RedisCluster::RedisDiscoverySession::onResponse( NetworkFilters::Common::Redis::RespValuePtr&& value) { + // Check if the parent cluster is being destroyed before accessing any parent members + if (parent_.is_destroying_.load(std::memory_order_acquire)) { + current_request_ = nullptr; + return; + } + ENVOY_LOG(debug, "redis cluster slot request for '{}' succeeded", parent_.info_->name()); current_request_ = nullptr; @@ -630,14 +671,25 @@ bool RedisCluster::RedisDiscoverySession::validateCluster( void 
RedisCluster::RedisDiscoverySession::onUnexpectedResponse( const NetworkFilters::Common::Redis::RespValuePtr& value) { + // Check if the parent cluster is being destroyed before accessing any parent members + if (parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } + ENVOY_LOG(warn, "Unexpected response to cluster slot command: {}", value->toString()); this->parent_.info_->configUpdateStats().update_failure_.inc(); resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); } void RedisCluster::RedisDiscoverySession::onFailure() { - ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", parent_.info_->name()); current_request_ = nullptr; + + // Check if the parent cluster is being destroyed before accessing any parent members + if (parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } + + ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", parent_.info_->name()); if (!current_host_address_.empty()) { auto client_to_delete = client_map_.find(current_host_address_); client_to_delete->second->client_->close(); From 79ddf7297242e0af6cba80a52f0b56bb6ea436bf Mon Sep 17 00:00:00 2001 From: Chanhun Jeong Date: Wed, 1 Oct 2025 11:40:05 +0900 Subject: [PATCH 06/11] redis: Add comprehensive null checks to prevent segfaults during cluster destruction Problem: Segmentation faults occur when accessing member pointers in async callbacks during Redis cluster destruction, even with is_destroying_ flag checks. This happens because there's a race window between checking the flag and accessing the pointers. Solution: Add defensive null checks for all pointer accesses that could become invalid during destruction: 1. ClusterInfo pointer (info_): - Add null checks before all configUpdateStats() calls - Use safe access pattern for name() in log statements - Locations: startResolveRedis(), updateDnsStats(), DNS callbacks, onResponse(), onUnexpectedResponse(), onFailure() 2. 
DNS Resolver pointer (dns_resolver_): - Add null checks in startResolveDns() - Add checks in resolveClusterHostnames() and resolveReplicas() - Prevents crashes when DNS resolution is initiated during teardown 3. Timer pointer (resolve_timer_): - Add null checks before enableTimer() calls - Locations: finishClusterHostnameResolution(), onResponse(), onUnexpectedResponse(), onFailure() 4. Consistency fix: - Line 714: Changed parent_.info() to parent_.info_ to match null-checked pattern used elsewhere The pattern applied throughout: 1. Check is_destroying_ flag with memory_order_acquire 2. Verify each pointer is non-null before dereferencing 3. This dual-check handles the race window safely This prevents use-after-free crashes during Redis cluster teardown when async callbacks execute after partial destruction has begun. --- .../clusters/redis/redis_cluster.cc | 72 +++++++++++++++---- 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index abe7412923715..9fddc90aa3d94 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -242,6 +242,11 @@ RedisCluster::DnsDiscoveryResolveTarget::~DnsDiscoveryResolveTarget() { void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() { ENVOY_LOG(trace, "starting async DNS resolution for {}", dns_address_); + // Check if the parent cluster is being destroyed or dns_resolver is null + if (parent_.is_destroying_.load(std::memory_order_acquire) || !parent_.dns_resolver_) { + return; + } + active_query_ = parent_.dns_resolver_->resolve( dns_address_, parent_.dns_lookup_family_, [this](Network::DnsResolver::ResolutionStatus status, absl::string_view, @@ -249,10 +254,12 @@ void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() { active_query_ = nullptr; ENVOY_LOG(trace, "async DNS resolution complete for {}", dns_address_); if (status == 
Network::DnsResolver::ResolutionStatus::Failure || response.empty()) { - if (status == Network::DnsResolver::ResolutionStatus::Failure) { - parent_.info_->configUpdateStats().update_failure_.inc(); - } else { - parent_.info_->configUpdateStats().update_empty_.inc(); + if (parent_.info_) { + if (status == Network::DnsResolver::ResolutionStatus::Failure) { + parent_.info_->configUpdateStats().update_failure_.inc(); + } else { + parent_.info_->configUpdateStats().update_empty_.inc(); + } } if (!resolve_timer_) { @@ -358,11 +365,16 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { return; } + // Also check if info_ is still valid + if (!parent_.info_) { + return; + } + parent_.info_->configUpdateStats().update_attempt_.inc(); // If a resolution is currently in progress, skip it. if (current_request_) { ENVOY_LOG(debug, "redis cluster slot request is already in progress for '{}'", - parent_.info_->name()); + parent_.info_ ? parent_.info_->name() : "unknown"); return; } @@ -391,12 +403,16 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { parent_.auth_username_, parent_.auth_password_, false, absl::nullopt, absl::nullopt); client->client_->addConnectionCallbacks(*client); } - ENVOY_LOG(debug, "executing redis cluster slot request for '{}'", parent_.info_->name()); + ENVOY_LOG(debug, "executing redis cluster slot request for '{}'", + parent_.info_ ? 
parent_.info_->name() : "unknown"); current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this); } void RedisCluster::RedisDiscoverySession::updateDnsStats( Network::DnsResolver::ResolutionStatus status, bool empty_response) { + if (!parent_.info_) { + return; + } if (status == Network::DnsResolver::ResolutionStatus::Failure) { parent_.info_->configUpdateStats().update_failure_.inc(); } else if (empty_response) { @@ -421,6 +437,11 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats( void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( ClusterSlotsSharedPtr&& slots, std::shared_ptr hostname_resolution_required_cnt) { + // Check if the parent cluster is being destroyed or dns_resolver is null + if (parent_.is_destroying_.load(std::memory_order_acquire) || !parent_.dns_resolver_) { + return; + } + for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) { auto& slot = (*slots)[slot_idx]; if (slot.primary() == nullptr) { @@ -484,6 +505,11 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( void RedisCluster::RedisDiscoverySession::resolveReplicas( ClusterSlotsSharedPtr slots, std::size_t index, std::shared_ptr hostname_resolution_required_cnt) { + // Check if the parent cluster is being destroyed or dns_resolver is null + if (parent_.is_destroying_.load(std::memory_order_acquire) || !parent_.dns_resolver_) { + return; + } + auto& slot = (*slots)[index]; if (slot.replicas_to_resolve_.empty()) { if (*hostname_resolution_required_cnt == 0) { @@ -533,8 +559,14 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas( void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution( ClusterSlotsSharedPtr slots) { + // Check if the parent cluster is being destroyed + if (parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } parent_.onClusterSlotUpdate(std::move(slots)); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + if (resolve_timer_) { + 
resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + } } void RedisCluster::RedisDiscoverySession::onResponse( @@ -545,7 +577,8 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } - ENVOY_LOG(debug, "redis cluster slot request for '{}' succeeded", parent_.info_->name()); + ENVOY_LOG(debug, "redis cluster slot request for '{}' succeeded", + parent_.info_ ? parent_.info_->name() : "unknown"); current_request_ = nullptr; const uint32_t SlotRangeStart = 0; @@ -641,7 +674,9 @@ void RedisCluster::RedisDiscoverySession::onResponse( } else { // All slots addresses were represented by IP/Port pairs. parent_.onClusterSlotUpdate(std::move(cluster_slots)); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + if (resolve_timer_) { + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + } } } @@ -677,8 +712,12 @@ void RedisCluster::RedisDiscoverySession::onUnexpectedResponse( } ENVOY_LOG(warn, "Unexpected response to cluster slot command: {}", value->toString()); - this->parent_.info_->configUpdateStats().update_failure_.inc(); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + if (this->parent_.info_) { + this->parent_.info_->configUpdateStats().update_failure_.inc(); + } + if (resolve_timer_) { + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + } } void RedisCluster::RedisDiscoverySession::onFailure() { @@ -689,13 +728,18 @@ void RedisCluster::RedisDiscoverySession::onFailure() { return; } - ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", parent_.info_->name()); + ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", + parent_.info_ ? 
parent_.info_->name() : "unknown"); if (!current_host_address_.empty()) { auto client_to_delete = client_map_.find(current_host_address_); client_to_delete->second->client_->close(); } - parent_.info()->configUpdateStats().update_failure_.inc(); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + if (parent_.info_) { + parent_.info_->configUpdateStats().update_failure_.inc(); + } + if (resolve_timer_) { + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + } } RedisCluster::ClusterSlotsRequest RedisCluster::ClusterSlotsRequest::instance_; From 2181386f5efc7174164edd1a6ca27d1410e54210 Mon Sep 17 00:00:00 2001 From: Chanhun Jeong Date: Wed, 1 Oct 2025 20:53:59 +0900 Subject: [PATCH 07/11] redis: Use local shared_ptr copies to prevent race conditions The previous fix with null checks still had a race condition window between checking the pointer and using it. Even with the null check, the shared_ptr could be reset to null by another thread between the check and use. Solution: Make local copies of shared_ptr before use. This ensures the pointer remains valid throughout its usage in the current scope. Changes: 1. startResolveRedis(): Copy info_ to local variable before use 2. updateDnsStats(): Use local copy of info_ 3. DNS callbacks: Use local copy for stats updates 4. onResponse(), onUnexpectedResponse(), onFailure(): Use local copies 5. client_factory_.create(): Check and use local copy of info_ The pattern applied: auto info = parent_.info_; // Make local copy (ref count++) if (!info) { // Check if null return; } info->method(); // Safe to use - won't become null This prevents the crash at line 376 where info_ was becoming null between the check and the access, even with memory_order_acquire. 
--- .../clusters/redis/redis_cluster.cc | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 9fddc90aa3d94..0c84a28d92fcd 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -254,11 +254,12 @@ void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() { active_query_ = nullptr; ENVOY_LOG(trace, "async DNS resolution complete for {}", dns_address_); if (status == Network::DnsResolver::ResolutionStatus::Failure || response.empty()) { - if (parent_.info_) { + auto info = parent_.info_; + if (info) { if (status == Network::DnsResolver::ResolutionStatus::Failure) { - parent_.info_->configUpdateStats().update_failure_.inc(); + info->configUpdateStats().update_failure_.inc(); } else { - parent_.info_->configUpdateStats().update_empty_.inc(); + info->configUpdateStats().update_empty_.inc(); } } @@ -365,16 +366,17 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { return; } - // Also check if info_ is still valid - if (!parent_.info_) { + // Make a local copy of the shared_ptr to prevent it from becoming null between check and use + auto info = parent_.info_; + if (!info) { return; } - parent_.info_->configUpdateStats().update_attempt_.inc(); + info->configUpdateStats().update_attempt_.inc(); // If a resolution is currently in progress, skip it. if (current_request_) { ENVOY_LOG(debug, "redis cluster slot request is already in progress for '{}'", - parent_.info_ ? parent_.info_->name() : "unknown"); + info ? 
info->name() : "unknown"); return; } @@ -396,27 +398,32 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { if (!client) { client = std::make_unique(*this); client->host_ = current_host_address_; + auto parent_info = parent_.info_; + if (!parent_info) { + return; + } // absl::nullopt here disables AWS IAM authentication in redis client which is not supported by // redis cluster implementation client->client_ = client_factory_.create( - host, dispatcher_, shared_from_this(), redis_command_stats_, parent_.info()->statsScope(), + host, dispatcher_, shared_from_this(), redis_command_stats_, parent_info->statsScope(), parent_.auth_username_, parent_.auth_password_, false, absl::nullopt, absl::nullopt); client->client_->addConnectionCallbacks(*client); } ENVOY_LOG(debug, "executing redis cluster slot request for '{}'", - parent_.info_ ? parent_.info_->name() : "unknown"); + info ? info->name() : "unknown"); current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this); } void RedisCluster::RedisDiscoverySession::updateDnsStats( Network::DnsResolver::ResolutionStatus status, bool empty_response) { - if (!parent_.info_) { + auto info = parent_.info_; + if (!info) { return; } if (status == Network::DnsResolver::ResolutionStatus::Failure) { - parent_.info_->configUpdateStats().update_failure_.inc(); + info->configUpdateStats().update_failure_.inc(); } else if (empty_response) { - parent_.info_->configUpdateStats().update_empty_.inc(); + info->configUpdateStats().update_empty_.inc(); } } @@ -577,8 +584,9 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } + auto info = parent_.info_; ENVOY_LOG(debug, "redis cluster slot request for '{}' succeeded", - parent_.info_ ? parent_.info_->name() : "unknown"); + info ? 
info->name() : "unknown"); current_request_ = nullptr; const uint32_t SlotRangeStart = 0; @@ -712,8 +720,9 @@ void RedisCluster::RedisDiscoverySession::onUnexpectedResponse( } ENVOY_LOG(warn, "Unexpected response to cluster slot command: {}", value->toString()); - if (this->parent_.info_) { - this->parent_.info_->configUpdateStats().update_failure_.inc(); + auto info = this->parent_.info_; + if (info) { + info->configUpdateStats().update_failure_.inc(); } if (resolve_timer_) { resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); @@ -728,14 +737,15 @@ void RedisCluster::RedisDiscoverySession::onFailure() { return; } + auto info = parent_.info_; ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", - parent_.info_ ? parent_.info_->name() : "unknown"); + info ? info->name() : "unknown"); if (!current_host_address_.empty()) { auto client_to_delete = client_map_.find(current_host_address_); client_to_delete->second->client_->close(); } - if (parent_.info_) { - parent_.info_->configUpdateStats().update_failure_.inc(); + if (info) { + info->configUpdateStats().update_failure_.inc(); } if (resolve_timer_) { resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); From aa44067b4f5f0470257b21da5b88e75ead518acc Mon Sep 17 00:00:00 2001 From: Chanhun Jeong Date: Thu, 2 Oct 2025 10:47:08 +0900 Subject: [PATCH 08/11] redis: Use shared_from_this() to keep RedisDiscoverySession alive during timer callbacks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 5% crash rate was caused by timer callbacks executing after the RedisDiscoverySession was destroyed. Even though we checked is_destroying_, there was a race where: 1. Timer callback fires and enters the lambda 2. Destructor runs and deletes the session (unique_ptr reset) 3. 
Callback tries to access parent_.is_destroying_ → CRASH (use-after-free) Solution: - Move timer creation from constructor to initialize() method - Capture shared_from_this() in timer lambda instead of raw 'this' - Call initialize() after RedisDiscoverySession construction completes This ensures the session object stays alive as long as any timer callback is queued or executing, preventing the use-after-free. Pattern changed from: resolve_timer_ = dispatcher_.createTimer([this]() { ... }); To: auto self = shared_from_this(); resolve_timer_ = dispatcher_.createTimer([self]() { ... }); This should eliminate the remaining 5% crash rate during Redis cluster destruction. --- .../clusters/redis/redis_cluster.cc | 23 +++++++++++++------ .../extensions/clusters/redis/redis_cluster.h | 3 +++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 0c84a28d92fcd..055a18a392d01 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -103,6 +103,9 @@ RedisCluster::RedisCluster( session->resolve_timer_->enableTimer(std::chrono::milliseconds(0)); } }); + + // Initialize the session after construction is complete so it can use shared_from_this() + redis_discovery_session_->initialize(); } RedisCluster::~RedisCluster() { @@ -298,18 +301,24 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession( Envoy::Extensions::Clusters::Redis::RedisCluster& parent, NetworkFilters::Common::Redis::Client::ClientFactory& client_factory) : parent_(parent), dispatcher_(parent.dispatcher_), - resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { - // Check if the parent cluster is being destroyed - if (parent_.is_destroying_.load(std::memory_order_acquire)) { - return; - } - startResolveRedis(); - })), + resolve_timer_(nullptr), client_factory_(client_factory), buffer_timeout_(0), redis_command_stats_( 
NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats( parent_.info()->statsScope().symbolTable())) {} +void RedisCluster::RedisDiscoverySession::initialize() { + // Create timer with shared_from_this() to keep session alive during callbacks + auto self = shared_from_this(); + resolve_timer_ = dispatcher_.createTimer([self]() -> void { + // Check if the parent cluster is being destroyed + if (self->parent_.is_destroying_.load(std::memory_order_acquire)) { + return; + } + self->startResolveRedis(); + }); +} + // Convert the cluster slot IP/Port response to an address, return null if the response // does not match the expected type. Network::Address::InstanceConstSharedPtr diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 1b12f347ddf88..d49bf33b91471 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -221,6 +221,9 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { ~RedisDiscoverySession() override; + // Initialize timer - must be called after construction since it uses shared_from_this() + void initialize(); + void registerDiscoveryAddress(std::list&& response, const uint32_t port); // Start discovery against a random host from existing hosts From 65714e2d09c2059bdb92a8201f62de6cea0ab67c Mon Sep 17 00:00:00 2001 From: Chanhun Jeong Date: Thu, 2 Oct 2025 15:15:28 +0900 Subject: [PATCH 09/11] redis: Fix use-after-free by using session-owned flag instead of parent reference CRITICAL FIX: The previous approach had a fatal flaw - callbacks with shared_from_this() kept the session alive, but the session holds a reference to the parent RedisCluster. When the parent was destroyed, accessing parent_.is_destroying_ became use-after-free. The race condition: 1. Timer callback fires with shared_ptr (session kept alive) 2. RedisCluster destructor runs and completes 3. 
Callback tries to check parent_.is_destroying_ 4. CRASH - parent object destroyed, reference is dangling Solution: - Add parent_destroyed_ atomic flag IN THE SESSION - Parent sets this flag BEFORE destroying session - Callbacks check session-owned flag, never access parent directly - Also simplify all safety checks into helper methods This is the correct fix for the 5% crash rate when removing Redis services. --- .../clusters/redis/redis_cluster.cc | 63 ++++++++----------- .../extensions/clusters/redis/redis_cluster.h | 27 ++++++++ 2 files changed, 53 insertions(+), 37 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 055a18a392d01..982a73f2ac200 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -113,6 +113,13 @@ RedisCluster::~RedisCluster() { // Use memory_order_release to ensure this write is visible to callbacks is_destroying_.store(true, std::memory_order_release); + // CRITICAL: Set the session's parent_destroyed_ flag BEFORE resetting the session. + // This allows callbacks with shared_from_this() to safely check if parent is destroyed + // without accessing the parent object itself (which may be destroyed). + if (redis_discovery_session_) { + redis_discovery_session_->parent_destroyed_.store(true, std::memory_order_release); + } + // Reset redis_discovery_session_ before other members are destroyed // to ensure any pending callbacks from refresh_manager_ don't access it. // This matches the approach in PR #39625. 
@@ -245,7 +252,6 @@ RedisCluster::DnsDiscoveryResolveTarget::~DnsDiscoveryResolveTarget() { void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() { ENVOY_LOG(trace, "starting async DNS resolution for {}", dns_address_); - // Check if the parent cluster is being destroyed or dns_resolver is null if (parent_.is_destroying_.load(std::memory_order_acquire) || !parent_.dns_resolver_) { return; } @@ -311,8 +317,7 @@ void RedisCluster::RedisDiscoverySession::initialize() { // Create timer with shared_from_this() to keep session alive during callbacks auto self = shared_from_this(); resolve_timer_ = dispatcher_.createTimer([self]() -> void { - // Check if the parent cluster is being destroyed - if (self->parent_.is_destroying_.load(std::memory_order_acquire)) { + if (!self->isParentAlive()) { return; } self->startResolveRedis(); @@ -370,13 +375,8 @@ void RedisCluster::RedisDiscoverySession::registerDiscoveryAddress( } void RedisCluster::RedisDiscoverySession::startResolveRedis() { - // Check if the parent cluster is being destroyed before accessing any parent members - if (parent_.is_destroying_.load(std::memory_order_acquire)) { - return; - } - - // Make a local copy of the shared_ptr to prevent it from becoming null between check and use - auto info = parent_.info_; + // Use helper to safely get parent info (returns nullptr if parent is being destroyed) + auto info = parentInfo(); if (!info) { return; } @@ -407,7 +407,8 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { if (!client) { client = std::make_unique(*this); client->host_ = current_host_address_; - auto parent_info = parent_.info_; + // Get parent info again in case parent was destroyed between checks + auto parent_info = parentInfo(); if (!parent_info) { return; } @@ -425,7 +426,7 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { void RedisCluster::RedisDiscoverySession::updateDnsStats( Network::DnsResolver::ResolutionStatus status, bool empty_response) { - auto info 
= parent_.info_; + auto info = parentInfo(); if (!info) { return; } @@ -453,8 +454,7 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats( void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( ClusterSlotsSharedPtr&& slots, std::shared_ptr hostname_resolution_required_cnt) { - // Check if the parent cluster is being destroyed or dns_resolver is null - if (parent_.is_destroying_.load(std::memory_order_acquire) || !parent_.dns_resolver_) { + if (!isParentAlive() || !parent_.dns_resolver_) { return; } @@ -469,8 +469,7 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( [this, slot_idx, slots, hostname_resolution_required_cnt]( Network::DnsResolver::ResolutionStatus status, absl::string_view, std::list&& response) -> void { - // Check if the parent cluster is being destroyed before accessing any parent members - if (parent_.is_destroying_.load(std::memory_order_acquire)) { + if (!isParentAlive()) { return; } @@ -521,8 +520,7 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( void RedisCluster::RedisDiscoverySession::resolveReplicas( ClusterSlotsSharedPtr slots, std::size_t index, std::shared_ptr hostname_resolution_required_cnt) { - // Check if the parent cluster is being destroyed or dns_resolver is null - if (parent_.is_destroying_.load(std::memory_order_acquire) || !parent_.dns_resolver_) { + if (!isParentAlive() || !parent_.dns_resolver_) { return; } @@ -575,8 +573,7 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas( void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution( ClusterSlotsSharedPtr slots) { - // Check if the parent cluster is being destroyed - if (parent_.is_destroying_.load(std::memory_order_acquire)) { + if (!isParentAlive()) { return; } parent_.onClusterSlotUpdate(std::move(slots)); @@ -587,13 +584,11 @@ void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution( void RedisCluster::RedisDiscoverySession::onResponse( 
NetworkFilters::Common::Redis::RespValuePtr&& value) { - // Check if the parent cluster is being destroyed before accessing any parent members - if (parent_.is_destroying_.load(std::memory_order_acquire)) { + auto info = parentInfo(); + if (!info) { current_request_ = nullptr; return; } - - auto info = parent_.info_; ENVOY_LOG(debug, "redis cluster slot request for '{}' succeeded", info ? info->name() : "unknown"); current_request_ = nullptr; @@ -724,15 +719,13 @@ bool RedisCluster::RedisDiscoverySession::validateCluster( void RedisCluster::RedisDiscoverySession::onUnexpectedResponse( const NetworkFilters::Common::Redis::RespValuePtr& value) { // Check if the parent cluster is being destroyed before accessing any parent members - if (parent_.is_destroying_.load(std::memory_order_acquire)) { + auto info = parentInfo(); + if (!info) { return; } ENVOY_LOG(warn, "Unexpected response to cluster slot command: {}", value->toString()); - auto info = this->parent_.info_; - if (info) { - info->configUpdateStats().update_failure_.inc(); - } + info->configUpdateStats().update_failure_.inc(); if (resolve_timer_) { resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); } @@ -741,21 +734,17 @@ void RedisCluster::RedisDiscoverySession::onUnexpectedResponse( void RedisCluster::RedisDiscoverySession::onFailure() { current_request_ = nullptr; - // Check if the parent cluster is being destroyed before accessing any parent members - if (parent_.is_destroying_.load(std::memory_order_acquire)) { + auto info = parentInfo(); + if (!info) { return; } - auto info = parent_.info_; - ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", - info ? 
info->name() : "unknown"); + ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", info->name()); if (!current_host_address_.empty()) { auto client_to_delete = client_map_.find(current_host_address_); client_to_delete->second->client_->close(); } - if (info) { - info->configUpdateStats().update_failure_.inc(); - } + info->configUpdateStats().update_failure_.inc(); if (resolve_timer_) { resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); } diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index d49bf33b91471..0a4b8fe8d29e9 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -269,6 +269,28 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots); void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); + private: + friend class RedisCluster; + friend struct RedisCluster::DnsDiscoveryResolveTarget; + friend struct RedisDiscoveryClient; + // Thread-safe check if parent cluster is being destroyed. + // Returns true if it's safe to proceed with parent operations. + // NOTE: We check our own flag instead of parent_.is_destroying_ because + // parent_ is a reference that becomes dangling after parent is destroyed. + bool isParentAlive() const { + return !parent_destroyed_.load(std::memory_order_acquire); + } + + // Thread-safe accessor for parent cluster info. + // Returns nullptr if parent is being destroyed or info is not available. + // This encapsulates the safety checks needed when accessing parent state from callbacks. 
+ Upstream::ClusterInfoConstSharedPtr parentInfo() const { + if (!isParentAlive()) { + return nullptr; + } + return parent_.info_; + } + RedisCluster& parent_; Event::Dispatcher& dispatcher_; std::string current_host_address_; @@ -281,6 +303,11 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_; const std::chrono::milliseconds buffer_timeout_; NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_; + + // Flag set by parent's destructor to signal that parent is being destroyed. + // Callbacks check this flag (owned by session) instead of accessing parent's flag + // to avoid use-after-free when parent is destroyed but callbacks are still queued. + std::atomic<bool> parent_destroyed_{false}; }; Upstream::ClusterManager& cluster_manager_; From 307a560bb31eba27e19c0a3326d3ac49f7e55823 Mon Sep 17 00:00:00 2001 From: Jack Park Date: Thu, 6 Nov 2025 16:13:03 -0800 Subject: [PATCH 10/11] Add pubsub commands --- .../intro/arch_overview/other_protocols/redis.rst | 4 ++++ .../network/common/redis/supported_commands.h | 15 ++++++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/docs/root/intro/arch_overview/other_protocols/redis.rst b/docs/root/intro/arch_overview/other_protocols/redis.rst index 04cdd322f5b8c..d96f3dd593d0a 100644 --- a/docs/root/intro/arch_overview/other_protocols/redis.rst +++ b/docs/root/intro/arch_overview/other_protocols/redis.rst @@ -215,10 +215,14 @@ For details on each command's usage see the official SISMEMBER, Set SMEMBERS, Set SPOP, Set + SPUBLISH, Pubsub SRANDMEMBER, Set SREM, Set SCAN, Generic SSCAN, Set + SSUBSCRIBE, Pubsub + SUBSCRIBE, Pubsub + SUNSUBSCRIBE, Pubsub WATCH, String UNWATCH, String ZADD, Sorted Set diff --git a/source/extensions/filters/network/common/redis/supported_commands.h b/source/extensions/filters/network/common/redis/supported_commands.h index b8d9d3d6e2904..f6241751da8ea 100644 ---
a/source/extensions/filters/network/common/redis/supported_commands.h +++ b/source/extensions/filters/network/common/redis/supported_commands.h @@ -30,11 +30,12 @@ struct SupportedCommands { "lpush", "lpushx", "lrange", "lrem", "lset", "ltrim", "persist", "pexpire", "pexpireat", "pfadd", "pfcount", "psetex", "pttl", "publish", "restore", "rpop", "rpush", "rpushx", "sadd", "scard", "set", "setbit", "setex", "setnx", "setrange", "sismember", "smembers", - "spop", "srandmember", "srem", "sscan", "strlen", "ttl", "type", "xack", "xadd", - "xautoclaim", "xclaim", "xdel", "xlen", "xpending", "xrange", "xrevrange", "xtrim", "zadd", - "zcard", "zcount", "zincrby", "zlexcount", "zpopmin", "zpopmax", "zrange", "zrangebylex", - "zrangebyscore", "zrank", "zrem", "zremrangebylex", "zremrangebyrank", "zremrangebyscore", - "zrevrange", "zrevrangebylex", "zrevrangebyscore", "zrevrank", "zscan", "zscore"); + "spop", "spublish", "srandmember", "srem", "sscan", "strlen", "ssubscribe", "subscribe", + "sunsubscribe", "ttl", "type", "watch", "xack", "xadd", "xautoclaim", "xclaim", "xdel", + "xlen", "xpending", "xrange", "xrevrange", "xtrim", "zadd", "zcard", "zcount", "zincrby", + "zlexcount", "zpopmin", "zpopmax", "zrange", "zrangebylex", "zrangebyscore", "zrank", + "zrem", "zremrangebylex", "zremrangebyrank", "zremrangebyscore", "zrevrange", + "zrevrangebylex", "zrevrangebyscore", "zrevrank", "zscan", "zscore"); } /** @@ -139,8 +140,8 @@ struct SupportedCommands { "lpush", "lpushx", "lrem", "lset", "ltrim", "mset", "multi", "persist", "pexpire", "pexpireat", "pfadd", "psetex", "restore", "rpop", "rpush", "rpushx", "sadd", "set", "setbit", "setex", "setnx", "setrange", "spop", - "srem", "zadd", "zincrby", "touch", "zpopmin", "zpopmax", "zrem", - "zremrangebylex", "zremrangebyrank", "zremrangebyscore", "unlink"); + "srem", "spublish", "zadd", "zincrby", "touch", "zpopmin", "zpopmax", + "zrem", "zremrangebylex", "zremrangebyrank", "zremrangebyscore", "unlink"); } static bool 
isReadCommand(const std::string& command) { From 5515a8eb63525beae5131496548ed42a2edf7694 Mon Sep 17 00:00:00 2001 From: ku524 Date: Mon, 5 Jan 2026 11:22:18 +0900 Subject: [PATCH 11/11] [CPLAT-8445] Change trace_id pattern (UUIDv4->UUIDv7) --- .../tracers/opentelemetry/tracer.cc | 33 ++++++++++-- .../extensions/tracers/opentelemetry/tracer.h | 13 +++++ .../opentelemetry_tracer_impl_test.cc | 53 +++++++++++-------- 3 files changed, 73 insertions(+), 26 deletions(-) diff --git a/source/extensions/tracers/opentelemetry/tracer.cc b/source/extensions/tracers/opentelemetry/tracer.cc index 6158ee66819e6..1f9750f1de2e7 100644 --- a/source/extensions/tracers/opentelemetry/tracer.cc +++ b/source/extensions/tracers/opentelemetry/tracer.cc @@ -5,6 +5,7 @@ #include "envoy/config/trace/v3/opentelemetry.pb.h" +#include "source/common/common/assert.h" #include "source/common/common/empty_string.h" #include "source/common/common/hex.h" #include "source/common/tracing/common_values.h" @@ -264,6 +265,34 @@ void Tracer::sendSpan(::opentelemetry::proto::trace::v1::Span& span) { } } +std::string Tracer::generateTraceIdV7() { + // UUIDv7 bit layout (RFC 9562): + // High 64 bits: [timestamp_ms (48)] [version (4)] [rand_a (12)] + // Low 64 bits: [variant (2)] [rand_b (62)] + + // Get current Unix timestamp in milliseconds. + const uint64_t timestamp_ms = static_cast<uint64_t>( + std::chrono::duration_cast<std::chrono::milliseconds>( + time_source_.systemTime().time_since_epoch()) + .count()); + // Timestamp must fit in 48 bits (valid until year ~10889). + ASSERT((timestamp_ms >> 48) == 0); + + // Generate random bits.
+ const uint64_t rand_a = random_.random() & 0x0FFFULL; // 12 bits + const uint64_t rand_b = random_.random() & 0x3FFFFFFFFFFFFFFFULL; // 62 bits + + // Assemble high 64 bits: [timestamp_ms(48)][version=7(4)][rand_a(12)] + constexpr uint64_t kVersion7 = 0x7ULL; + const uint64_t trace_id_high = (timestamp_ms << 16) | (kVersion7 << 12) | rand_a; + + // Assemble low 64 bits: [variant=2(2)][rand_b(62)] + constexpr uint64_t kVariantRfc4122 = 0x2ULL; + const uint64_t trace_id_low = (kVariantRfc4122 << 62) | rand_b; + + return absl::StrCat(Hex::uint64ToHex(trace_id_high), Hex::uint64ToHex(trace_id_low)); +} + Tracing::SpanPtr Tracer::startSpan(const std::string& operation_name, const StreamInfo::StreamInfo& stream_info, SystemTime start_time, Tracing::Decision tracing_decision, @@ -271,9 +300,7 @@ Tracing::SpanPtr Tracer::startSpan(const std::string& operation_name, OTelSpanKind span_kind) { // Create an Tracers::OpenTelemetry::Span class that will contain the OTel span. Span new_span(operation_name, stream_info, start_time, time_source_, *this, span_kind); - uint64_t trace_id_high = random_.random(); - uint64_t trace_id = random_.random(); - new_span.setTraceId(absl::StrCat(Hex::uint64ToHex(trace_id_high), Hex::uint64ToHex(trace_id))); + new_span.setTraceId(generateTraceIdV7()); uint64_t span_id = random_.random(); new_span.setId(Hex::uint64ToHex(span_id)); if (sampler_) { diff --git a/source/extensions/tracers/opentelemetry/tracer.h b/source/extensions/tracers/opentelemetry/tracer.h index 11ae574f9e32b..9b6135c385447 100644 --- a/source/extensions/tracers/opentelemetry/tracer.h +++ b/source/extensions/tracers/opentelemetry/tracer.h @@ -65,6 +65,19 @@ class Tracer : Logger::Loggable { * Removes all spans from the span buffer and sends them to the collector. */ void flushSpans(); + /** + * Generates a 128-bit trace ID following UUIDv7 format (RFC 9562). 
+ * + * UUIDv7 layout (128 bits): + * [0-47] 48-bit Unix timestamp (milliseconds) + * [48-51] 4-bit version (0111 = 7) + * [52-63] 12-bit rand_a + * [64-65] 2-bit variant (10) + * [66-127] 62-bit rand_b + * + * @return 32-character hex string (128-bit trace ID). + */ + std::string generateTraceIdV7(); OpenTelemetryTraceExporterPtr exporter_; Envoy::TimeSource& time_source_; diff --git a/test/extensions/tracers/opentelemetry/opentelemetry_tracer_impl_test.cc b/test/extensions/tracers/opentelemetry/opentelemetry_tracer_impl_test.cc index f91cfd4f96932..e50c14280bb47 100644 --- a/test/extensions/tracers/opentelemetry/opentelemetry_tracer_impl_test.cc +++ b/test/extensions/tracers/opentelemetry/opentelemetry_tracer_impl_test.cc @@ -250,18 +250,20 @@ TEST_F(OpenTelemetryDriverTest, GenerateSpanContextWithoutHeadersTest) { Tracing::TestTraceContextImpl request_headers{ {":authority", "test.com"}, {":path", "/"}, {":method", "GET"}}; - // Mock the random call for generating trace and span IDs so we can check it later. - const uint64_t trace_id_high = 1; - const uint64_t trace_id_low = 2; + // Fix timestamp for deterministic UUIDv7 trace_id generation. + time_system_.setSystemTime(std::chrono::milliseconds(0)); + + // Mock the random calls for generating trace_id (UUIDv7: rand_a, rand_b) and span_id. + const uint64_t rand_a = 1; + const uint64_t rand_b = 2; const uint64_t new_span_id = 3; NiceMock& mock_random_generator_ = context_.server_factory_context_.api_.random_; - // The tracer should generate three random numbers for the trace high, trace low, and span id. 
{ InSequence s; - EXPECT_CALL(mock_random_generator_, random()).WillOnce(Return(trace_id_high)); - EXPECT_CALL(mock_random_generator_, random()).WillOnce(Return(trace_id_low)); + EXPECT_CALL(mock_random_generator_, random()).WillOnce(Return(rand_a)); + EXPECT_CALL(mock_random_generator_, random()).WillOnce(Return(rand_b)); EXPECT_CALL(mock_random_generator_, random()).WillOnce(Return(new_span_id)); } @@ -276,8 +278,9 @@ TEST_F(OpenTelemetryDriverTest, GenerateSpanContextWithoutHeadersTest) { // Ends in 01 because span should be sampled. See // https://w3c.github.io/trace-context/#trace-flags. + // trace_id is UUIDv7: timestamp(0) + version(7) + rand_a(1) | variant(2) + rand_b(2) EXPECT_EQ(sampled_entry.has_value(), true); - EXPECT_EQ(sampled_entry.value(), "00-00000000000000010000000000000002-0000000000000003-01"); + EXPECT_EQ(sampled_entry.value(), "00-00000000000070018000000000000002-0000000000000003-01"); } // Verifies a span it not created when an invalid traceparent header is received @@ -604,6 +607,8 @@ TEST_F(OpenTelemetryDriverTest, ExportOTLPSpanWithAttributes) { setupValidDriver(); Tracing::TestTraceContextImpl request_headers{ {":authority", "test.com"}, {":path", "/"}, {":method", "GET"}}; + // Fix timestamp for deterministic UUIDv7 trace_id generation. 
+ time_system_.setSystemTime(std::chrono::milliseconds(0)); NiceMock& mock_random_generator_ = context_.server_factory_context_.api_.random_; int64_t generated_int = 1; @@ -658,12 +663,11 @@ TEST_F(OpenTelemetryDriverTest, ExportOTLPSpanWithAttributes) { TestUtility::loadFromYaml(fmt::format(request_yaml, envoy_version, timestamp_ns, timestamp_ns), request_proto); - std::string generated_int_hex = Hex::uint64ToHex(generated_int); auto* expected_span = request_proto.mutable_resource_spans(0)->mutable_scope_spans(0)->mutable_spans(0); - expected_span->set_trace_id( - absl::HexStringToBytes(absl::StrCat(generated_int_hex, generated_int_hex))); - expected_span->set_span_id(absl::HexStringToBytes(absl::StrCat(generated_int_hex))); + // UUIDv7 trace_id: timestamp(0) + version(7) + rand_a(1) | variant(2) + rand_b(1) + expected_span->set_trace_id(absl::HexStringToBytes("00000000000070018000000000000001")); + expected_span->set_span_id(absl::HexStringToBytes(Hex::uint64ToHex(generated_int))); EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.opentelemetry.min_flush_spans", 5U)) .Times(1) @@ -680,6 +684,8 @@ TEST_F(OpenTelemetryDriverTest, ExportOTLPSpanWithAttributesAndStatus) { setupValidDriver(); Tracing::TestTraceContextImpl request_headers{ {":authority", "test.com"}, {":path", "/"}, {":method", "GET"}}; + // Fix timestamp for deterministic UUIDv7 trace_id generation. 
+ time_system_.setSystemTime(std::chrono::milliseconds(0)); NiceMock& mock_random_generator_ = context_.server_factory_context_.api_.random_; int64_t generated_int = 1; @@ -740,12 +746,11 @@ TEST_F(OpenTelemetryDriverTest, ExportOTLPSpanWithAttributesAndStatus) { TestUtility::loadFromYaml(fmt::format(request_yaml, envoy_version, timestamp_ns, timestamp_ns), request_proto); - std::string generated_int_hex = Hex::uint64ToHex(generated_int); auto* expected_span = request_proto.mutable_resource_spans(0)->mutable_scope_spans(0)->mutable_spans(0); - expected_span->set_trace_id( - absl::HexStringToBytes(absl::StrCat(generated_int_hex, generated_int_hex))); - expected_span->set_span_id(absl::HexStringToBytes(absl::StrCat(generated_int_hex))); + // UUIDv7 trace_id: timestamp(0) + version(7) + rand_a(1) | variant(2) + rand_b(1) + expected_span->set_trace_id(absl::HexStringToBytes("00000000000070018000000000000001")); + expected_span->set_span_id(absl::HexStringToBytes(Hex::uint64ToHex(generated_int))); EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.opentelemetry.min_flush_spans", 5U)) .Times(1) @@ -762,6 +767,8 @@ TEST_F(OpenTelemetryDriverTest, ExportOTLPGRPCSpanWithAttributesAndStatus) { setupValidDriver(); Tracing::TestTraceContextImpl request_headers{ {":authority", "test.com"}, {":path", "/"}, {":method", "GET"}}; + // Fix timestamp for deterministic UUIDv7 trace_id generation. 
+ time_system_.setSystemTime(std::chrono::milliseconds(0)); NiceMock& mock_random_generator_ = context_.server_factory_context_.api_.random_; int64_t generated_int = 1; @@ -830,12 +837,11 @@ TEST_F(OpenTelemetryDriverTest, ExportOTLPGRPCSpanWithAttributesAndStatus) { TestUtility::loadFromYaml(fmt::format(request_yaml, envoy_version, timestamp_ns, timestamp_ns), request_proto); - std::string generated_int_hex = Hex::uint64ToHex(generated_int); auto* expected_span = request_proto.mutable_resource_spans(0)->mutable_scope_spans(0)->mutable_spans(0); - expected_span->set_trace_id( - absl::HexStringToBytes(absl::StrCat(generated_int_hex, generated_int_hex))); - expected_span->set_span_id(absl::HexStringToBytes(absl::StrCat(generated_int_hex))); + // UUIDv7 trace_id: timestamp(0) + version(7) + rand_a(1) | variant(2) + rand_b(1) + expected_span->set_trace_id(absl::HexStringToBytes("00000000000070018000000000000001")); + expected_span->set_span_id(absl::HexStringToBytes(Hex::uint64ToHex(generated_int))); EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.opentelemetry.min_flush_spans", 5U)) .Times(1) @@ -903,6 +909,8 @@ TEST_F(OpenTelemetryDriverTest, ExportSpanWithCustomServiceName) { Tracing::TestTraceContextImpl request_headers{ {":authority", "test.com"}, {":path", "/"}, {":method", "GET"}}; + // Fix timestamp for deterministic UUIDv7 trace_id generation. 
+ time_system_.setSystemTime(std::chrono::milliseconds(0)); NiceMock& mock_random_generator_ = context_.server_factory_context_.api_.random_; int64_t generated_int = 1; @@ -942,12 +950,11 @@ TEST_F(OpenTelemetryDriverTest, ExportSpanWithCustomServiceName) { TestUtility::loadFromYaml(fmt::format(request_yaml, envoy_version, timestamp_ns, timestamp_ns), request_proto); - std::string generated_int_hex = Hex::uint64ToHex(generated_int); auto* expected_span = request_proto.mutable_resource_spans(0)->mutable_scope_spans(0)->mutable_spans(0); - expected_span->set_trace_id( - absl::HexStringToBytes(absl::StrCat(generated_int_hex, generated_int_hex))); - expected_span->set_span_id(absl::HexStringToBytes(absl::StrCat(generated_int_hex))); + // UUIDv7 trace_id: timestamp(0) + version(7) + rand_a(1) | variant(2) + rand_b(1) + expected_span->set_trace_id(absl::HexStringToBytes("00000000000070018000000000000001")); + expected_span->set_span_id(absl::HexStringToBytes(Hex::uint64ToHex(generated_int))); EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.opentelemetry.min_flush_spans", 5U)) .Times(1)