diff --git a/hypertrace-trace-enricher/enriched-span-constants/src/main/java/org/hypertrace/traceenricher/enrichedspan/constants/EnrichedSpanConstants.java b/hypertrace-trace-enricher/enriched-span-constants/src/main/java/org/hypertrace/traceenricher/enrichedspan/constants/EnrichedSpanConstants.java index 58fcddbe0..5a33d8c03 100644 --- a/hypertrace-trace-enricher/enriched-span-constants/src/main/java/org/hypertrace/traceenricher/enrichedspan/constants/EnrichedSpanConstants.java +++ b/hypertrace-trace-enricher/enriched-span-constants/src/main/java/org/hypertrace/traceenricher/enrichedspan/constants/EnrichedSpanConstants.java @@ -18,6 +18,8 @@ public class EnrichedSpanConstants { public static final String GRPC_REQUEST_URL = "grpc.request.url"; public static final String GRPC_REQUEST_ENDPOINT = "grpc.request.endpoint"; public static final String DROP_TRACE_ATTRIBUTE = "drop.trace"; + public static final String PEER_SERVICE_NAME = "PEER_SERVICE_NAME"; + public static final String PEER_SERVICE_ID = "PEER_SERVICE_ID"; /** * Returns the constant value for the given Enum. diff --git a/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml b/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml index ca65a46e4..ed899891d 100644 --- a/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml +++ b/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml @@ -85,3 +85,17 @@ data: {{- if hasKey .Values.rawSpansGrouperConfig "traceEmitPunctuatorFrequency" }} trace.emit.punctuator.frequency = {{ .Values.rawSpansGrouperConfig.traceEmitPunctuatorFrequency }} {{- end }} + + {{- if hasKey .Values.rawSpansGrouperConfig "peerCorrelation" }} + peer.correlation = { + {{- if hasKey .Values.rawSpansGrouperConfig.peerCorrelation "enabledCustomers" }} + enabled.customers = {{ .Values.rawSpansGrouperConfig.peerCorrelation.enabledCustomers | toJson }} + {{- end }} + {{- if hasKey .Values.rawSpansGrouperConfig.peerCorrelation "enabledAgents" }} + enabled.agents = {{ .Values.rawSpansGrouperConfig.peerCorrelation.enabledAgents | toJson }} + {{- end }} + {{- if hasKey .Values.rawSpansGrouperConfig.peerCorrelation "agentTypeAttribute" }} + agent.type.attribute = {{ .Values.rawSpansGrouperConfig.peerCorrelation.agentTypeAttribute }} + {{- end }} + } + {{- end }} diff --git a/raw-spans-grouper/helm/values.yaml b/raw-spans-grouper/helm/values.yaml index 139c0855c..05ec7d95e 100644 --- a/raw-spans-grouper/helm/values.yaml +++ b/raw-spans-grouper/helm/values.yaml @@ -133,6 +133,9 @@ rawSpansGrouperConfig: groupPartitionerConfigServiceHost: config-service groupPartitionerConfigServicePort: 50104 traceEmitPunctuatorFrequency: 5s + peerCorrelation: + enabledCustomers: [] + enabledAgents: [] span: groupby: diff --git a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts index 86fbca3b9..de03c6d8c 100644 --- a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts +++ b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts @@ -33,6 +33,8 @@ dependencies { because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637") } implementation(project(":span-normalizer:span-normalizer-api")) + implementation(project(":semantic-convention-utils")) + implementation(project(":hypertrace-trace-enricher:enriched-span-constants")) implementation(libs.hypertrace.data.model) implementation(libs.hypertrace.serviceFramework.framework) implementation(libs.hypertrace.serviceFramework.metrics) diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java index 7c91a30d7..0ff299331 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java @@ -8,11 +8,18 @@ public class RawSpanGrouperConstants { public static final String RAW_SPANS_GROUPER_JOB_CONFIG = "raw-spans-grouper-job-config"; public static final String SPAN_STATE_STORE_NAME = "span-data-store"; public static final String TRACE_STATE_STORE = "trace-state-store"; + public static final String PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE = + "peer-identity-to-span-metadata-store"; public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer"; public static final String SPANS_PER_TRACE_METRIC = "spans_per_trace"; public static final String TRACE_CREATION_TIME = "trace.creation.time"; public static final String DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY = "dataflow.metriccollection.sampling.percent"; + public static final String PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG = + "peer.correlation.agent.type.attribute"; + public static final String PEER_CORRELATION_ENABLED_CUSTOMERS = + "peer.correlation.enabled.customers"; + public static final String PEER_CORRELATION_ENABLED_AGENTS = "peer.correlation.enabled.agents"; public static final String INFLIGHT_TRACE_MAX_SPAN_COUNT = "max.span.count"; public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count"; public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans"; diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java index 597ef2dba..05adcee51 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java @@ -3,6 +3,7 @@ import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INPUT_TOPIC_CONFIG_KEY; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME; @@ -31,7 +32,9 @@ import org.hypertrace.core.kafkastreams.framework.partitioner.GroupPartitionerBuilder; import org.hypertrace.core.kafkastreams.framework.partitioner.KeyHashPartitioner; import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.hypertrace.core.spannormalizer.PeerIdentity; import org.hypertrace.core.spannormalizer.SpanIdentity; +import org.hypertrace.core.spannormalizer.SpanMetadata; import org.hypertrace.core.spannormalizer.TraceIdentity; import org.hypertrace.core.spannormalizer.TraceState; import org.slf4j.Logger; @@ -86,6 +89,14 @@ StreamsBuilder buildTopologyWithClock( Stores.persistentKeyValueStore(SPAN_STATE_STORE_NAME), keySerde, valueSerde) .withCachingEnabled(); + StoreBuilder> + peerIdentityToSpanMetadataStateStoreBuilder = + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE), + keySerde, + valueSerde) + .withCachingEnabled(); + StoreBuilder>> traceEmitPunctuatorStoreBuilder = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME), @@ -95,6 +106,7 @@ StreamsBuilder buildTopologyWithClock( streamsBuilder.addStateStore(spanStoreBuilder); streamsBuilder.addStateStore(traceStateStoreBuilder); + streamsBuilder.addStateStore(peerIdentityToSpanMetadataStateStoreBuilder); streamsBuilder.addStateStore(traceEmitPunctuatorStoreBuilder); StreamPartitioner groupPartitioner = @@ -116,7 +128,8 @@ StreamsBuilder buildTopologyWithClock( Named.as(RawSpansProcessor.class.getSimpleName()), SPAN_STATE_STORE_NAME, TRACE_STATE_STORE, - TRACE_EMIT_PUNCTUATOR_STORE_NAME) + TRACE_EMIT_PUNCTUATOR_STORE_NAME, + PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE) .to(outputTopic, outputTopicProducer); return streamsBuilder; diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index 28bedefaa..ab561bc8a 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -7,6 +7,10 @@ import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_ENABLED_AGENTS; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_ENABLED_CUSTOMERS; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; @@ -15,6 +19,7 @@ import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRUNCATED_TRACES_COUNTER; +import static org.hypertrace.traceenricher.enrichedspan.constants.EnrichedSpanConstants.PEER_SERVICE_NAME; import com.typesafe.config.Config; import io.micrometer.core.instrument.Counter; @@ -23,9 +28,12 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -36,14 +44,26 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; +import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.RawSpan; import org.hypertrace.core.datamodel.StructuredTrace; +import org.hypertrace.core.datamodel.Timestamps; import org.hypertrace.core.datamodel.shared.HexUtils; +import org.hypertrace.core.datamodel.shared.SpanAttributeUtils; +import org.hypertrace.core.datamodel.shared.trace.AttributeValueCreator; +import org.hypertrace.core.datamodel.shared.trace.StructuredTraceBuilder; import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig; +import org.hypertrace.core.rawspansgrouper.utils.TraceLatencyMeter; +import org.hypertrace.core.rawspansgrouper.validator.PeerIdentityValidator; import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; +import org.hypertrace.core.spannormalizer.IpIdentity; +import org.hypertrace.core.spannormalizer.PeerIdentity; import org.hypertrace.core.spannormalizer.SpanIdentity; +import org.hypertrace.core.spannormalizer.SpanMetadata; import org.hypertrace.core.spannormalizer.TraceIdentity; import org.hypertrace.core.spannormalizer.TraceState; +import org.hypertrace.semantic.convention.utils.http.HttpSemanticConventionUtils; +import org.hypertrace.semantic.convention.utils.span.SpanSemanticConventionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +80,7 @@ public class RawSpansProcessor private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class); private static final String PROCESSING_LATENCY_TIMER = "hypertrace.rawspansgrouper.processing.latency"; + private static final String ALL = "*"; private static final ConcurrentMap tenantToSpansGroupingTimer = new ConcurrentHashMap<>(); // counter for number of spans dropped per tenant @@ -69,15 +90,21 @@ public class RawSpansProcessor // counter for number of truncated traces per tenant private static final ConcurrentMap truncatedTracesCounter = new ConcurrentHashMap<>(); + private ProcessorContext context; private final Clock clock; private KeyValueStore spanStore; private KeyValueStore traceStateStore; + private KeyValueStore peerIdentityToSpanMetadataStateStore; private long groupingWindowTimeoutMs; private double dataflowSamplingPercent = -1; private static final Map maxSpanCountMap = new HashMap<>(); private long defaultMaxSpanCountLimit = Long.MAX_VALUE; private TraceEmitPunctuator traceEmitPunctuator; private Cancellable traceEmitTasksPunctuatorCancellable; + private String peerCorrelationAgentTypeAttribute; + private List peerCorrelationEnabledCustomers; + private List peerCorrelationEnabledAgents; + private TraceLatencyMeter traceLatencyMeter; public RawSpansProcessor(Clock clock) { this.clock = clock; @@ -85,9 +112,24 @@ public RawSpansProcessor(Clock clock) { @Override public void init(ProcessorContext context) { + this.context = context; this.spanStore = context.getStateStore(SPAN_STATE_STORE_NAME); this.traceStateStore = context.getStateStore(TRACE_STATE_STORE); + this.peerIdentityToSpanMetadataStateStore = + context.getStateStore(PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE); Config jobConfig = (Config) (context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG)); + this.peerCorrelationAgentTypeAttribute = + jobConfig.hasPath(PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG) + ? jobConfig.getString(PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG) + : null; + this.peerCorrelationEnabledCustomers = + jobConfig.hasPath(PEER_CORRELATION_ENABLED_CUSTOMERS) + ? jobConfig.getStringList(PEER_CORRELATION_ENABLED_CUSTOMERS) + : Collections.emptyList(); + this.peerCorrelationEnabledAgents = + jobConfig.hasPath(PEER_CORRELATION_ENABLED_AGENTS) + ? jobConfig.getStringList(PEER_CORRELATION_ENABLED_AGENTS) + : Collections.emptyList(); this.groupingWindowTimeoutMs = jobConfig.getLong(SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY) * 1000; @@ -96,15 +138,13 @@ public void init(ProcessorContext context) { && jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY) <= 100) { this.dataflowSamplingPercent = jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY); } + this.traceLatencyMeter = new TraceLatencyMeter(dataflowSamplingPercent); if (jobConfig.hasPath(INFLIGHT_TRACE_MAX_SPAN_COUNT)) { Config subConfig = jobConfig.getConfig(INFLIGHT_TRACE_MAX_SPAN_COUNT); subConfig .entrySet() - .forEach( - (entry) -> { - maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey())); - }); + .forEach(entry -> maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey()))); } if (jobConfig.hasPath(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT)) { @@ -151,15 +191,23 @@ public void process(Record record) { TraceIdentity key = record.key(); RawSpan value = record.value(); TraceState traceState = traceStateStore.get(key); - boolean firstEntry = (traceState == null); if (shouldDropSpan(key, traceState)) { return; } + Event event = value.getEvent(); + // spans whose trace is not created by ByPass flow, their trace will be created in below + // function call. Those spans' trace will not be created by TraceEmitPunctuator + if (isPeerServiceNameIdentificationRequired(event)) { + processSpanForPeerServiceNameIdentification(key, value, traceState, currentTimeMs); + return; + } + + boolean firstEntry = (traceState == null); String tenantId = key.getTenantId(); ByteBuffer traceId = value.getTraceId(); - ByteBuffer spanId = value.getEvent().getEventId(); + ByteBuffer spanId = event.getEventId(); spanStore.put(new SpanIdentity(tenantId, traceId, spanId), value); if (firstEntry) { @@ -196,7 +244,104 @@ public void process(Record record) { .record(Duration.between(start, Instant.now()).toMillis(), TimeUnit.MILLISECONDS); // no need to do context.forward. the punctuator will emit the trace once it's eligible to be // emitted - return; + } + + private void processSpanForPeerServiceNameIdentification( + TraceIdentity key, RawSpan value, TraceState traceState, long currentTimeMs) { + Event event = value.getEvent(); + String tenantId = key.getTenantId(); + ByteBuffer traceId = value.getTraceId(); + boolean firstEntry = (traceState == null); + Optional maybeEnvironment = HttpSemanticConventionUtils.getEnvironmentForSpan(event); + if (SpanSemanticConventionUtils.isClientSpanForOCFormat( + event.getAttributes().getAttributeMap())) { + handleClientSpan(tenantId, event, maybeEnvironment.orElse(null)); + } else { + handleServerSpan(tenantId, event, maybeEnvironment.orElse(null)); + } + + // create structured trace and forward + Timestamps timestamps = + traceLatencyMeter.trackEndToEndLatencyTimestamps( + currentTimeMs, firstEntry ? currentTimeMs : traceState.getTraceStartTimestamp()); + StructuredTrace trace = + StructuredTraceBuilder.buildStructuredTraceFromRawSpans( + List.of(value), traceId, tenantId, timestamps); + context.forward(new Record<>(key, trace, currentTimeMs), OUTPUT_TOPIC_PRODUCER); + } + + // put the peer service identity and corresponding service name in state store + private void handleClientSpan(String tenantId, Event event, String environment) { + Optional maybeHostAddr = HttpSemanticConventionUtils.getHostIpAddress(event); + Optional maybePeerAddr = HttpSemanticConventionUtils.getPeerIpAddress(event); + Optional maybePeerPort = HttpSemanticConventionUtils.getPeerPort(event); + String serviceName = event.getServiceName(); + PeerIdentity peerIdentity = + PeerIdentity.newBuilder() + .setIpIdentity( + IpIdentity.newBuilder() + .setTenantId(tenantId) + .setEnvironment(environment) + .setHostAddr(maybeHostAddr.orElse(null)) + .setPeerAddr(maybePeerAddr.orElse(null)) + .setPeerPort(maybePeerPort.orElse(null)) + .build()) + .build(); + if (PeerIdentityValidator.isValid(peerIdentity)) { + this.peerIdentityToSpanMetadataStateStore.put( + peerIdentity, + SpanMetadata.newBuilder() + .setServiceName(serviceName) + .setEventId(event.getEventId()) + .build()); + } + } + + // get the service name for that peer service identity and correlate the current span + private void handleServerSpan(String tenantId, Event event, String environment) { + Optional maybePeerAddr = HttpSemanticConventionUtils.getPeerIpAddress(event); + Optional maybeHostAddr = HttpSemanticConventionUtils.getHostIpAddress(event); + Optional maybeHostPort = HttpSemanticConventionUtils.getHostPort(event); + PeerIdentity peerIdentity = + PeerIdentity.newBuilder() + .setIpIdentity( + IpIdentity.newBuilder() + .setTenantId(tenantId) + .setEnvironment(environment) + .setHostAddr(maybePeerAddr.orElse(null)) + .setPeerAddr(maybeHostAddr.orElse(null)) + .setPeerPort(maybeHostPort.orElse(null)) + .build()) + .build(); + if (PeerIdentityValidator.isValid(peerIdentity)) { + SpanMetadata spanMetadata = this.peerIdentityToSpanMetadataStateStore.get(peerIdentity); + if (Objects.nonNull(spanMetadata)) { + logger.debug( + "Adding {} as: {} from spanId: {} in spanId: {} with service name: {}", + PEER_SERVICE_NAME, + spanMetadata.getServiceName(), + HexUtils.getHex(spanMetadata.getEventId()), + HexUtils.getHex(event.getEventId()), + event.getServiceName()); + + event + .getEnrichedAttributes() + .getAttributeMap() + .put(PEER_SERVICE_NAME, AttributeValueCreator.create(spanMetadata.getServiceName())); + } + } + } + + private boolean isPeerServiceNameIdentificationRequired(Event event) { + if (!this.peerCorrelationEnabledCustomers.contains(event.getCustomerId()) + && !this.peerCorrelationEnabledCustomers.contains(ALL)) { + return false; + } + + String agentType = + SpanAttributeUtils.getStringAttributeWithDefault( + event, this.peerCorrelationAgentTypeAttribute, null); + return Objects.nonNull(agentType) && this.peerCorrelationEnabledAgents.contains(agentType); } private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java index 65bf563ba..8a5816c5e 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java @@ -1,7 +1,6 @@ package org.hypertrace.core.rawspansgrouper; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPANS_PER_TRACE_METRIC; -import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_CREATION_TIME; import com.google.common.util.concurrent.RateLimiter; import io.micrometer.core.instrument.Counter; @@ -18,13 +17,11 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; import org.hypertrace.core.datamodel.RawSpan; import org.hypertrace.core.datamodel.StructuredTrace; -import org.hypertrace.core.datamodel.TimestampRecord; import org.hypertrace.core.datamodel.Timestamps; import org.hypertrace.core.datamodel.shared.DataflowMetricUtils; import org.hypertrace.core.datamodel.shared.HexUtils; @@ -34,6 +31,7 @@ import org.hypertrace.core.kafkastreams.framework.punctuators.action.CompletedTaskResult; import org.hypertrace.core.kafkastreams.framework.punctuators.action.RescheduleTaskResult; import org.hypertrace.core.kafkastreams.framework.punctuators.action.TaskResult; +import org.hypertrace.core.rawspansgrouper.utils.TraceLatencyMeter; import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; import org.hypertrace.core.spannormalizer.SpanIdentity; import org.hypertrace.core.spannormalizer.TraceIdentity; @@ -68,12 +66,12 @@ class TraceEmitPunctuator extends AbstractThrottledPunctuator { new ConcurrentHashMap<>(); private static final CompletedTaskResult COMPLETED_TASK_RESULT = new CompletedTaskResult(); - private final double dataflowSamplingPercent; private final ProcessorContext context; private final KeyValueStore spanStore; private final KeyValueStore traceStateStore; private final String outputTopicProducer; private final long groupingWindowTimeoutMs; + private final TraceLatencyMeter traceLatencyMeter; TraceEmitPunctuator( ThrottledPunctuatorConfig throttledPunctuatorConfig, @@ -90,7 +88,7 @@ class TraceEmitPunctuator extends AbstractThrottledPunctuator { this.traceStateStore = traceStateStore; this.outputTopicProducer = outputTopicProducer; this.groupingWindowTimeoutMs = groupingWindowTimeoutMs; - this.dataflowSamplingPercent = dataflowSamplingPercent; + this.traceLatencyMeter = new TraceLatencyMeter(dataflowSamplingPercent); } protected TaskResult executeTask(long punctuateTimestamp, TraceIdentity key) { @@ -153,7 +151,7 @@ private void emitTrace(TraceIdentity key, TraceState traceState) { recordSpansPerTrace(rawSpanList.size(), List.of(Tag.of("tenant_id", tenantId))); Timestamps timestamps = - trackEndToEndLatencyTimestamps( + this.traceLatencyMeter.trackEndToEndLatencyTimestamps( System.currentTimeMillis(), traceState.getTraceStartTimestamp()); StructuredTrace trace = StructuredTraceBuilder.buildStructuredTraceFromRawSpans( @@ -197,22 +195,6 @@ private void emitTrace(TraceIdentity key, TraceState traceState) { outputTopicProducer); } - private Timestamps trackEndToEndLatencyTimestamps( - long currentTimestamp, long firstSpanTimestamp) { - Timestamps timestamps = null; - if (!(Math.random() * 100 <= dataflowSamplingPercent)) { - spansGrouperArrivalLagTimer.record( - currentTimestamp - firstSpanTimestamp, TimeUnit.MILLISECONDS); - Map records = new HashMap<>(); - records.put( - DataflowMetricUtils.SPAN_ARRIVAL_TIME, - new TimestampRecord(DataflowMetricUtils.SPAN_ARRIVAL_TIME, firstSpanTimestamp)); - records.put(TRACE_CREATION_TIME, new TimestampRecord(TRACE_CREATION_TIME, currentTimestamp)); - timestamps = new Timestamps(records); - } - return timestamps; - } - private void recordSpansPerTrace(double count, Iterable tags) { DistributionSummary summary = DistributionSummary.builder(SPANS_PER_TRACE_METRIC) diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/utils/TraceLatencyMeter.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/utils/TraceLatencyMeter.java new file mode 100644 index 000000000..2e102167e --- /dev/null +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/utils/TraceLatencyMeter.java @@ -0,0 +1,37 @@ +package org.hypertrace.core.rawspansgrouper.utils; + +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_CREATION_TIME; + +import io.micrometer.core.instrument.Timer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.hypertrace.core.datamodel.TimestampRecord; +import org.hypertrace.core.datamodel.Timestamps; +import org.hypertrace.core.datamodel.shared.DataflowMetricUtils; +import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; + +public class TraceLatencyMeter { + private static final Timer spansGrouperArrivalLagTimer = + PlatformMetricsRegistry.registerTimer(DataflowMetricUtils.ARRIVAL_LAG, new HashMap<>()); + private final double dataflowSamplingPercent; + + public TraceLatencyMeter(double dataflowSamplingPercent) { + this.dataflowSamplingPercent = dataflowSamplingPercent; + } + + public Timestamps trackEndToEndLatencyTimestamps(long currentTimestamp, long firstSpanTimestamp) { + Timestamps timestamps = null; + if (!(Math.random() * 100 <= dataflowSamplingPercent)) { + spansGrouperArrivalLagTimer.record( + currentTimestamp - firstSpanTimestamp, TimeUnit.MILLISECONDS); + Map records = new HashMap<>(); + records.put( + DataflowMetricUtils.SPAN_ARRIVAL_TIME, + new TimestampRecord(DataflowMetricUtils.SPAN_ARRIVAL_TIME, firstSpanTimestamp)); + records.put(TRACE_CREATION_TIME, new TimestampRecord(TRACE_CREATION_TIME, currentTimestamp)); + timestamps = new Timestamps(records); + } + return timestamps; + } +} diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/validator/IpIdentityValidator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/validator/IpIdentityValidator.java new file mode 100644 index 000000000..09a210979 --- /dev/null +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/validator/IpIdentityValidator.java @@ -0,0 +1,15 @@ +package org.hypertrace.core.rawspansgrouper.validator; + +import java.util.Objects; +import org.hypertrace.core.spannormalizer.IpIdentity; + +public class IpIdentityValidator { + public static boolean isValid(IpIdentity ipIdentity) { + if (Objects.isNull(ipIdentity.getEnvironment()) || Objects.isNull(ipIdentity.getTenantId())) { + return false; + } + + return Objects.nonNull(ipIdentity.getHostAddr()) + || (Objects.nonNull(ipIdentity.getPeerAddr()) && Objects.nonNull(ipIdentity.getPeerPort())); + } +} diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/validator/PeerIdentityValidator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/validator/PeerIdentityValidator.java new file mode 100644 index 000000000..ce64f1aff --- /dev/null +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/validator/PeerIdentityValidator.java @@ -0,0 +1,16 @@ +package org.hypertrace.core.rawspansgrouper.validator; + +import java.util.Objects; +import org.hypertrace.core.spannormalizer.IpIdentity; +import org.hypertrace.core.spannormalizer.PeerIdentity; + +public class PeerIdentityValidator { + + public static boolean isValid(PeerIdentity peerIdentity) { + final IpIdentity ipIdentity = peerIdentity.getIpIdentity(); + if (Objects.nonNull(ipIdentity)) { + return IpIdentityValidator.isValid(ipIdentity); + } + return true; + } +} diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf b/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf index 1849d5e97..632690f41 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf @@ -65,4 +65,9 @@ dataflow.metriccollection.sampling.percent = 10.0 trace.emit.punctuator { frequency = 5s frequency = ${?TRACE_EMIT_PUNCTUATOR_FREQUENCY} -} \ No newline at end of file +} + +peer.correlation = { + enabled.customers = [] + enabled.agents = [] +} diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java index b28a92b48..53ec27027 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java @@ -1,5 +1,6 @@ package org.hypertrace.core.rawspansgrouper; +import static org.hypertrace.traceenricher.enrichedspan.constants.EnrichedSpanConstants.PEER_SERVICE_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -12,6 +13,7 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.time.Clock; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -24,6 +26,8 @@ import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.test.TestRecord; +import org.hypertrace.core.datamodel.AttributeValue; +import org.hypertrace.core.datamodel.Attributes; import org.hypertrace.core.datamodel.Event; import org.hypertrace.core.datamodel.RawSpan; import org.hypertrace.core.datamodel.StructuredTrace; @@ -303,14 +307,184 @@ private long advanceAndSyncClockMock(long messageTime, Clock clock, long advance return finalMessageTime; } + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "raw-spans-grouper") + void testMirroringSpansGrouping(@TempDir Path tempDir) { + File file = tempDir.resolve("state").toFile(); + + RawSpansGrouper underTest = new RawSpansGrouper(ConfigClientFactory.getClient()); + Config config = + ConfigFactory.parseURL( + getClass().getClassLoader().getResource("configs/raw-spans-grouper/application.conf")); + + Map baseProps = underTest.getBaseStreamsConfig(); + Map streamsProps = underTest.getStreamsConfig(config); + baseProps.forEach(streamsProps::put); + Map mergedProps = streamsProps; + + mergedProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + mergedProps.put(RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG, config); + mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath()); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + Serde defaultValueSerde = new StreamsConfig(mergedProps).defaultValueSerde(); + + Serde traceIdentitySerde = new StreamsConfig(mergedProps).defaultKeySerde(); + + TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + TestInputTopic inputTopic = + td.createInputTopic( + config.getString(RawSpanGrouperConstants.INPUT_TOPIC_CONFIG_KEY), + traceIdentitySerde.serializer(), + defaultValueSerde.serializer()); + + TestOutputTopic outputTopic = + td.createOutputTopic( + config.getString(RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY), + traceIdentitySerde.deserializer(), + defaultValueSerde.deserializer()); + + String hostAddr1 = "1.2.3.4"; + String hostAddr2 = "1.2.3.5"; + String hostAddr3 = "1.2.3.6"; + String hostPort2 = "5000"; + String hostPort3 = "6000"; + String service1 = "service1"; + String service2 = "service2"; + String service3 = "service3"; + + String tenantId = "tenant1"; + RawSpan span1 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) + .setCustomerId(tenantId) + .setEvent( + createMirroringEvent( + "event-1", tenantId, service1, "client", hostAddr1, "1234", hostAddr2, + hostPort2)) + .build(); + RawSpan span2 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-2".getBytes())) + .setCustomerId(tenantId) + .setEvent( + createMirroringEvent( + "event-2", tenantId, service3, "server", hostAddr3, hostPort3, hostAddr2, + "5678")) + .build(); + RawSpan span3 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) + .setCustomerId(tenantId) + .setEvent( + createMirroringEvent( + "event-3", tenantId, service2, "server", hostAddr2, hostPort2, hostAddr1, + "1234")) + .build(); + RawSpan span4 = + RawSpan.newBuilder() + .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) + .setCustomerId(tenantId) + .setEvent( + createMirroringEvent( + "event-4", tenantId, service2, "client", hostAddr2, "5678", hostAddr3, + hostPort3)) + .build(); + + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1); + StructuredTrace trace = outputTopic.readValue(); + assertEquals(1, trace.getEventList().size()); + assertEquals(span1.getEvent(), trace.getEventList().get(0)); + + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span2); + trace = outputTopic.readValue(); + assertEquals(1, trace.getEventList().size()); + assertEquals(span2.getEvent(), trace.getEventList().get(0)); + + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span3); + trace = outputTopic.readValue(); + assertEquals(1, trace.getEventList().size()); + Event event = span3.getEvent(); + event.setEnrichedAttributes( + Attributes.newBuilder() + .setAttributeMap(Map.of(PEER_SERVICE_NAME, createAttribute(service1))) + .build()); + assertEquals(event, trace.getEventList().get(0)); + + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-4"), span4); + trace = outputTopic.readValue(); + assertEquals(1, trace.getEventList().size()); + assertEquals(span4.getEvent(), trace.getEventList().get(0)); + + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span2); + trace = outputTopic.readValue(); + assertEquals(1, trace.getEventList().size()); + event = span2.getEvent(); + event.setEnrichedAttributes( + Attributes.newBuilder() + .setAttributeMap(Map.of(PEER_SERVICE_NAME, createAttribute(service2))) + .build()); + assertEquals(event, trace.getEventList().get(0)); + } + private Event createEvent(String eventId, String tenantId) { return Event.newBuilder() .setCustomerId(tenantId) .setEventId(ByteBuffer.wrap(eventId.getBytes())) + .setAttributes(Attributes.newBuilder().setAttributeMap(Collections.emptyMap()).build()) + .setEnrichedAttributes( + Attributes.newBuilder().setAttributeMap(Collections.emptyMap()).build()) + .setStartTimeMillis(System.currentTimeMillis()) + .build(); + } + + private Event createMirroringEvent( + String eventId, + String tenantId, + String service, + String spanKind, + String hostAddr, + String hostPort, + String peerAddr, + String peerPort) { + return Event.newBuilder() + .setCustomerId(tenantId) + .setServiceName(service) + .setEventId(ByteBuffer.wrap(eventId.getBytes())) .setStartTimeMillis(System.currentTimeMillis()) + .setEnrichedAttributes( + Attributes.newBuilder().setAttributeMap(Collections.emptyMap()).build()) + .setAttributes( + Attributes.newBuilder() + .setAttributeMap( + Map.of( + "agent.type", + createAttribute("mirror"), + "deployment.environment", + createAttribute("environment"), + "span.kind", + createAttribute(spanKind), + "net.sock.host.addr", + createAttribute(hostAddr), + "net.sock.host.port", + createAttribute(hostPort), + "net.sock.peer.addr", + createAttribute(peerAddr), + "net.sock.peer.port", + createAttribute(peerPort))) + .build()) .build(); } + private AttributeValue createAttribute(String value) { + return AttributeValue.newBuilder().setValue(value).build(); + } + private TraceIdentity createTraceIdentity(String tenantId, String traceId) { return TraceIdentity.newBuilder() .setTenantId(tenantId) diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/utils/TraceLatencyMeterTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/utils/TraceLatencyMeterTest.java new file mode 100644 index 000000000..0405ffad7 --- /dev/null +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/utils/TraceLatencyMeterTest.java @@ -0,0 +1,31 @@ +package org.hypertrace.core.rawspansgrouper.utils; + +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_CREATION_TIME; + +import java.util.Map; +import org.hypertrace.core.datamodel.TimestampRecord; +import org.hypertrace.core.datamodel.Timestamps; +import org.hypertrace.core.datamodel.shared.DataflowMetricUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TraceLatencyMeterTest { + + @Test + void testTrackEndToEndLatencyTimestamps() { + TraceLatencyMeter traceLatencyMeter = new TraceLatencyMeter(100); + Timestamps timestamps = traceLatencyMeter.trackEndToEndLatencyTimestamps(123L, 123L); + Assertions.assertNull(timestamps); + + traceLatencyMeter = new TraceLatencyMeter(0); + timestamps = traceLatencyMeter.trackEndToEndLatencyTimestamps(150L, 100L); + Assertions.assertEquals( + new Timestamps( + Map.of( + DataflowMetricUtils.SPAN_ARRIVAL_TIME, + new TimestampRecord(DataflowMetricUtils.SPAN_ARRIVAL_TIME, 100L), + TRACE_CREATION_TIME, + new TimestampRecord(TRACE_CREATION_TIME, 150L))), + timestamps); + } +} diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf index 6ecac3b23..d7dc1d302 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf @@ -47,4 +47,10 @@ span.groupby.session.window.graceperiod.ms = 100 dataflow.metriccollection.sampling.percent = 10.0 -trace.emit.punctuator.frequency = 15s \ No newline at end of file +trace.emit.punctuator.frequency = 15s + +peer.correlation = { + enabled.customers = ["*"] + enabled.agents = ["mirror"] + agent.type.attribute = "agent.type" +} diff --git a/semantic-convention-utils/src/main/java/org/hypertrace/semantic/convention/utils/http/HttpSemanticConventionUtils.java b/semantic-convention-utils/src/main/java/org/hypertrace/semantic/convention/utils/http/HttpSemanticConventionUtils.java index 222bebf9a..3a25a1386 100644 --- a/semantic-convention-utils/src/main/java/org/hypertrace/semantic/convention/utils/http/HttpSemanticConventionUtils.java +++ b/semantic-convention-utils/src/main/java/org/hypertrace/semantic/convention/utils/http/HttpSemanticConventionUtils.java @@ -223,6 +223,40 @@ public static Optional getDestinationIpAddress(Event event) { event, OTelSpanSemanticConventions.NET_PEER_IP.getValue()))); } + public static Optional getPeerIpAddress(Event event) { + return Optional.ofNullable( + SpanAttributeUtils.getStringAttribute( + event, OTelSpanSemanticConventions.NET_SOCK_PEER_ADDR.getValue())) + .or( + () -> + Optional.ofNullable( + SpanAttributeUtils.getStringAttribute( + event, OTelSpanSemanticConventions.NET_PEER_IP.getValue()))); + } + + public static Optional getPeerPort(Event event) { + return Optional.ofNullable( + SpanAttributeUtils.getStringAttribute( + event, OTelSpanSemanticConventions.NET_SOCK_PEER_PORT.getValue())) + .or( + () -> + Optional.ofNullable( + SpanAttributeUtils.getStringAttribute( + event, OTelSpanSemanticConventions.NET_PEER_PORT.getValue()))); + } + + public static Optional getHostIpAddress(Event event) { + return Optional.ofNullable( + SpanAttributeUtils.getStringAttribute( + event, OTelSpanSemanticConventions.NET_SOCK_HOST_ADDR.getValue())); + } + + public static Optional getHostPort(Event event) { + return Optional.ofNullable( + SpanAttributeUtils.getStringAttribute( + event, OTelSpanSemanticConventions.NET_SOCK_HOST_PORT.getValue())); + } + public static Optional getEnvironmentForSpan(Event event) { return Optional.ofNullable( SpanAttributeUtils.getStringAttribute( diff --git a/span-normalizer/helm/templates/span-normalizer-config.yaml b/span-normalizer/helm/templates/span-normalizer-config.yaml index 55c3f830e..0a672ec66 100644 --- a/span-normalizer/helm/templates/span-normalizer-config.yaml +++ b/span-normalizer/helm/templates/span-normalizer-config.yaml @@ -70,6 +70,10 @@ data: {{- if hasKey .Values.spanNormalizerConfig.processor "bypassKey" }} bypass.key = "{{ .Values.spanNormalizerConfig.processor.bypassKey }}" {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "bypassOverrideTenants" }} + bypass.override.tenants = "{{ .Values.spanNormalizerConfig.processor.bypassOverrideTenants | toJson }}" + {{- end }} {{- if hasKey .Values.spanNormalizerConfig.processor "lateArrivalThresholdDuration" }} late.arrival.threshold.duration = "{{ .Values.spanNormalizerConfig.processor.lateArrivalThresholdDuration }}" diff --git a/span-normalizer/span-normalizer-api/src/main/avro/SpanMetadata.avdl b/span-normalizer/span-normalizer-api/src/main/avro/SpanMetadata.avdl new file mode 100644 index 000000000..2eee5da02 --- /dev/null +++ b/span-normalizer/span-normalizer-api/src/main/avro/SpanMetadata.avdl @@ -0,0 +1,7 @@ +@namespace("org.hypertrace.core.spannormalizer") +protocol SpanMetadataProtocol { + record SpanMetadata { + string service_name; + bytes event_id; + } +} diff --git a/span-normalizer/span-normalizer-api/src/main/avro/trace-identity/PeerIdentity.avdl b/span-normalizer/span-normalizer-api/src/main/avro/trace-identity/PeerIdentity.avdl new file mode 100644 index 000000000..e33ca0f02 --- /dev/null +++ b/span-normalizer/span-normalizer-api/src/main/avro/trace-identity/PeerIdentity.avdl @@ -0,0 +1,14 @@ +@namespace("org.hypertrace.core.spannormalizer") +protocol PeerIdentityProtocol { + record PeerIdentity { + union { null, IpIdentity } ip_identity = null; + } + + record IpIdentity { + union { null, string } tenant_id = null; + union { null, string } environment = null; + union { null, string } host_addr = null; + union { null, string } peer_addr = null; + union { null, string } peer_port = null; + } +} diff --git a/span-normalizer/span-normalizer-constants/src/main/java/org/hypertrace/core/semantic/convention/constants/span/OTelSpanSemanticConventions.java b/span-normalizer/span-normalizer-constants/src/main/java/org/hypertrace/core/semantic/convention/constants/span/OTelSpanSemanticConventions.java index e68a91d35..c29fa0834 100644 --- a/span-normalizer/span-normalizer-constants/src/main/java/org/hypertrace/core/semantic/convention/constants/span/OTelSpanSemanticConventions.java +++ b/span-normalizer/span-normalizer-constants/src/main/java/org/hypertrace/core/semantic/convention/constants/span/OTelSpanSemanticConventions.java @@ -9,6 +9,9 @@ public enum OTelSpanSemanticConventions { NET_PEER_PORT("net.peer.port"), NET_PEER_NAME("net.peer.name"), NET_SOCK_PEER_ADDR("net.sock.peer.addr"), + NET_SOCK_PEER_PORT("net.sock.peer.port"), + NET_SOCK_HOST_ADDR("net.sock.host.addr"), + NET_SOCK_HOST_PORT("net.sock.host.port"), NET_TRANSPORT("net.transport"), HTTP_CLIENT_IP("http.client_ip"); diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicate.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicate.java index 3f74060e6..c8a1619a7 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicate.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicate.java @@ -1,22 +1,36 @@ package org.hypertrace.core.spannormalizer.rawspan; import com.typesafe.config.Config; +import java.util.Collections; +import java.util.List; import org.apache.kafka.streams.kstream.Predicate; import org.hypertrace.core.datamodel.AttributeValue; import org.hypertrace.core.datamodel.RawSpan; import org.hypertrace.core.spannormalizer.TraceIdentity; public class ByPassPredicate implements Predicate { + private static final String ALL = "*"; private static final String SPAN_BYPASSED_CONFIG = "processor.bypass.key"; - private String bypassKey; + private static final String SPAN_BYPASSED_OVERRIDE_CONFIG = "processor.bypass.override.tenants"; + private final String bypassKey; + private final List bypassOverrideTenants; public ByPassPredicate(Config jobConfig) { bypassKey = jobConfig.hasPath(SPAN_BYPASSED_CONFIG) ? jobConfig.getString(SPAN_BYPASSED_CONFIG) : null; + bypassOverrideTenants = + jobConfig.hasPath(SPAN_BYPASSED_OVERRIDE_CONFIG) + ? jobConfig.getStringList(SPAN_BYPASSED_OVERRIDE_CONFIG) + : Collections.emptyList(); } @Override public boolean test(TraceIdentity traceIdentity, RawSpan rawSpan) { + // tenant level spans bypass override + if (bypassOverrideTenants.contains(rawSpan.getCustomerId()) + || bypassOverrideTenants.contains(ALL)) { + return false; + } AttributeValue defaultAttributeValue = AttributeValue.newBuilder().setValue("false").build(); AttributeValue attributeValue = bypassKey != null