diff --git a/BUILD b/BUILD index afec159fdba..bf7c847a416 100644 --- a/BUILD +++ b/BUILD @@ -36,6 +36,7 @@ ISTIO_EXTENSIONS = [ "//source/extensions/filters/http/istio_stats", "//source/extensions/filters/http/peer_metadata:filter_lib", "//source/extensions/filters/network/metadata_exchange:config_lib", + "//source/extensions/filters/network/peer_metadata", ] envoy_cc_binary( diff --git a/CODEOWNERS b/CODEOWNERS index cb66acd443e..37f2015ed7b 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1 +1 @@ -* @istio/wg-policies-and-telemetry-maintainers +* @istio/wg-policies-and-telemetry-maintainers diff --git a/extensions/common/BUILD b/extensions/common/BUILD index 2bc1771291d..5ac05f8a578 100644 --- a/extensions/common/BUILD +++ b/extensions/common/BUILD @@ -34,6 +34,7 @@ envoy_cc_library( "@com_google_absl//absl/strings", "@com_google_absl//absl/types:optional", "@envoy//envoy/common:hashable_interface", + "@envoy//envoy/local_info:local_info_interface", "@envoy//envoy/registry", "@envoy//envoy/stream_info:filter_state_interface", "@envoy//source/common/common:hash_lib", diff --git a/extensions/common/metadata_object.cc b/extensions/common/metadata_object.cc index e5f763b2332..74e1e09424a 100644 --- a/extensions/common/metadata_object.cc +++ b/extensions/common/metadata_object.cc @@ -14,6 +14,7 @@ #include "extensions/common/metadata_object.h" +#include "envoy/config/core/v3/base.pb.h" #include "envoy/registry/registry.h" #include "source/common/common/hash.h" #include "source/common/protobuf/utility.h" @@ -24,7 +25,10 @@ namespace Istio { namespace Common { namespace { -static absl::flat_hash_map ALL_BAGGAGE_TOKENS = { + +// This maps field names into baggage tokens. We use it to decode field names +// when WorkloadMetadataObject content is accessed through the Envoy API. 
+static absl::flat_hash_map ALL_METADATA_FIELDS = { {NamespaceNameToken, BaggageToken::NamespaceName}, {ClusterNameToken, BaggageToken::ClusterName}, {ServiceNameToken, BaggageToken::ServiceName}, @@ -34,6 +38,25 @@ static absl::flat_hash_map ALL_BAGGAGE_TOKENS = {WorkloadNameToken, BaggageToken::WorkloadName}, {WorkloadTypeToken, BaggageToken::WorkloadType}, {InstanceNameToken, BaggageToken::InstanceName}, + {RegionToken, BaggageToken::LocalityRegion}, + {ZoneToken, BaggageToken::LocalityZone}, +}; + +// This maps baggage keys into baggage tokens. We use it to decode baggage keys +// coming over the wire when building WorkloadMetadataObject. +static absl::flat_hash_map ALL_BAGGAGE_TOKENS = { + {NamespaceNameBaggageToken, BaggageToken::NamespaceName}, + {ClusterNameBaggageToken, BaggageToken::ClusterName}, + {ServiceNameBaggageToken, BaggageToken::ServiceName}, + {ServiceVersionBaggageToken, BaggageToken::ServiceVersion}, + {AppNameBaggageToken, BaggageToken::AppName}, + {AppVersionBaggageToken, BaggageToken::AppVersion}, + {DeploymentNameBaggageToken, BaggageToken::WorkloadName}, + {PodNameBaggageToken, BaggageToken::WorkloadName}, + {CronjobNameBaggageToken, BaggageToken::WorkloadName}, + {JobNameBaggageToken, BaggageToken::WorkloadName}, + {InstanceNameBaggageToken, BaggageToken::InstanceName}, + }; static absl::flat_hash_map ALL_WORKLOAD_TOKENS = { @@ -61,6 +84,36 @@ absl::optional toSuffix(WorkloadType workload_type) { } // namespace +std::string WorkloadMetadataObject::baggage() const { + const auto workload_type = toSuffix(workload_type_).value_or(PodSuffix); + std::vector parts; + if (!workload_name_.empty()) { + parts.push_back("k8s." 
+ std::string(workload_type) + ".name=" + std::string(workload_name_)); + } + // Map the workload metadata fields to baggage tokens + const std::vector> field_to_baggage = { + {Istio::Common::NamespaceNameToken, Istio::Common::NamespaceNameBaggageToken}, + {Istio::Common::ClusterNameToken, Istio::Common::ClusterNameBaggageToken}, + {Istio::Common::ServiceNameToken, Istio::Common::ServiceNameBaggageToken}, + {Istio::Common::ServiceVersionToken, Istio::Common::ServiceVersionBaggageToken}, + {Istio::Common::AppNameToken, Istio::Common::AppNameBaggageToken}, + {Istio::Common::AppVersionToken, Istio::Common::AppVersionBaggageToken}, + {Istio::Common::InstanceNameToken, Istio::Common::InstanceNameBaggageToken}, + {Istio::Common::RegionToken, Istio::Common::LocalityRegionBaggageToken}, + {Istio::Common::ZoneToken, Istio::Common::LocalityZoneBaggageToken}, + }; + + for (const auto& [field_name, baggage_key] : field_to_baggage) { + const auto field_result = getField(field_name); + if (auto field_value = std::get_if(&field_result)) { + if (!field_value->empty()) { + parts.push_back(absl::StrCat(baggage_key, "=", *field_value)); + } + } + } + return absl::StrJoin(parts, ","); +} + Envoy::ProtobufTypes::MessagePtr WorkloadMetadataObject::serializeAsProto() const { auto message = std::make_unique(); const auto suffix = toSuffix(workload_type_); @@ -94,6 +147,12 @@ Envoy::ProtobufTypes::MessagePtr WorkloadMetadataObject::serializeAsProto() cons if (!identity_.empty()) { (*message->mutable_fields())[IdentityToken].set_string_value(identity_); } + if (!locality_region_.empty()) { + (*message->mutable_fields())[RegionToken].set_string_value(locality_region_); + } + if (!locality_zone_.empty()) { + (*message->mutable_fields())[ZoneToken].set_string_value(locality_zone_); + } if (!labels_.empty()) { auto* labels = (*message->mutable_fields())[LabelsToken].mutable_struct_value(); @@ -136,6 +195,12 @@ WorkloadMetadataObject::serializeAsPairs() const { if (!app_version_.empty()) { 
parts.push_back({AppVersionToken, app_version_}); } + if (!locality_region_.empty()) { + parts.push_back({RegionToken, locality_region_}); + } + if (!locality_zone_.empty()) { + parts.push_back({ZoneToken, locality_zone_}); + } if (!labels_.empty()) { for (const auto& l : labels_) { parts.push_back({absl::StrCat("labels[]", l.first), absl::string_view(l.second)}); @@ -161,6 +226,8 @@ absl::optional WorkloadMetadataObject::owner() const { return {}; } +std::string WorkloadMetadataObject::identity() const { return identity_; } + WorkloadType fromSuffix(absl::string_view suffix) { const auto it = ALL_WORKLOAD_TOKENS.find(suffix); if (it != ALL_WORKLOAD_TOKENS.end()) { @@ -195,6 +262,9 @@ google::protobuf::Struct convertWorkloadMetadataToStruct(const WorkloadMetadataO if (!obj.cluster_name_.empty()) { (*metadata.mutable_fields())[ClusterMetadataField].set_string_value(obj.cluster_name_); } + if (!obj.identity_.empty()) { + (*metadata.mutable_fields())[IdentityMetadataField].set_string_value(obj.identity_); + } auto* labels = (*metadata.mutable_fields())[LabelsMetadataField].mutable_struct_value(); if (!obj.canonical_name_.empty()) { (*labels->mutable_fields())[CanonicalNameLabel].set_string_value(obj.canonical_name_); @@ -216,6 +286,12 @@ google::protobuf::Struct convertWorkloadMetadataToStruct(const WorkloadMetadataO if (const auto owner = obj.owner(); owner.has_value()) { (*metadata.mutable_fields())[OwnerMetadataField].set_string_value(*owner); } + if (!obj.locality_region_.empty()) { + (*metadata.mutable_fields())[RegionMetadataField].set_string_value(obj.locality_region_); + } + if (!obj.locality_zone_.empty()) { + (*metadata.mutable_fields())[ZoneMetadataField].set_string_value(obj.locality_zone_); + } return metadata; } @@ -228,8 +304,15 @@ convertStructToWorkloadMetadata(const google::protobuf::Struct& metadata) { std::unique_ptr convertStructToWorkloadMetadata(const google::protobuf::Struct& metadata, const absl::flat_hash_set& additional_labels) { - 
absl::string_view instance, namespace_name, owner, workload, cluster, canonical_name, - canonical_revision, app_name, app_version; + return convertStructToWorkloadMetadata(metadata, additional_labels, {}); +} + +std::unique_ptr +convertStructToWorkloadMetadata(const google::protobuf::Struct& metadata, + const absl::flat_hash_set& additional_labels, + const absl::optional locality) { + absl::string_view instance, namespace_name, owner, workload, cluster, identity, canonical_name, + canonical_revision, app_name, app_version, region, zone; std::vector> labels; for (const auto& it : metadata.fields()) { if (it.first == InstanceMetadataField) { @@ -242,6 +325,8 @@ convertStructToWorkloadMetadata(const google::protobuf::Struct& metadata, workload = it.second.string_value(); } else if (it.first == ClusterMetadataField) { cluster = it.second.string_value(); + } else if (it.first == IdentityMetadataField) { + identity = it.second.string_value(); } else if (it.first == LabelsMetadataField) { for (const auto& labels_it : it.second.struct_value().fields()) { if (labels_it.first == CanonicalNameLabel) { @@ -260,9 +345,19 @@ convertStructToWorkloadMetadata(const google::protobuf::Struct& metadata, } } } - auto obj = std::make_unique(instance, cluster, namespace_name, workload, - canonical_name, canonical_revision, app_name, - app_version, parseOwner(owner, workload), ""); + std::string locality_region = std::string(region); + std::string locality_zone = std::string(zone); + if (locality.has_value()) { + if (!locality->region().empty() && locality_region.empty()) { + locality_region = locality->region(); + } + if (!locality->zone().empty() && locality_zone.empty()) { + locality_zone = locality->zone(); + } + } + auto obj = std::make_unique( + instance, cluster, namespace_name, workload, canonical_name, canonical_revision, app_name, + app_version, parseOwner(owner, workload), identity, locality_region, locality_zone); obj->setLabels(labels); return obj; } @@ -274,7 +369,8 @@ 
convertEndpointMetadata(const std::string& endpoint_encoding) { return {}; } return absl::make_optional("", parts[4], parts[1], parts[0], parts[2], - parts[3], "", "", WorkloadType::Unknown, ""); + parts[3], "", "", WorkloadType::Unknown, "", + "", ""); } std::string serializeToStringDeterministic(const google::protobuf::Struct& metadata) { @@ -292,8 +388,8 @@ std::string serializeToStringDeterministic(const google::protobuf::Struct& metad WorkloadMetadataObject::FieldType WorkloadMetadataObject::getField(absl::string_view field_name) const { - const auto it = ALL_BAGGAGE_TOKENS.find(field_name); - if (it != ALL_BAGGAGE_TOKENS.end()) { + const auto it = ALL_METADATA_FIELDS.find(field_name); + if (it != ALL_METADATA_FIELDS.end()) { switch (it->second) { case BaggageToken::NamespaceName: return namespace_name_; @@ -316,12 +412,22 @@ WorkloadMetadataObject::getField(absl::string_view field_name) const { return "unknown"; case BaggageToken::InstanceName: return instance_name_; + case BaggageToken::LocalityRegion: + return locality_region_; + case BaggageToken::LocalityZone: + return locality_zone_; } } return {}; } -std::unique_ptr convertBaggageToWorkloadMetadata(absl::string_view data) { +std::unique_ptr +convertBaggageToWorkloadMetadata(absl::string_view baggage) { + return convertBaggageToWorkloadMetadata(baggage, ""); +} + +std::unique_ptr +convertBaggageToWorkloadMetadata(absl::string_view data, absl::string_view identity) { absl::string_view instance; absl::string_view cluster; absl::string_view workload; @@ -330,6 +436,8 @@ std::unique_ptr convertBaggageToWorkloadMetadata(absl::s absl::string_view canonical_revision; absl::string_view app_name; absl::string_view app_version; + absl::string_view region; + absl::string_view zone; WorkloadType workload_type = WorkloadType::Unknown; std::vector properties = absl::StrSplit(data, ','); for (absl::string_view property : properties) { @@ -344,10 +452,14 @@ std::unique_ptr convertBaggageToWorkloadMetadata(absl::s 
cluster = parts.second; break; case BaggageToken::ServiceName: + // canonical name and app name are always the same canonical_name = parts.second; + app_name = parts.second; break; case BaggageToken::ServiceVersion: + // canonical revision and app version are always the same canonical_revision = parts.second; + app_version = parts.second; break; case BaggageToken::AppName: app_name = parts.second; @@ -355,21 +467,31 @@ std::unique_ptr convertBaggageToWorkloadMetadata(absl::s case BaggageToken::AppVersion: app_version = parts.second; break; - case BaggageToken::WorkloadName: + case BaggageToken::WorkloadName: { workload = parts.second; + std::vector splitWorkloadKey = absl::StrSplit(parts.first, "."); + if (splitWorkloadKey.size() >= 2 && splitWorkloadKey[0] == "k8s") { + workload_type = fromSuffix(splitWorkloadKey[1]); + } break; - case BaggageToken::WorkloadType: - workload_type = fromSuffix(parts.second); - break; + } case BaggageToken::InstanceName: instance = parts.second; break; + case BaggageToken::LocalityRegion: + region = parts.second; + break; + case BaggageToken::LocalityZone: + zone = parts.second; + break; + default: + break; } } } - return std::make_unique(instance, cluster, namespace_name, workload, - canonical_name, canonical_revision, app_name, - app_version, workload_type, ""); + return std::make_unique( + instance, cluster, namespace_name, workload, canonical_name, canonical_revision, app_name, + app_version, workload_type, identity, region, zone); } } // namespace Common diff --git a/extensions/common/metadata_object.h b/extensions/common/metadata_object.h index 5d39e22f3db..956d2e7edf2 100644 --- a/extensions/common/metadata_object.h +++ b/extensions/common/metadata_object.h @@ -15,6 +15,7 @@ #pragma once #include "envoy/common/hashable.h" +#include "envoy/config/core/v3/base.pb.h" #include "envoy/stream_info/filter_state.h" #include "source/common/protobuf/protobuf.h" @@ -70,8 +71,11 @@ enum class BaggageToken { WorkloadName, WorkloadType, 
InstanceName, + LocalityZone, + LocalityRegion }; +// Field names accessible from WorkloadMetadataObject. constexpr absl::string_view NamespaceNameToken = "namespace"; constexpr absl::string_view ClusterNameToken = "cluster"; constexpr absl::string_view ServiceNameToken = "service"; @@ -83,13 +87,34 @@ constexpr absl::string_view WorkloadTypeToken = "type"; constexpr absl::string_view InstanceNameToken = "name"; constexpr absl::string_view LabelsToken = "labels"; constexpr absl::string_view IdentityToken = "identity"; +constexpr absl::string_view RegionToken = "region"; +constexpr absl::string_view ZoneToken = "availability_zone"; + +// Field names used to translate baggage content into +// WorkloadMetadataObject information. +constexpr absl::string_view NamespaceNameBaggageToken = "k8s.namespace.name"; +constexpr absl::string_view ClusterNameBaggageToken = "k8s.cluster.name"; +constexpr absl::string_view ServiceNameBaggageToken = "service.name"; +constexpr absl::string_view ServiceVersionBaggageToken = "service.version"; +constexpr absl::string_view AppNameBaggageToken = "app.name"; +constexpr absl::string_view AppVersionBaggageToken = "app.version"; +constexpr absl::string_view DeploymentNameBaggageToken = "k8s.deployment.name"; +constexpr absl::string_view PodNameBaggageToken = "k8s.pod.name"; +constexpr absl::string_view CronjobNameBaggageToken = "k8s.cronjob.name"; +constexpr absl::string_view JobNameBaggageToken = "k8s.job.name"; +constexpr absl::string_view InstanceNameBaggageToken = "k8s.instance.name"; +constexpr absl::string_view LocalityRegionBaggageToken = "cloud.region"; +constexpr absl::string_view LocalityZoneBaggageToken = "cloud.availability_zone"; constexpr absl::string_view InstanceMetadataField = "NAME"; constexpr absl::string_view NamespaceMetadataField = "NAMESPACE"; constexpr absl::string_view ClusterMetadataField = "CLUSTER_ID"; +constexpr absl::string_view IdentityMetadataField = "IDENTITY"; constexpr absl::string_view OwnerMetadataField = 
"OWNER"; constexpr absl::string_view WorkloadMetadataField = "WORKLOAD_NAME"; constexpr absl::string_view LabelsMetadataField = "LABELS"; +constexpr absl::string_view RegionMetadataField = "REGION"; +constexpr absl::string_view ZoneMetadataField = "AVAILABILITY_ZONE"; class WorkloadMetadataObject : public Envoy::StreamInfo::FilterState::Object, public Envoy::Hashable { @@ -99,22 +124,26 @@ class WorkloadMetadataObject : public Envoy::StreamInfo::FilterState::Object, absl::string_view canonical_name, absl::string_view canonical_revision, absl::string_view app_name, absl::string_view app_version, WorkloadType workload_type, - absl::string_view identity) + absl::string_view identity, absl::string_view region, + absl::string_view zone) : instance_name_(instance_name), cluster_name_(cluster_name), namespace_name_(namespace_name), workload_name_(workload_name), canonical_name_(canonical_name), canonical_revision_(canonical_revision), app_name_(app_name), app_version_(app_version), - workload_type_(workload_type), identity_(identity) {} + workload_type_(workload_type), identity_(identity), locality_region_(region), + locality_zone_(zone) {} absl::optional hash() const override; Envoy::ProtobufTypes::MessagePtr serializeAsProto() const override; std::vector> serializeAsPairs() const; absl::optional serializeAsString() const override; absl::optional owner() const; + std::string identity() const; bool hasFieldSupport() const override { return true; } using Envoy::StreamInfo::FilterState::Object::FieldType; FieldType getField(absl::string_view) const override; void setLabels(std::vector> labels) { labels_ = labels; } std::vector> getLabels() const { return labels_; } + std::string baggage() const; const std::string instance_name_; const std::string cluster_name_; @@ -126,6 +155,8 @@ class WorkloadMetadataObject : public Envoy::StreamInfo::FilterState::Object, const std::string app_version_; const WorkloadType workload_type_; const std::string identity_; + const std::string 
locality_region_; + const std::string locality_zone_; std::vector> labels_; }; @@ -146,6 +177,11 @@ std::unique_ptr convertStructToWorkloadMetadata(const google::protobuf::Struct& metadata, const absl::flat_hash_set& additional_labels); +std::unique_ptr +convertStructToWorkloadMetadata(const google::protobuf::Struct& metadata, + const absl::flat_hash_set& additional_labels, + const absl::optional locality); + // Convert endpoint metadata string to a metadata object. // Telemetry metadata is compressed into a semicolon separated string: // workload-name;namespace;canonical-service-name;canonical-service-revision;cluster-id. @@ -157,7 +193,9 @@ convertEndpointMetadata(const std::string& endpoint_encoding); std::string serializeToStringDeterministic(const google::protobuf::Struct& metadata); // Convert from baggage encoding. -std::unique_ptr convertBaggageToWorkloadMetadata(absl::string_view data); +std::unique_ptr convertBaggageToWorkloadMetadata(absl::string_view baggage); +std::unique_ptr +convertBaggageToWorkloadMetadata(absl::string_view baggage, absl::string_view identity); } // namespace Common } // namespace Istio diff --git a/extensions/common/metadata_object_test.cc b/extensions/common/metadata_object_test.cc index 90003b95570..1d085952599 100644 --- a/extensions/common/metadata_object_test.cc +++ b/extensions/common/metadata_object_test.cc @@ -26,17 +26,19 @@ using Envoy::Protobuf::util::MessageDifferencer; using ::testing::NiceMock; TEST(WorkloadMetadataObjectTest, Baggage) { + constexpr absl::string_view identity = "spiffe://cluster.local/ns/default/sa/default"; WorkloadMetadataObject deploy("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "", "", WorkloadType::Deployment, ""); + "v1alpha3", "", "", WorkloadType::Deployment, identity, "", ""); WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "", "", WorkloadType::Pod, ""); + "v1alpha3", "", "", WorkloadType::Pod, identity, 
"", ""); WorkloadMetadataObject cronjob("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "foo-app", "v1", WorkloadType::CronJob, ""); + "v1alpha3", "foo-app", "v1", WorkloadType::CronJob, identity, "", + ""); WorkloadMetadataObject job("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "", "", WorkloadType::Job, ""); + "v1alpha3", "", "", WorkloadType::Job, identity, "", ""); EXPECT_EQ(deploy.serializeAsString(), absl::StrCat("type=deployment,workload=foo,name=pod-foo-1234,cluster=my-cluster,", @@ -66,8 +68,9 @@ void checkStructConversion(const Envoy::StreamInfo::FilterState::Object& data) { } TEST(WorkloadMetadataObjectTest, ConversionWithLabels) { + constexpr absl::string_view identity = "spiffe://cluster.local/ns/default/sa/default"; WorkloadMetadataObject deploy("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "", "", WorkloadType::Deployment, ""); + "v1alpha3", "", "", WorkloadType::Deployment, identity, "", ""); deploy.setLabels({{"label1", "value1"}, {"label2", "value2"}}); auto pb = convertWorkloadMetadataToStruct(deploy); auto obj1 = convertStructToWorkloadMetadata(pb, {"label1", "label2"}); @@ -81,9 +84,12 @@ TEST(WorkloadMetadataObjectTest, ConversionWithLabels) { TEST(WorkloadMetadataObjectTest, Conversion) { { + constexpr absl::string_view identity = "spiffe://cluster.local/ns/default/sa/default"; const auto r = convertBaggageToWorkloadMetadata( - "type=deployment,workload=foo,cluster=my-cluster," - "namespace=default,service=foo-service,revision=v1alpha3,app=foo-app,version=latest"); + "k8s.deployment.name=foo,k8s.cluster.name=my-cluster," + "k8s.namespace.name=default,service.name=foo-service,service.version=v1alpha3,app.name=foo-" + "app,app.version=latest", + identity); EXPECT_EQ(absl::get(r->getField("service")), "foo-service"); EXPECT_EQ(absl::get(r->getField("revision")), "v1alpha3"); EXPECT_EQ(absl::get(r->getField("type")), DeploymentSuffix); @@ -93,43 +99,46 @@ 
TEST(WorkloadMetadataObjectTest, Conversion) { EXPECT_EQ(absl::get(r->getField("cluster")), "my-cluster"); EXPECT_EQ(absl::get(r->getField("app")), "foo-app"); EXPECT_EQ(absl::get(r->getField("version")), "latest"); + EXPECT_EQ(r->identity(), identity); checkStructConversion(*r); } { - const auto r = - convertBaggageToWorkloadMetadata("type=pod,name=foo-pod-435,cluster=my-cluster,namespace=" - "test,service=foo-service,revision=v1beta2"); + const auto r = convertBaggageToWorkloadMetadata( + "k8s.pod.name=foo-pod-435,k8s.cluster.name=my-cluster,k8s.namespace.name=" + "test,k8s.instance.name=foo-instance-435,service.name=foo-service,service.version=v1beta2"); EXPECT_EQ(absl::get(r->getField("service")), "foo-service"); EXPECT_EQ(absl::get(r->getField("revision")), "v1beta2"); EXPECT_EQ(absl::get(r->getField("type")), PodSuffix); - EXPECT_EQ(absl::get(r->getField("workload")), ""); - EXPECT_EQ(absl::get(r->getField("name")), "foo-pod-435"); + EXPECT_EQ(absl::get(r->getField("workload")), "foo-pod-435"); + EXPECT_EQ(absl::get(r->getField("name")), "foo-instance-435"); EXPECT_EQ(absl::get(r->getField("namespace")), "test"); EXPECT_EQ(absl::get(r->getField("cluster")), "my-cluster"); - EXPECT_EQ(absl::get(r->getField("app")), ""); - EXPECT_EQ(absl::get(r->getField("version")), ""); + EXPECT_EQ(absl::get(r->getField("app")), "foo-service"); + EXPECT_EQ(absl::get(r->getField("version")), "v1beta2"); checkStructConversion(*r); } { - const auto r = - convertBaggageToWorkloadMetadata("type=job,name=foo-job-435,cluster=my-cluster,namespace=" - "test,service=foo-service,revision=v1beta4"); + const auto r = convertBaggageToWorkloadMetadata( + "k8s.job.name=foo-job-435,k8s.cluster.name=my-cluster,k8s.namespace.name=" + "test,k8s.instance.name=foo-instance-435,service.name=foo-service,service.version=v1beta4"); EXPECT_EQ(absl::get(r->getField("service")), "foo-service"); EXPECT_EQ(absl::get(r->getField("revision")), "v1beta4"); EXPECT_EQ(absl::get(r->getField("type")), JobSuffix); 
- EXPECT_EQ(absl::get(r->getField("workload")), ""); - EXPECT_EQ(absl::get(r->getField("name")), "foo-job-435"); + EXPECT_EQ(absl::get(r->getField("workload")), "foo-job-435"); + EXPECT_EQ(absl::get(r->getField("name")), "foo-instance-435"); EXPECT_EQ(absl::get(r->getField("namespace")), "test"); EXPECT_EQ(absl::get(r->getField("cluster")), "my-cluster"); + EXPECT_EQ(absl::get(r->getField("app")), "foo-service"); + EXPECT_EQ(absl::get(r->getField("version")), "v1beta4"); checkStructConversion(*r); } { - const auto r = - convertBaggageToWorkloadMetadata("type=cronjob,workload=foo-cronjob,cluster=my-cluster," - "namespace=test,service=foo-service,revision=v1beta4"); + const auto r = convertBaggageToWorkloadMetadata( + "k8s.cronjob.name=foo-cronjob,k8s.cluster.name=my-cluster," + "k8s.namespace.name=test,service.name=foo-service,service.version=v1beta4"); EXPECT_EQ(absl::get(r->getField("service")), "foo-service"); EXPECT_EQ(absl::get(r->getField("revision")), "v1beta4"); EXPECT_EQ(absl::get(r->getField("type")), CronJobSuffix); @@ -137,23 +146,28 @@ TEST(WorkloadMetadataObjectTest, Conversion) { EXPECT_EQ(absl::get(r->getField("name")), ""); EXPECT_EQ(absl::get(r->getField("namespace")), "test"); EXPECT_EQ(absl::get(r->getField("cluster")), "my-cluster"); + EXPECT_EQ(absl::get(r->getField("app")), "foo-service"); + EXPECT_EQ(absl::get(r->getField("version")), "v1beta4"); checkStructConversion(*r); } { - const auto r = convertBaggageToWorkloadMetadata( - "type=deployment,workload=foo,namespace=default,service=foo-service,revision=v1alpha3"); + const auto r = + convertBaggageToWorkloadMetadata("k8s.deployment.name=foo,k8s.namespace.name=default," + "service.name=foo-service,service.version=v1alpha3"); EXPECT_EQ(absl::get(r->getField("service")), "foo-service"); EXPECT_EQ(absl::get(r->getField("revision")), "v1alpha3"); EXPECT_EQ(absl::get(r->getField("type")), DeploymentSuffix); EXPECT_EQ(absl::get(r->getField("workload")), "foo"); 
EXPECT_EQ(absl::get(r->getField("namespace")), "default"); EXPECT_EQ(absl::get(r->getField("cluster")), ""); + EXPECT_EQ(absl::get(r->getField("app")), "foo-service"); + EXPECT_EQ(absl::get(r->getField("version")), "v1alpha3"); checkStructConversion(*r); } { - const auto r = convertBaggageToWorkloadMetadata("namespace=default"); + const auto r = convertBaggageToWorkloadMetadata("k8s.namespace.name=default"); EXPECT_EQ(absl::get(r->getField("namespace")), "default"); checkStructConversion(*r); } diff --git a/source/extensions/common/workload_discovery/api.cc b/source/extensions/common/workload_discovery/api.cc index 86ffcd24609..28940840342 100644 --- a/source/extensions/common/workload_discovery/api.cc +++ b/source/extensions/common/workload_discovery/api.cc @@ -69,7 +69,8 @@ Istio::Common::WorkloadMetadataObject convert(const istio::workload::Workload& w return Istio::Common::WorkloadMetadataObject( workload.name(), workload.cluster_id(), ns, workload.workload_name(), workload.canonical_name(), workload.canonical_revision(), workload.canonical_name(), - workload.canonical_revision(), workload_type, identity); + workload.canonical_revision(), workload_type, identity, workload.locality().region(), + workload.locality().zone()); } } // namespace diff --git a/source/extensions/filters/http/istio_stats/istio_stats.cc b/source/extensions/filters/http/istio_stats/istio_stats.cc index 3f85652a0b9..825d8561235 100644 --- a/source/extensions/filters/http/istio_stats/istio_stats.cc +++ b/source/extensions/filters/http/istio_stats/istio_stats.cc @@ -164,7 +164,8 @@ peerInfo(Reporter reporter, const StreamInfo::FilterState& filter_state) { extractString(obj, Istio::Common::AppNameToken), extractString(obj, Istio::Common::AppVersionToken), Istio::Common::fromSuffix(extractString(obj, Istio::Common::WorkloadTypeToken)), - extractString(obj, Istio::Common::IdentityToken)); + extractString(obj, Istio::Common::IdentityToken), + extractString(obj, Istio::Common::RegionToken), 
extractString(obj, Istio::Common::ZoneToken)); // Extract labels from the "labels" field const auto& labels_it = obj.fields().find(Istio::Common::LabelsToken); diff --git a/source/extensions/filters/http/peer_metadata/config.proto b/source/extensions/filters/http/peer_metadata/config.proto index 04e81f4f6b5..22f7b036830 100644 --- a/source/extensions/filters/http/peer_metadata/config.proto +++ b/source/extensions/filters/http/peer_metadata/config.proto @@ -20,8 +20,7 @@ package io.istio.http.peer_metadata; // Peer metadata provider filter. This filter encapsulates the discovery of the // peer telemetry attributes for consumption by the telemetry filters. message Config { - // DEPRECATED. - // This method uses `baggage` header encoding. + // This method uses `baggage` header encoding. Only used for HTTP CONNECT tunnels. message Baggage { } @@ -43,12 +42,25 @@ message Config { bool skip_external_clusters = 1; } + // This method extracts peer metadata from the upstream filter state if it's available. + // + // Upstream filter state could be populated by multiple means in general, but in practice the intention here is that + // upstream PeerMetadata filter will populate the filter state with peer details extracted from the baggage header + // sent in response. + // + // Naturally this metadata discovery method only makes sense for upstream peer metadata discovery. + message UpstreamFilterState { + // Upstream filter state key that will be used to store peer metadata. + string peer_metadata_key = 1; + } + // An exhaustive list of the derivation methods. 
message DiscoveryMethod { oneof method_specifier { Baggage baggage = 1; WorkloadDiscovery workload_discovery = 2; IstioHeaders istio_headers = 3; + UpstreamFilterState upstream_filter_state = 4; } } @@ -64,6 +76,7 @@ message Config { message PropagationMethod { oneof method_specifier { IstioHeaders istio_headers = 1; + Baggage baggage = 2; } } diff --git a/source/extensions/filters/http/peer_metadata/filter.cc b/source/extensions/filters/http/peer_metadata/filter.cc index e4645c65ffe..81974201947 100644 --- a/source/extensions/filters/http/peer_metadata/filter.cc +++ b/source/extensions/filters/http/peer_metadata/filter.cc @@ -174,6 +174,51 @@ absl::optional MXMethod::lookup(absl::string_view id, absl::string_vie return *out; } +class UpstreamFilterStateMethod : public DiscoveryMethod { +public: + UpstreamFilterStateMethod( + const io::istio::http::peer_metadata::Config_UpstreamFilterState& config) + : peer_metadata_key_(config.peer_metadata_key()) {} + absl::optional derivePeerInfo(const StreamInfo::StreamInfo&, Http::HeaderMap&, + Context&) const override; + +private: + std::string peer_metadata_key_; +}; + +absl::optional +UpstreamFilterStateMethod::derivePeerInfo(const StreamInfo::StreamInfo& info, Http::HeaderMap&, + Context&) const { + const auto upstream = info.upstreamInfo(); + if (!upstream) { + return {}; + } + + const auto filter_state = upstream->upstreamFilterState(); + if (!filter_state) { + return {}; + } + + const auto* cel_state = + filter_state->getDataReadOnly( + peer_metadata_key_); + if (!cel_state) { + return {}; + } + + google::protobuf::Struct obj; + if (!obj.ParseFromString(absl::string_view(cel_state->value()))) { + return {}; + } + + std::unique_ptr peer_info = ::Istio::Common::convertStructToWorkloadMetadata(obj); + if (!peer_info) { + return {}; + } + + return *peer_info; +} + MXPropagationMethod::MXPropagationMethod( bool downstream, Server::Configuration::ServerFactoryContext& factory_context, const absl::flat_hash_set& 
additional_labels, @@ -205,6 +250,41 @@ void MXPropagationMethod::inject(const StreamInfo::StreamInfo& info, Http::Heade } } +BaggagePropagationMethod::BaggagePropagationMethod( + Server::Configuration::ServerFactoryContext& factory_context, + const io::istio::http::peer_metadata::Config_Baggage&) + : value_(computeBaggageValue(factory_context)) {} + +std::string BaggagePropagationMethod::computeBaggageValue( + Server::Configuration::ServerFactoryContext& factory_context) const { + const auto obj = Istio::Common::convertStructToWorkloadMetadata( + factory_context.localInfo().node().metadata(), {}, + factory_context.localInfo().node().locality()); + return obj->baggage(); +} + +void BaggagePropagationMethod::inject(const StreamInfo::StreamInfo&, Http::HeaderMap& headers, + Context&) const { + headers.setReference(Headers::get().Baggage, value_); +} + +BaggageDiscoveryMethod::BaggageDiscoveryMethod() {} + +absl::optional BaggageDiscoveryMethod::derivePeerInfo(const StreamInfo::StreamInfo&, + Http::HeaderMap& headers, + Context&) const { + const auto baggage_header = headers.get(Headers::get().Baggage); + if (baggage_header.empty()) { + return {}; + } + const auto baggage_value = baggage_header[0]->value().getStringView(); + const auto workload = Istio::Common::convertBaggageToWorkloadMetadata(baggage_value); + if (workload) { + return *workload; + } + return {}; +} + FilterConfig::FilterConfig(const io::istio::http::peer_metadata::Config& config, Server::Configuration::FactoryContext& factory_context) : shared_with_upstream_(config.shared_with_upstream()), @@ -240,6 +320,23 @@ std::vector FilterConfig::buildDiscoveryMethods( methods.push_back(std::make_unique(downstream, additional_labels, factory_context.serverFactoryContext())); break; + case io::istio::http::peer_metadata::Config::DiscoveryMethod::MethodSpecifierCase::kBaggage: + if (downstream) { + methods.push_back(std::make_unique()); + } else { + ENVOY_LOG(warn, "BaggageDiscovery peer metadata discovery option 
is only available for "
+                        "downstream peer discovery");
+      }
+      // BUGFIX(review): without this break the kBaggage case fell through into
+      // kUpstreamFilterState, registering a default-configured
+      // UpstreamFilterStateDiscoveryMethod when only baggage discovery was
+      // requested (and logging a spurious warning on the downstream path).
+      break;
+    case io::istio::http::peer_metadata::Config::DiscoveryMethod::MethodSpecifierCase::
+        kUpstreamFilterState:
+      if (!downstream) {
+        methods.push_back(
+            std::make_unique(method.upstream_filter_state()));
+      } else {
+        ENVOY_LOG(warn, "UpstreamFilterState peer metadata discovery option is only available for "
+                        "upstream peer discovery");
+      }
+      break;
     default:
       break;
     }
@@ -262,6 +359,10 @@ std::vector FilterConfig::buildPropagationMethods(
           std::make_unique(downstream, factory_context.serverFactoryContext(),
                            additional_labels, method.istio_headers()));
       break;
+    case io::istio::http::peer_metadata::Config::PropagationMethod::MethodSpecifierCase::kBaggage:
+      methods.push_back(std::make_unique(
+          factory_context.serverFactoryContext(), method.baggage()));
+      break;
     default:
       break;
     }
diff --git a/source/extensions/filters/http/peer_metadata/filter.h b/source/extensions/filters/http/peer_metadata/filter.h
index cf0d68803c4..4a04f76a8d1 100644
--- a/source/extensions/filters/http/peer_metadata/filter.h
+++ b/source/extensions/filters/http/peer_metadata/filter.h
@@ -20,6 +20,7 @@
 #include "source/extensions/filters/http/peer_metadata/config.pb.h"
 #include "source/extensions/common/workload_discovery/api.h"
 #include "source/common/singleton/const_singleton.h"
+#include
 
 namespace Envoy {
 namespace Extensions {
@@ -30,6 +31,7 @@ using ::Envoy::Extensions::Filters::Common::Expr::CelStatePrototype;
 using ::Envoy::Extensions::Filters::Common::Expr::CelStateType;
 
 struct HeaderValues {
+  const Http::LowerCaseString Baggage{"baggage"};
   const Http::LowerCaseString ExchangeMetadataHeader{"x-envoy-peer-metadata"};
   const Http::LowerCaseString ExchangeMetadataHeaderId{"x-envoy-peer-metadata-id"};
   const Http::LowerCaseString ExchangeMetadataOriginNetwork{"x-forwarded-network"};
@@ -100,6 +102,24 @@ class MXPropagationMethod : public PropagationMethod {
   bool skipMXHeaders(const bool, const StreamInfo::StreamInfo&) const;
 };
 
+class
BaggagePropagationMethod : public PropagationMethod { +public: + BaggagePropagationMethod(Server::Configuration::ServerFactoryContext& factory_context, + const io::istio::http::peer_metadata::Config_Baggage&); + void inject(const StreamInfo::StreamInfo&, Http::HeaderMap&, Context&) const override; + +private: + std::string computeBaggageValue(Server::Configuration::ServerFactoryContext&) const; + const std::string value_; +}; + +class BaggageDiscoveryMethod : public DiscoveryMethod, public Logger::Loggable { +public: + BaggageDiscoveryMethod(); + absl::optional derivePeerInfo(const StreamInfo::StreamInfo&, Http::HeaderMap&, + Context&) const override; +}; + class FilterConfig : public Logger::Loggable { public: FilterConfig(const io::istio::http::peer_metadata::Config&, @@ -143,7 +163,7 @@ class FilterConfig : public Logger::Loggable { using FilterConfigSharedPtr = std::shared_ptr; -class Filter : public Http::PassThroughFilter { +class Filter : public Http::PassThroughFilter, public Logger::Loggable { public: Filter(const FilterConfigSharedPtr& config) : config_(config) {} Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool) override; diff --git a/source/extensions/filters/http/peer_metadata/filter_test.cc b/source/extensions/filters/http/peer_metadata/filter_test.cc index 0eb27cc6505..6c0dd8b3b89 100644 --- a/source/extensions/filters/http/peer_metadata/filter_test.cc +++ b/source/extensions/filters/http/peer_metadata/filter_test.cc @@ -130,7 +130,8 @@ TEST_F(PeerMetadataTest, DownstreamXDSNone) { TEST_F(PeerMetadataTest, DownstreamXDS) { const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, "", "", + ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ -153,7 +154,8 @@ 
TEST_F(PeerMetadataTest, DownstreamXDS) { TEST_F(PeerMetadataTest, DownstreamXDSCrossNetwork) { request_headers_.setReference(Headers::get().ExchangeMetadataOriginNetwork, "remote-network"); const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, "", "", + ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ -176,7 +178,8 @@ TEST_F(PeerMetadataTest, DownstreamXDSCrossNetwork) { TEST_F(PeerMetadataTest, UpstreamXDS) { const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "foo", "foo", "foo-service", - "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, "", "", + ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ -212,7 +215,8 @@ TEST_F(PeerMetadataTest, UpstreamXDSInternal) { *host_metadata); const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "foo", "foo", "foo-service", - "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, "", "", + ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ -251,7 +255,8 @@ TEST_F(PeerMetadataTest, UpstreamXDSInternalCrossNetwork) { *host_metadata); const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "foo", "foo", "foo-service", - "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, "", "", + ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ 
-320,7 +325,8 @@ TEST_F(PeerMetadataTest, DownstreamFallbackFirst) { TEST_F(PeerMetadataTest, DownstreamFallbackSecond) { const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, "", "", + ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ -403,7 +409,8 @@ TEST_F(PeerMetadataTest, UpstreamFallbackFirst) { TEST_F(PeerMetadataTest, UpstreamFallbackSecond) { const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "foo", "foo", "foo-service", - "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, "", "", + ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ -425,7 +432,8 @@ TEST_F(PeerMetadataTest, UpstreamFallbackSecond) { TEST_F(PeerMetadataTest, UpstreamFallbackFirstXDS) { const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "foo", "foo", "foo-service", - "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "", "", Istio::Common::WorkloadType::Pod, "", "", + ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ -550,7 +558,8 @@ TEST_F(PeerMetadataTest, UpstreamMXPropagationSkipPassthrough) { TEST_F(PeerMetadataTest, FieldAccessorSupport) { const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "default", "foo", "foo-service", - "v1alpha3", "myapp", "v1", Istio::Common::WorkloadType::Pod, ""); + "v1alpha3", "myapp", "v1", Istio::Common::WorkloadType::Pod, "", + "", ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& 
address) -> std::optional { @@ -586,7 +595,7 @@ TEST_F(PeerMetadataTest, FieldAccessorSupport) { TEST_F(PeerMetadataTest, CelExpressionCompatibility) { const WorkloadMetadataObject pod("pod-bar-5678", "test-cluster", "production", "bar", "bar-service", "v2", "barapp", "v2", - Istio::Common::WorkloadType::Pod, ""); + Istio::Common::WorkloadType::Pod, "", "", ""); EXPECT_CALL(*metadata_provider_, GetMetadata(_)) .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) -> std::optional { @@ -622,6 +631,577 @@ TEST_F(PeerMetadataTest, CelExpressionCompatibility) { EXPECT_EQ("bar", extractString(*struct_proto, "workload")); EXPECT_EQ("test-cluster", extractString(*struct_proto, "cluster")); } +TEST_F(PeerMetadataTest, DownstreamBaggagePropagation) { + initialize(R"EOF( + downstream_propagation: + - baggage: {} + )EOF"); + EXPECT_EQ(0, request_headers_.size()); + EXPECT_EQ(1, response_headers_.size()); + EXPECT_TRUE(response_headers_.has(Headers::get().Baggage)); + checkNoPeer(true); + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, UpstreamBaggagePropagation) { + initialize(R"EOF( + upstream_propagation: + - baggage: {} + )EOF"); + EXPECT_EQ(1, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + EXPECT_TRUE(request_headers_.has(Headers::get().Baggage)); + checkNoPeer(true); + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, BothDirectionsBaggagePropagation) { + initialize(R"EOF( + downstream_propagation: + - baggage: {} + upstream_propagation: + - baggage: {} + )EOF"); + EXPECT_EQ(1, request_headers_.size()); + EXPECT_EQ(1, response_headers_.size()); + EXPECT_TRUE(request_headers_.has(Headers::get().Baggage)); + EXPECT_TRUE(response_headers_.has(Headers::get().Baggage)); + checkNoPeer(true); + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, BaggagePropagationWithNodeMetadata) { + // Setup node metadata that would be converted to baggage + auto& node = context_.server_factory_context_.local_info_.node_; + 
TestUtility::loadFromYaml(R"EOF( + metadata: + NAMESPACE: production + CLUSTER_ID: test-cluster + WORKLOAD_NAME: test-workload + NAME: test-instance + LABELS: + app: test-app + version: v1.0 + service.istio.io/canonical-name: test-service + service.istio.io/canonical-revision: main + )EOF", + node); + + initialize(R"EOF( + downstream_propagation: + - baggage: {} + )EOF"); + + EXPECT_EQ(0, request_headers_.size()); + EXPECT_EQ(1, response_headers_.size()); + + const auto baggage_header = response_headers_.get(Headers::get().Baggage); + ASSERT_FALSE(baggage_header.empty()); + + std::string baggage_value = std::string(baggage_header[0]->value().getStringView()); + // Verify baggage contains expected key-value pairs + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.namespace.name=production")); + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.cluster.name=test-cluster")); + EXPECT_TRUE(absl::StrContains(baggage_value, "app.name=test-app")); + EXPECT_TRUE(absl::StrContains(baggage_value, "app.version=v1.0")); + EXPECT_TRUE(absl::StrContains(baggage_value, "service.name=test-service")); + EXPECT_TRUE(absl::StrContains(baggage_value, "service.version=main")); + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.pod.name=test-workload")); + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.instance.name=test-instance")); +} + +// Test class specifically for BaggagePropagationMethod unit tests +class BaggagePropagationMethodTest : public testing::Test { +protected: + BaggagePropagationMethodTest() = default; + + void SetUp() override { + TestUtility::loadFromYaml(R"EOF( + metadata: + NAMESPACE: test-namespace + CLUSTER_ID: sample-cluster + WORKLOAD_NAME: sample-workload + NAME: sample-instance + LABELS: + app: sample-app + version: v2.1 + service.istio.io/canonical-name: sample-service + service.istio.io/canonical-revision: stable + locality: + zone: us-east4-b + region: us-east4 + )EOF", + context_.server_factory_context_.local_info_.node_); + } + + NiceMock context_; 
+ NiceMock stream_info_; +}; + +TEST_F(BaggagePropagationMethodTest, DownstreamBaggageInjection) { + io::istio::http::peer_metadata::Config_Baggage baggage_config; + BaggagePropagationMethod method(context_.server_factory_context_, baggage_config); + + Http::TestResponseHeaderMapImpl headers; + Context ctx; + + method.inject(stream_info_, headers, ctx); + + EXPECT_EQ(1, headers.size()); + const auto baggage_header = headers.get(Headers::get().Baggage); + ASSERT_FALSE(baggage_header.empty()); + + std::string baggage_value = std::string(baggage_header[0]->value().getStringView()); + + // Verify all expected tokens are present + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.namespace.name=test-namespace")); + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.cluster.name=sample-cluster")); + EXPECT_TRUE(absl::StrContains(baggage_value, "service.name=sample-service")); + EXPECT_TRUE(absl::StrContains(baggage_value, "service.version=stable")); + EXPECT_TRUE(absl::StrContains(baggage_value, "app.name=sample-app")); + EXPECT_TRUE(absl::StrContains(baggage_value, "app.version=v2.1")); + EXPECT_TRUE( + absl::StrContains(baggage_value, "k8s.pod.name=sample-workload")); // workload type is pod + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.instance.name=sample-instance")); + EXPECT_TRUE(absl::StrContains(baggage_value, "cloud.region=us-east4")); + EXPECT_TRUE(absl::StrContains(baggage_value, "cloud.availability_zone=us-east4-b")); +} + +TEST_F(BaggagePropagationMethodTest, UpstreamBaggageInjection) { + io::istio::http::peer_metadata::Config_Baggage baggage_config; + BaggagePropagationMethod method(context_.server_factory_context_, baggage_config); + + Http::TestRequestHeaderMapImpl headers; + Context ctx; + + method.inject(stream_info_, headers, ctx); + + EXPECT_EQ(1, headers.size()); + const auto baggage_header = headers.get(Headers::get().Baggage); + ASSERT_FALSE(baggage_header.empty()); + + std::string baggage_value = 
std::string(baggage_header[0]->value().getStringView()); + + // Verify tokens are properly formatted + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.namespace.name=test-namespace")); + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.cluster.name=sample-cluster")); + + // Check that values are comma-separated + std::vector parts = absl::StrSplit(baggage_value, ','); + EXPECT_GT(parts.size(), 1); +} + +TEST_F(BaggagePropagationMethodTest, EmptyMetadataBaggage) { + // Reset node metadata to empty + context_.server_factory_context_.local_info_.node_.Clear(); + + io::istio::http::peer_metadata::Config_Baggage baggage_config; + BaggagePropagationMethod method(context_.server_factory_context_, baggage_config); + + Http::TestResponseHeaderMapImpl headers; + Context ctx; + + method.inject(stream_info_, headers, ctx); + + EXPECT_EQ(1, headers.size()); + const auto baggage_header = headers.get(Headers::get().Baggage); + ASSERT_FALSE(baggage_header.empty()); + + // With empty metadata, there should be no baggage + std::string baggage_value = std::string(baggage_header[0]->value().getStringView()); + EXPECT_EQ("", baggage_value); +} + +TEST_F(BaggagePropagationMethodTest, PartialMetadataBaggage) { + // Setup node metadata with only some fields + TestUtility::loadFromYaml(R"EOF( + metadata: + NAMESPACE: partial-namespace + LABELS: + app: partial-app + # Missing other fields like version, cluster, etc. 
+ )EOF", + context_.server_factory_context_.local_info_.node_); + + io::istio::http::peer_metadata::Config_Baggage baggage_config; + BaggagePropagationMethod method(context_.server_factory_context_, baggage_config); + + Http::TestRequestHeaderMapImpl headers; + Context ctx; + + method.inject(stream_info_, headers, ctx); + + EXPECT_EQ(1, headers.size()); + const auto baggage_header = headers.get(Headers::get().Baggage); + ASSERT_FALSE(baggage_header.empty()); + + std::string baggage_value = std::string(baggage_header[0]->value().getStringView()); + + // Should contain only the fields that were present + EXPECT_TRUE(absl::StrContains(baggage_value, "k8s.namespace.name=partial-namespace")); + EXPECT_TRUE(absl::StrContains(baggage_value, "app.name=partial-app")); + + // Should not contain fields that were not present + EXPECT_FALSE(absl::StrContains(baggage_value, "app.version=")); + EXPECT_FALSE(absl::StrContains(baggage_value, "k8s.cluster.name=")); +} + +TEST_F(PeerMetadataTest, BaggagePropagationWithMixedConfig) { + initialize(R"EOF( + downstream_propagation: + - baggage: {} + - istio_headers: {} + upstream_propagation: + - baggage: {} + - istio_headers: {} + )EOF"); + + // Baggage should always be propagated, Istio headers are also propagated for upstream only + EXPECT_EQ(3, request_headers_.size()); // baggage + istio headers (id + metadata) + EXPECT_EQ(1, response_headers_.size()); // baggage only (no discovery, so no MX downstream) + + EXPECT_TRUE(request_headers_.has(Headers::get().Baggage)); + EXPECT_TRUE(request_headers_.has(Headers::get().ExchangeMetadataHeaderId)); + EXPECT_TRUE(request_headers_.has(Headers::get().ExchangeMetadataHeader)); + + EXPECT_TRUE(response_headers_.has(Headers::get().Baggage)); +} + +// Baggage Discovery Tests + +TEST_F(PeerMetadataTest, DownstreamBaggageDiscoveryEmpty) { + initialize(R"EOF( + downstream_discovery: + - baggage: {} + )EOF"); + EXPECT_EQ(0, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + 
checkNoPeer(true); + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, UpstreamBaggageDiscoveryEmpty) { + // The baggage discovery filter should only be used for downstream + // peer metadata detection. + initialize(R"EOF( + upstream_discovery: + - baggage: {} + )EOF"); + EXPECT_EQ(0, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + checkNoPeer(true); + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, DownstreamBaggageDiscovery) { + request_headers_.setReference( + Headers::get().Baggage, + "k8s.namespace.name=test-namespace,k8s.cluster.name=test-cluster," + "service.name=test-service,service.version=v1,k8s.deployment.name=test-workload," + "k8s.workload.type=deployment,k8s.instance.name=test-instance-123," + "app.name=test-app,app.version=v2.0"); + initialize(R"EOF( + downstream_discovery: + - baggage: {} + )EOF"); + EXPECT_EQ(1, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + checkPeerNamespace(true, "test-namespace"); + checkNoPeer(false); + checkShared(false); +} + +TEST_F(PeerMetadataTest, UpstreamBaggageDiscovery) { + response_headers_.setReference( + Headers::get().Baggage, + "k8s.namespace.name=upstream-namespace,k8s.cluster.name=upstream-cluster," + "service.name=upstream-service,service.version=v2,k8s.workload.name=upstream-workload," + "k8s.workload.type=pod,k8s.instance.name=upstream-instance-456," + "app.name=upstream-app,app.version=v3.0"); + initialize(R"EOF( + upstream_discovery: + - baggage: {} + )EOF"); + EXPECT_EQ(0, request_headers_.size()); + EXPECT_EQ(1, response_headers_.size()); + checkNoPeer(true); + // Baggage discovery should ignore upstream. 
+ checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, BothDirectionsBaggageDiscovery) { + request_headers_.setReference(Headers::get().Baggage, + "k8s.namespace.name=downstream-ns,service.name=downstream-svc"); + response_headers_.setReference(Headers::get().Baggage, + "k8s.namespace.name=upstream-ns,service.name=upstream-svc"); + initialize(R"EOF( + downstream_discovery: + - baggage: {} + upstream_discovery: + - baggage: {} + )EOF"); + EXPECT_EQ(1, request_headers_.size()); + EXPECT_EQ(1, response_headers_.size()); + checkPeerNamespace(true, "downstream-ns"); + // Baggage discovery should ignore upstream + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, DownstreamBaggageFallbackFirst) { + // Baggage is present, so XDS should not be called + request_headers_.setReference( + Headers::get().Baggage, "k8s.namespace.name=baggage-namespace,service.name=baggage-service"); + EXPECT_CALL(*metadata_provider_, GetMetadata(_)).Times(0); + initialize(R"EOF( + downstream_discovery: + - baggage: {} + - workload_discovery: {} + )EOF"); + EXPECT_EQ(1, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + checkPeerNamespace(true, "baggage-namespace"); + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, DownstreamBaggageFallbackSecond) { + // No baggage header, so XDS should be called as fallback + const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "xds-namespace", "foo", + "foo-service", "v1alpha3", "", "", + Istio::Common::WorkloadType::Pod, "", "us-east4", "us-east4-b"); + EXPECT_CALL(*metadata_provider_, GetMetadata(_)) + .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) + -> std::optional { + if (absl::StartsWith(address->asStringView(), "127.0.0.1")) { + return {pod}; + } + return {}; + })); + initialize(R"EOF( + downstream_discovery: + - baggage: {} + - workload_discovery: {} + )EOF"); + EXPECT_EQ(0, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + checkPeerNamespace(true, "xds-namespace"); 
+  checkNoPeer(false);
+}
+
+TEST_F(PeerMetadataTest, UpstreamBaggageFallbackFirst) {
+  // Baggage is present, but ignored as it's coming from upstream.
+  response_headers_.setReference(
+      Headers::get().Baggage,
+      "k8s.namespace.name=baggage-upstream,service.name=baggage-upstream-service");
+  // WDS information is also present, and this is the one that should be used.
+  const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "xds-upstream", "foo",
+                                   "foo-service", "v1alpha3", "", "",
+                                   Istio::Common::WorkloadType::Pod, "", "us-east4", "us-east4-b");
+  EXPECT_CALL(*metadata_provider_, GetMetadata(_))
+      .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address)
+                                 -> std::optional {
+        if (absl::StartsWith(address->asStringView(), "10.0.0.1")) {
+          return {pod};
+        }
+        return {};
+      }));
+  initialize(R"EOF(
+    upstream_discovery:
+    - baggage: {}
+    - workload_discovery: {}
+  )EOF");
+  EXPECT_EQ(0, request_headers_.size());
+  EXPECT_EQ(1, response_headers_.size());
+  checkNoPeer(true);
+  checkPeerNamespace(false, "xds-upstream");
+}
+
+TEST_F(PeerMetadataTest, UpstreamBaggageFallbackSecond) {
+  // No baggage header, baggage is ignored as it's coming from upstream,
+  // but workload discovery should pick up the details.
+ const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "xds-upstream", "foo", + "foo-service", "v1alpha3", "", "", + Istio::Common::WorkloadType::Pod, "", "us-east4", "us-east4-b"); + EXPECT_CALL(*metadata_provider_, GetMetadata(_)) + .WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address) + -> std::optional { + if (absl::StartsWith(address->asStringView(), "10.0.0.1")) { + return {pod}; + } + return {}; + })); + initialize(R"EOF( + upstream_discovery: + - baggage: {} + - workload_discovery: {} + )EOF"); + EXPECT_EQ(0, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + checkNoPeer(true); + checkPeerNamespace(false, "xds-upstream"); +} + +TEST_F(PeerMetadataTest, DownstreamBaggageWithMXFallback) { + // Baggage is present, so MX should not be used + request_headers_.setReference(Headers::get().Baggage, + "k8s.namespace.name=baggage-ns,service.name=baggage-svc"); + request_headers_.setReference(Headers::get().ExchangeMetadataHeaderId, "test-pod"); + request_headers_.setReference(Headers::get().ExchangeMetadataHeader, SampleIstioHeader); + initialize(R"EOF( + downstream_discovery: + - baggage: {} + - istio_headers: {} + )EOF"); + EXPECT_EQ(1, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + checkPeerNamespace(true, "baggage-ns"); + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, DownstreamMXWithBaggageFallback) { + // MX is first, so it should be used even if baggage is present + request_headers_.setReference(Headers::get().Baggage, + "k8s.namespace.name=baggage-ns,service.name=baggage-svc"); + request_headers_.setReference(Headers::get().ExchangeMetadataHeaderId, "test-pod"); + request_headers_.setReference(Headers::get().ExchangeMetadataHeader, SampleIstioHeader); + initialize(R"EOF( + downstream_discovery: + - istio_headers: {} + - baggage: {} + )EOF"); + EXPECT_EQ(1, request_headers_.size()); + EXPECT_EQ(0, response_headers_.size()); + // MX header has namespace "default" from 
SampleIstioHeader + checkPeerNamespace(true, "default"); + checkNoPeer(false); +} + +TEST_F(PeerMetadataTest, BaggageDiscoveryWithPropagation) { + request_headers_.setReference(Headers::get().Baggage, + "k8s.namespace.name=discovered-ns,service.name=discovered-svc"); + initialize(R"EOF( + downstream_discovery: + - baggage: {} + downstream_propagation: + - baggage: {} + upstream_propagation: + - baggage: {} + )EOF"); + EXPECT_EQ(1, request_headers_.size()); // upstream baggage propagation + EXPECT_EQ(1, response_headers_.size()); // downstream baggage propagation + EXPECT_TRUE(request_headers_.has(Headers::get().Baggage)); + EXPECT_TRUE(response_headers_.has(Headers::get().Baggage)); + checkPeerNamespace(true, "discovered-ns"); + checkNoPeer(false); +} + +// Test class specifically for BaggageDiscoveryMethod unit tests +class BaggageDiscoveryMethodTest : public testing::Test { +protected: + BaggageDiscoveryMethodTest() = default; + + NiceMock context_; + NiceMock stream_info_; +}; + +TEST_F(BaggageDiscoveryMethodTest, DerivePeerInfoFromBaggage) { + BaggageDiscoveryMethod method; + + Http::TestRequestHeaderMapImpl headers; + headers.setReference( + Headers::get().Baggage, + "k8s.namespace.name=unit-test-namespace,k8s.cluster.name=unit-test-cluster," + "service.name=unit-test-service,service.version=v1.0," + "k8s.deployment.name=unit-test-workload,k8s.workload.type=deployment," + "k8s.instance.name=unit-test-instance,app.name=unit-test-app,app.version=v2.0"); + Context ctx; + + const auto result = method.derivePeerInfo(stream_info_, headers, ctx); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ("unit-test-namespace", result->namespace_name_); + EXPECT_EQ("unit-test-cluster", result->cluster_name_); + EXPECT_EQ("unit-test-service", result->canonical_name_); + EXPECT_EQ("v1.0", result->canonical_revision_); + EXPECT_EQ("unit-test-workload", result->workload_name_); + EXPECT_EQ("unit-test-instance", result->instance_name_); + EXPECT_EQ("unit-test-app", 
result->app_name_); + EXPECT_EQ("v2.0", result->app_version_); + EXPECT_EQ(Istio::Common::WorkloadType::Deployment, result->workload_type_); +} + +TEST_F(BaggageDiscoveryMethodTest, DerivePeerInfoEmptyBaggage) { + BaggageDiscoveryMethod method; + + Http::TestRequestHeaderMapImpl headers; + Context ctx; + + const auto result = method.derivePeerInfo(stream_info_, headers, ctx); + + EXPECT_FALSE(result.has_value()); +} + +TEST_F(BaggageDiscoveryMethodTest, DerivePeerInfoPartialBaggage) { + BaggageDiscoveryMethod method; + + Http::TestResponseHeaderMapImpl headers; + headers.setReference(Headers::get().Baggage, + "k8s.namespace.name=partial-ns,service.name=partial-svc"); + Context ctx; + + const auto result = method.derivePeerInfo(stream_info_, headers, ctx); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ("partial-ns", result->namespace_name_); + EXPECT_EQ("partial-svc", result->canonical_name_); + // Other fields should be empty or default + EXPECT_TRUE(result->cluster_name_.empty()); + EXPECT_TRUE(result->workload_name_.empty()); +} + +TEST_F(BaggageDiscoveryMethodTest, DerivePeerInfoAllWorkloadTypes) { + BaggageDiscoveryMethod method; + Context ctx; + + // Test Pod workload type + { + Http::TestRequestHeaderMapImpl headers; + headers.setReference(Headers::get().Baggage, + "k8s.namespace.name=test-ns,k8s.pod.name=pod-name"); + const auto result = method.derivePeerInfo(stream_info_, headers, ctx); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(Istio::Common::WorkloadType::Pod, result->workload_type_); + } + + // Test Deployment workload type + { + Http::TestRequestHeaderMapImpl headers; + headers.setReference(Headers::get().Baggage, + "k8s.namespace.name=test-ns,k8s.deployment.name=deployment-name"); + const auto result = method.derivePeerInfo(stream_info_, headers, ctx); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(Istio::Common::WorkloadType::Deployment, result->workload_type_); + } + + // Test Job workload type + { + Http::TestRequestHeaderMapImpl headers; + 
headers.setReference(Headers::get().Baggage, + "k8s.namespace.name=test-ns,k8s.job.name=job-name"); + const auto result = method.derivePeerInfo(stream_info_, headers, ctx); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(Istio::Common::WorkloadType::Job, result->workload_type_); + } + + // Test CronJob workload type + { + Http::TestRequestHeaderMapImpl headers; + headers.setReference(Headers::get().Baggage, + "k8s.namespace.name=test-ns,k8s.cronjob.name=cronjob-name"); + const auto result = method.derivePeerInfo(stream_info_, headers, ctx); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(Istio::Common::WorkloadType::CronJob, result->workload_type_); + } +} } // namespace } // namespace PeerMetadata diff --git a/source/extensions/filters/network/metadata_exchange/metadata_exchange.cc b/source/extensions/filters/network/metadata_exchange/metadata_exchange.cc index 997d32e182c..64875406185 100644 --- a/source/extensions/filters/network/metadata_exchange/metadata_exchange.cc +++ b/source/extensions/filters/network/metadata_exchange/metadata_exchange.cc @@ -188,8 +188,9 @@ void MetadataExchangeFilter::writeNodeMetadata() { } ENVOY_LOG(trace, "Writing metadata to the connection."); Protobuf::Struct data; - const auto obj = Istio::Common::convertStructToWorkloadMetadata(local_info_.node().metadata(), - config_->additional_labels_); + const auto obj = Istio::Common::convertStructToWorkloadMetadata( + local_info_.node().metadata(), config_->additional_labels_, local_info_.node().locality()); + *(*data.mutable_fields())[ExchangeMetadataHeader].mutable_struct_value() = Istio::Common::convertWorkloadMetadataToStruct(*obj); std::string metadata_id = getMetadataId(); diff --git a/source/extensions/filters/network/peer_metadata/BUILD b/source/extensions/filters/network/peer_metadata/BUILD new file mode 100644 index 00000000000..750ee09981f --- /dev/null +++ b/source/extensions/filters/network/peer_metadata/BUILD @@ -0,0 +1,57 @@ +# Copyright 2026 Istio Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +########################################################################## + +# Ambient Peer Metadata filters +load( + "@envoy//bazel:envoy_build_system.bzl", + "envoy_cc_library", +) +load( + "@envoy//bazel:envoy_library.bzl", + "envoy_proto_library", +) + +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +envoy_proto_library( + name = "config", + srcs = ["config.proto"], +) + +envoy_cc_library( + name = "peer_metadata", + srcs = [ + "peer_metadata.cc", + ], + repository = "@envoy", + deps = [ + ":config_cc_proto", + "//extensions/common:metadata_object_lib", + "@envoy//envoy/buffer:buffer_interface", + "@envoy//envoy/network:address_interface", + "@envoy//envoy/network:filter_interface", + "@envoy//envoy/server:filter_config_interface", + "@envoy//source/common/common:minimal_logger_lib", + "@envoy//source/common/router:string_accessor_lib", + "@envoy//source/common/singleton:const_singleton", + "@envoy//source/common/stream_info:bool_accessor_lib", + "@envoy//source/common/tcp_proxy", + "@envoy//source/extensions/filters/common/expr:cel_state_lib", + "@envoy//source/extensions/filters/network/common:factory_base_lib", + ], +) diff --git a/source/extensions/filters/network/peer_metadata/config.proto b/source/extensions/filters/network/peer_metadata/config.proto new file mode 100644 index 00000000000..92fb707d823 --- /dev/null +++ 
b/source/extensions/filters/network/peer_metadata/config.proto @@ -0,0 +1,44 @@ +/* Copyright 2026 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package envoy.extensions.network_filters.peer_metadata; + +message Config { + // What filter state to use to save the baggage value that encodes the proxy + // workload. + // + // The upstream filter that will populate the baggage header in the HBONE + // request should be configured to use the same key. + // + // Why share baggage value via filter state instead of configuring upstream + // filter to use the baggage key value directly? + // + // ztunnel and waypoint have to be aware of the baggage header format, + // because they should be able to parse baggage headers to extract the + // metadata and report the metrics. However, pilot does not need to be aware + // of the baggage encoding yet. + // + // If instead of using custom filter to generate baggage header value we just + // let pilot generate it, it would spread the logic for generating baggage to + // the pilot as well. While not a big deal, if there is no clear reason to do + // it, let's not duplicate the implementation of baggage logic in pilot and + // just re-use the logic we already have in Envoy. 
+ string baggage_key = 1; +} + +message UpstreamConfig { +} diff --git a/source/extensions/filters/network/peer_metadata/peer_metadata.cc b/source/extensions/filters/network/peer_metadata/peer_metadata.cc new file mode 100644 index 00000000000..b233857a575 --- /dev/null +++ b/source/extensions/filters/network/peer_metadata/peer_metadata.cc @@ -0,0 +1,575 @@ +/* Copyright 2026 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * PeerMetadata network and upstream network filters are used in one of ambient + * peer metadata discovery mechanisms. The peer metadata discovery mechanism + * these filters are part of relies on peer reporting their own metadata in + * HBONE CONNECT request and response headers. + * + * The purpose of these filters is to extract this metadata from the request/ + * response headers and propagate it to the Istio filters reporting telemetry + * where this metadata will be used as labels. + * + * The filters in this folder are specifically concerned with extracting and + * propagating upstream peer metadata. The working setup includes a combination + * of several filters that together get the job done. + * + * A bit of background, here is a very simplified description of how Istio + * waypoint processes a request: + * + * 1. 
connect_terminate listener receives an incoming HBONE connection; + * * it unwraps HBONE tunnel and extracts the data passed inside it; + * * it passes the data inside the HBONE tunnel to a main_internal listener + * that performs the next stage of processing; + * 2. main_internal listener is responsible for parsing the data as L7 data + * (HTTP/gRPC), applying configured L7 policies, picking the endpoint to + * route the request to and reporting L7 stats + * * At this level we are processing the incoming request at L7 level and + * have access to things like status of the request and can report + * meaningful metrics; + * * To report in metrics where the request came from and where it went + * after, we need to know the details of downstream and upstream peers - + * that's what we call peer metadata; + * * Once we're done with L7 processing of the request, we pass the request + * to the connect_originate (or inner_connect_originate in case of double + * HBONE) listener that will handle the next stage of processing; + * 3. connect_originate - is responsible for wrapping processed L7 traffic into + * an HBONE tunnel and sending it out + * * This stage of processing treats data as a stream of bytes without any + * knowledge of L7 protocol details; + * * It takes the upstream peer address as input and establishes an HBONE + * tunnel to the destination and sends the data via that tunnel. + * + * With that picture in mind, what we want to do is in connect_originate (or + * inner_connect_originate in case of double-HBONE) when we establish HBONE + * tunnel, we want to extract peer metadata from the CONNECT response and + * propagate it to the main_internal. + * + * To establish HBONE tunnel we rely on Envoy TCP Proxy filter, so we don't + * handle HTTP2 CONNECT responses or requests directly, instead we rely on the + * TCP Proxy filter to extract required information from the response and save + * it in the filter state. 
We then use the custom network filter to take filter + * state provided by TCP Proxy filter, encode it, and send it to main_internal + * *as data* before any actual response data. This is what the network filter + * defined here is responsible for. + * + * In main_internal we use a custom upstream network filter to extract and + * remove the metadata from the data stream and populate filter state that + * could be used by Istio telemetry filters. That's what the upstream network + * filter defined here is responsible for. + * + * Why do we do it this way? Generally in Envoy we use filter state and dynamic + * metadata to communicate additional information between filters. While it's + * possible to propagate filter state from downstream to upstream, i.e., we + * could set filter state in connect_terminate and propagate it to + * main_internal and then to connect_originate, it's not possible to propagate + * filter state from upstream to downstream, i.e., we cannot make filter state + * set in connect_originate available to main_internal directly. That's why we + * push that metadata with the data instead. 
+ */ + +#include + +#include "envoy/network/filter.h" +#include "envoy/server/filter_config.h" +#include "extensions/common/metadata_object.h" +#include "source/common/common/logger.h" +#include "source/common/router/string_accessor_impl.h" +#include "source/common/singleton/const_singleton.h" +#include "source/common/stream_info/bool_accessor_impl.h" +#include "source/common/tcp_proxy/tcp_proxy.h" +#include "source/extensions/filters/common/expr/cel_state.h" +#include "source/extensions/filters/network/common/factory_base.h" +#include "source/extensions/filters/network/peer_metadata/config.pb.h" +#include "source/extensions/filters/network/peer_metadata/config.pb.validate.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PeerMetadata { + +namespace { + +using Config = ::envoy::extensions::network_filters::peer_metadata::Config; +using UpstreamConfig = ::envoy::extensions::network_filters::peer_metadata::UpstreamConfig; + +using CelState = ::Envoy::Extensions::Filters::Common::Expr::CelState; +using CelStatePrototype = ::Envoy::Extensions::Filters::Common::Expr::CelStatePrototype; +using CelStateType = ::Envoy::Extensions::Filters::Common::Expr::CelStateType; + +PACKED_STRUCT(struct PeerMetadataHeader { + uint32_t magic; + static const uint32_t magic_number = 0xabcd1234; + uint32_t data_size; +}); + +struct HeaderValues { + const Http::LowerCaseString Baggage{"baggage"}; +}; + +using Headers = ConstSingleton; + +enum class PeerMetadataState { + WaitingForData, + PassThrough, +}; + +std::string baggageValue(const Server::Configuration::ServerFactoryContext& context) { + const auto obj = + ::Istio::Common::convertStructToWorkloadMetadata(context.localInfo().node().metadata()); + return obj->baggage(); +} + +/** + * This is a regular network filter that will be installed in the + * connect_originate or inner_connect_originate filter chains. 
It will take + * baggage header information from filter state (we expect TCP Proxy to + * populate it), collect other details that are missing from the baggage, i.e. + * the upstream peer principal, encode those details into a sequence of bytes + * and will inject it downstream. + */ +class Filter : public Network::Filter, Logger::Loggable { +public: + Filter(const Config& config, Server::Configuration::ServerFactoryContext& context) + : config_(config), baggage_(baggageValue(context)) {} + + // Network::ReadFilter + Network::FilterStatus onData(Buffer::Instance&, bool) override { + return Network::FilterStatus::Continue; + } + + Network::FilterStatus onNewConnection() override { + ENVOY_LOG(trace, "New connection from downstream"); + populateBaggage(); + return Network::FilterStatus::Continue; + } + + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override { + read_callbacks_ = &callbacks; + } + + // Network::WriteFilter + Network::FilterStatus onWrite(Buffer::Instance& buffer, bool) override { + ENVOY_LOG(trace, "Writing {} bytes to the downstream connection", buffer.length()); + switch (state_) { + case PeerMetadataState::WaitingForData: { + // If we are receiving data for downstream - there is no point in waiting + // for peer metadata anymore, if the upstream sent it, we'd have it by + // now. So we can check if the peer metadata is available or not, and if + // no peer metadata is available, we can give up waiting for it. 
+ std::optional peer_metadata = discoverPeerMetadata(); + if (peer_metadata) { + propagatePeerMetadata(*peer_metadata); + } else { + propagateNoPeerMetadata(); + } + state_ = PeerMetadataState::PassThrough; + break; + } + default: + break; + } + return Network::FilterStatus::Continue; + } + + void initializeWriteFilterCallbacks(Network::WriteFilterCallbacks& callbacks) override { + write_callbacks_ = &callbacks; + } + +private: + void populateBaggage() { + if (config_.baggage_key().empty()) { + ENVOY_LOG(trace, "Not populating baggage filter state because baggage key is not set"); + return; + } + + ENVOY_LOG(trace, "Populating baggage value {} in the filter state with key {}", baggage_, + config_.baggage_key()); + ASSERT(read_callbacks_); + read_callbacks_->connection().streamInfo().filterState()->setData( + config_.baggage_key(), std::make_shared(baggage_), + StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::FilterChain); + } + + // discoveryPeerMetadata is called to check if the baggage HTTP2 CONNECT + // response headers have been populated already in the filter state. + // + // NOTE: It's safe to call this function during any step of processing - it + // will not do anything if the filter is not in the right state. 
+ std::optional discoverPeerMetadata() { + ENVOY_LOG(trace, "Trying to discovery peer metadata from filter state set by TCP Proxy"); + ASSERT(write_callbacks_); + + const Network::Connection& conn = write_callbacks_->connection(); + const StreamInfo::StreamInfo& stream = conn.streamInfo(); + const TcpProxy::TunnelResponseHeaders* state = + stream.filterState().getDataReadOnly( + TcpProxy::TunnelResponseHeaders::key()); + if (!state) { + ENVOY_LOG(trace, "TCP Proxy didn't set expected filter state"); + return std::nullopt; + } + + const Http::HeaderMap& headers = state->value(); + const auto baggage = headers.get(Headers::get().Baggage); + if (baggage.empty()) { + ENVOY_LOG( + trace, + "TCP Proxy saved response headers to the filter state, but there is no baggage header"); + return std::nullopt; + } + + ENVOY_LOG(trace, + "Successfully discovered peer metadata from the baggage header saved by TCP Proxy"); + + std::string identity{}; + const auto upstream = write_callbacks_->connection().streamInfo().upstreamInfo(); + if (upstream) { + const auto conn = upstream->upstreamSslConnection(); + if (conn) { + identity = absl::StrJoin(conn->uriSanPeerCertificate(), ","); + ENVOY_LOG(trace, "Discovered upstream peer identity to be {}", identity); + } + } + + std::unique_ptr<::Istio::Common::WorkloadMetadataObject> metadata = + ::Istio::Common::convertBaggageToWorkloadMetadata(baggage[0]->value().getStringView(), + identity); + + google::protobuf::Struct data = convertWorkloadMetadataToStruct(*metadata); + google::protobuf::Any wrapped; + wrapped.PackFrom(data); + return wrapped; + } + + void propagatePeerMetadata(const google::protobuf::Any& peer_metadata) { + ENVOY_LOG(trace, "Sending peer metadata downstream with the data stream"); + ASSERT(write_callbacks_); + + if (state_ != PeerMetadataState::WaitingForData) { + // It's only safe and correct to send the peer metadata downstream with + // the data if we haven't done that already, otherwise the downstream + // could be 
very confused by the data they received. + ENVOY_LOG(trace, "Filter has already sent the peer metadata downstream"); + return; + } + + std::string data = peer_metadata.SerializeAsString(); + PeerMetadataHeader header{PeerMetadataHeader::magic_number, static_cast(data.size())}; + + Buffer::OwnedImpl buffer{ + std::string_view(reinterpret_cast(&header), sizeof(header))}; + buffer.add(data); + write_callbacks_->injectWriteDataToFilterChain(buffer, false); + } + + void propagateNoPeerMetadata() { + ENVOY_LOG(trace, "Sending no peer metadata downstream with the data"); + ASSERT(write_callbacks_); + + PeerMetadataHeader header{PeerMetadataHeader::magic_number, 0}; + Buffer::OwnedImpl buffer{ + std::string_view(reinterpret_cast(&header), sizeof(header))}; + write_callbacks_->injectWriteDataToFilterChain(buffer, false); + } + + PeerMetadataState state_ = PeerMetadataState::WaitingForData; + Network::WriteFilterCallbacks* write_callbacks_{}; + Network::ReadFilterCallbacks* read_callbacks_{}; + const Config& config_; + std::string baggage_; +}; + +/** + * This is an upstream network filter complementing the filter above. It will + * be installed in all the service clusters that may use HBONE (or double + * HBONE) to communicate with the upstream peers and it will parse and remove + * the data injected by the filter above. The parsed peer metadata details will + * be saved in the filter state. + * + * NOTE: This filter has built-in safety checks that would prevent it from + * trying to interpret the actual connection data as peer metadata injected + * by the filter above. However, those checks are rather shallow and rely on a + * bunch of implicit assumptions (i.e., the magic number does not match + * accidentally, the upstream host actually sends back some data that we can + * check, etc). What I'm trying to say is that in correct setup we don't need + * to rely on those checks for correctness and if it's not the case, then we + * definitely have a bug. 
+ */ +class UpstreamFilter : public Network::ReadFilter, Logger::Loggable { +public: + UpstreamFilter() {} + + // Network::ReadFilter + Network::FilterStatus onData(Buffer::Instance& buffer, bool end_stream) override { + ENVOY_LOG(trace, "Read {} bytes from the upstream connection", buffer.length()); + + switch (state_) { + case PeerMetadataState::WaitingForData: + if (!isUpstreamHBONE()) { + state_ = PeerMetadataState::PassThrough; + break; + } + if (consumePeerMetadata(buffer, end_stream)) { + state_ = PeerMetadataState::PassThrough; + } else { + // If we got here it means that we are waiting for more data to arrive. + // NOTE: if error happened, we will not get here, consumePeerMetadata + // will just return true and we will enter PassThrough state. + return Network::FilterStatus::StopIteration; + } + break; + default: + break; + } + + return Network::FilterStatus::Continue; + } + + Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; } + + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override { + callbacks_ = &callbacks; + } + +private: + // TODO: This is a rather shallow check - it only verifies that the upstream is an internal + // listener and therefore could have peer metadata filter that will send peer metadata with + // the data stream. + // + // We can be more explicit than that and check the name of the internal listener to only + // trigger the logic when we talk to connect_originate or inner_connect_originate listeners. + // A more clean approach would be to add endpoint metadata that will let this filter know + // that it should not trigger for the connection (or should trigger on the connection). + // + // Another potential benefit here is that we can trigger baggage-based peer metadata + // discovery only for double-HBONE connections, if we let this filter skip all regular + // endpoints that don't communicate with the E/W gateway. 
+ // + // We could also consider dropping this check alltogether, because we only need this filter + // in waypoints and all waypoint clusters contain either HBONE or double-HBONE endpoints. + bool isUpstreamHBONE() const { + ASSERT(callbacks_); + + const auto upstream = callbacks_->connection().streamInfo().upstreamInfo(); + if (!upstream) { + ENVOY_LOG(trace, "No upstream information, cannot confirm that upstream uses HBONE"); + return false; + } + + const auto host = upstream->upstreamHost(); + if (!host) { + ENVOY_LOG(trace, "No upstream host, cannot confirm that upstream host uses HBONE"); + return false; + } + + if (host->address()->type() != Network::Address::Type::EnvoyInternal) { + ENVOY_LOG(trace, + "Upstream host is not an internal listener - upstream host does not use HBONE"); + return false; + } + + ENVOY_LOG(trace, + "Upstream host is an internal listener - concluding that upstream host uses HBONE"); + return true; + } + + bool consumePeerMetadata(Buffer::Instance& buffer, bool end_stream) { + ENVOY_LOG(trace, "Trying to consume peer metadata from the data stream"); + using namespace ::Istio::Common; + + ASSERT(callbacks_); + + if (state_ != PeerMetadataState::WaitingForData) { + ENVOY_LOG(trace, "The filter already consumed peer metadata from the data stream"); + return true; + } + + if (buffer.length() < sizeof(PeerMetadataHeader)) { + if (end_stream) { + ENVOY_LOG(trace, "Not enough data in the data stream for peer metadata header and no more " + "data is coming"); + populateNoPeerMetadata(); + return true; + } + ENVOY_LOG( + trace, + "Not enough data in the data stream for peer metadata header, waiting for more data"); + return false; + } + + PeerMetadataHeader header; + buffer.copyOut(0, sizeof(PeerMetadataHeader), &header); + + if (header.magic != PeerMetadataHeader::magic_number) { + ENVOY_LOG(trace, "Magic number in the peer metadata header didn't match expected value"); + populateNoPeerMetadata(); + return true; + } + + if (header.data_size 
== 0) { + ENVOY_LOG(trace, "Peer metadata is empty"); + populateNoPeerMetadata(); + buffer.drain(sizeof(PeerMetadataHeader)); + return true; + } + + const size_t peer_metadata_size = sizeof(PeerMetadataHeader) + header.data_size; + + if (buffer.length() < peer_metadata_size) { + if (end_stream) { + ENVOY_LOG( + trace, + "Not enough data in the data stream for peer metadata and no more data is coming"); + populateNoPeerMetadata(); + return true; + } + ENVOY_LOG(trace, + "Not enough data in the data stream for peer metadata, waiting for more data"); + return false; + } + + std::string_view data{static_cast(buffer.linearize(peer_metadata_size)), + peer_metadata_size}; + data = data.substr(sizeof(PeerMetadataHeader)); + google::protobuf::Any any; + if (!any.ParseFromArray(data.data(), data.size())) { + ENVOY_LOG(trace, "Failed to parse peer metadata proto from the data stream"); + populateNoPeerMetadata(); + return true; + } + + google::protobuf::Struct peer_metadata; + if (!any.UnpackTo(&peer_metadata)) { + ENVOY_LOG(trace, "Failed to unpack peer metadata struct"); + populateNoPeerMetadata(); + return true; + } + + std::unique_ptr workload = + convertStructToWorkloadMetadata(peer_metadata); + populatePeerMetadata(*workload); + buffer.drain(peer_metadata_size); + ENVOY_LOG(trace, "Successfully consumed peer metadata from the data stream"); + return true; + } + + static const CelStatePrototype& peerInfoPrototype() { + static const CelStatePrototype* const prototype = new CelStatePrototype( + true, CelStateType::Protobuf, "type.googleapis.com/google.protobuf.Struct", + StreamInfo::FilterState::LifeSpan::Connection); + return *prototype; + } + + void populatePeerMetadata(const ::Istio::Common::WorkloadMetadataObject& peer) { + ENVOY_LOG(trace, "Populating peer metadata in the upstream filter state"); + ASSERT(callbacks_); + + auto proto = ::Istio::Common::convertWorkloadMetadataToStruct(peer); + auto cel = std::make_shared(peerInfoPrototype()); + 
cel->setValue(std::string_view(proto.SerializeAsString())); + callbacks_->connection().streamInfo().filterState()->setData( + ::Istio::Common::UpstreamPeer, std::move(cel), StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::Connection); + } + + void populateNoPeerMetadata() { + ENVOY_LOG(trace, "Populating no peer metadata in the upstream filter state"); + ASSERT(callbacks_); + + callbacks_->connection().streamInfo().filterState()->setData( + ::Istio::Common::NoPeer, std::make_shared(true), + StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::Connection); + } + + PeerMetadataState state_ = PeerMetadataState::WaitingForData; + Network::ReadFilterCallbacks* callbacks_{}; +}; + +/** + * PeerMetadata network filter factory. + * + * This filter is responsible for collecting peer metadata from filter state + * and other sources, encoding it and passing it downstream before the actual + * data. + */ +class ConfigFactory : public Common::ExceptionFreeFactoryBase { +public: + ConfigFactory() + : Common::ExceptionFreeFactoryBase("envoy.filters.network.peer_metadata", + /*is_terminal*/ false) {} + +private: + absl::StatusOr + createFilterFactoryFromProtoTyped(const Config& config, + Server::Configuration::FactoryContext& context) override { + return [config, &context](Network::FilterManager& filter_manager) -> void { + filter_manager.addFilter(std::make_shared(config, context.serverFactoryContext())); + }; + } +}; + +/** + * PeerMetadata upstream network filter factory. + * + * This filter is responsible for detecting the peer metadata passed in the + * data stream, parsing it, populating filter state based on that and finally + * removing it from the data stream, so that downstream filters can process + * the data as usual. 
+ */ +class UpstreamConfigFactory + : public Server::Configuration::NamedUpstreamNetworkFilterConfigFactory { +public: + Network::FilterFactoryCb + createFilterFactoryFromProto(const Protobuf::Message& config, + Server::Configuration::UpstreamFactoryContext&) override { + return createFilterFactory(dynamic_cast(config)); + } + + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return std::make_unique(); + } + + std::string name() const override { return "envoy.filters.network.upstream.peer_metadata"; } + + bool isTerminalFilterByProto(const Protobuf::Message&, + Server::Configuration::ServerFactoryContext&) override { + // This filter must be last filter in the upstream filter chain, so that + // it'd be the first filter to see and process the data coming back, + // because it has to remove the preamble set by the network filter. + return true; + } + +private: + Network::FilterFactoryCb createFilterFactory(const UpstreamConfig&) { + return [](Network::FilterManager& filter_manager) -> void { + filter_manager.addReadFilter(std::make_shared()); + }; + } +}; + +REGISTER_FACTORY(ConfigFactory, Server::Configuration::NamedNetworkFilterConfigFactory); +REGISTER_FACTORY(UpstreamConfigFactory, + Server::Configuration::NamedUpstreamNetworkFilterConfigFactory); + +} // namespace + +} // namespace PeerMetadata +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy