-
Notifications
You must be signed in to change notification settings - Fork 16
Support service correlation for mirroring spans #431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
95d5ea0
25bcf21
c788574
ecb328a
b4a470e
4680b9d
91d57ec
6fadfaf
d911dd3
4a7b6d8
aed68f2
131029f
02cbae2
0912315
3a217cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,8 @@ dependencies { | |
| because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637") | ||
| } | ||
| implementation(project(":span-normalizer:span-normalizer-api")) | ||
| implementation(project(":semantic-convention-utils")) | ||
| implementation(project(":hypertrace-trace-enricher:enriched-span-constants")) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looks like these changes should be part of Trace-Enricher.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We introduced new Attrs in |
||
| implementation(libs.hypertrace.data.model) | ||
| implementation(libs.hypertrace.serviceFramework.framework) | ||
| implementation(libs.hypertrace.serviceFramework.metrics) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,10 @@ | |
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_ENABLED_AGENTS; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_CORRELATION_ENABLED_CUSTOMERS; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; | ||
|
|
@@ -15,6 +19,7 @@ | |
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_PUNCTUATOR_STORE_NAME; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE; | ||
| import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRUNCATED_TRACES_COUNTER; | ||
| import static org.hypertrace.traceenricher.enrichedspan.constants.EnrichedSpanConstants.PEER_SERVICE_NAME; | ||
|
|
||
| import com.typesafe.config.Config; | ||
| import io.micrometer.core.instrument.Counter; | ||
|
|
@@ -23,9 +28,12 @@ | |
| import java.time.Clock; | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
@@ -36,14 +44,26 @@ | |
| import org.apache.kafka.streams.processor.api.ProcessorContext; | ||
| import org.apache.kafka.streams.processor.api.Record; | ||
| import org.apache.kafka.streams.state.KeyValueStore; | ||
| import org.hypertrace.core.datamodel.Event; | ||
| import org.hypertrace.core.datamodel.RawSpan; | ||
| import org.hypertrace.core.datamodel.StructuredTrace; | ||
| import org.hypertrace.core.datamodel.Timestamps; | ||
| import org.hypertrace.core.datamodel.shared.HexUtils; | ||
| import org.hypertrace.core.datamodel.shared.SpanAttributeUtils; | ||
| import org.hypertrace.core.datamodel.shared.trace.AttributeValueCreator; | ||
| import org.hypertrace.core.datamodel.shared.trace.StructuredTraceBuilder; | ||
| import org.hypertrace.core.kafkastreams.framework.punctuators.ThrottledPunctuatorConfig; | ||
| import org.hypertrace.core.rawspansgrouper.utils.TraceLatencyMeter; | ||
| import org.hypertrace.core.rawspansgrouper.validator.PeerIdentityValidator; | ||
| import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; | ||
| import org.hypertrace.core.spannormalizer.IpIdentity; | ||
| import org.hypertrace.core.spannormalizer.PeerIdentity; | ||
| import org.hypertrace.core.spannormalizer.SpanIdentity; | ||
| import org.hypertrace.core.spannormalizer.SpanMetadata; | ||
| import org.hypertrace.core.spannormalizer.TraceIdentity; | ||
| import org.hypertrace.core.spannormalizer.TraceState; | ||
| import org.hypertrace.semantic.convention.utils.http.HttpSemanticConventionUtils; | ||
| import org.hypertrace.semantic.convention.utils.span.SpanSemanticConventionUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -60,6 +80,7 @@ public class RawSpansProcessor | |
| private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class); | ||
| private static final String PROCESSING_LATENCY_TIMER = | ||
| "hypertrace.rawspansgrouper.processing.latency"; | ||
| private static final String ALL = "*"; | ||
| private static final ConcurrentMap<String, Timer> tenantToSpansGroupingTimer = | ||
| new ConcurrentHashMap<>(); | ||
| // counter for number of spans dropped per tenant | ||
|
|
@@ -69,25 +90,46 @@ public class RawSpansProcessor | |
| // counter for number of truncated traces per tenant | ||
| private static final ConcurrentMap<String, Counter> truncatedTracesCounter = | ||
| new ConcurrentHashMap<>(); | ||
| private ProcessorContext<TraceIdentity, StructuredTrace> context; | ||
| private final Clock clock; | ||
| private KeyValueStore<SpanIdentity, RawSpan> spanStore; | ||
| private KeyValueStore<TraceIdentity, TraceState> traceStateStore; | ||
| private KeyValueStore<PeerIdentity, SpanMetadata> peerIdentityToSpanMetadataStateStore; | ||
| private long groupingWindowTimeoutMs; | ||
| private double dataflowSamplingPercent = -1; | ||
| private static final Map<String, Long> maxSpanCountMap = new HashMap<>(); | ||
| private long defaultMaxSpanCountLimit = Long.MAX_VALUE; | ||
| private TraceEmitPunctuator traceEmitPunctuator; | ||
| private Cancellable traceEmitTasksPunctuatorCancellable; | ||
| private String peerCorrelationAgentTypeAttribute; | ||
| private List<String> peerCorrelationEnabledCustomers; | ||
| private List<String> peerCorrelationEnabledAgents; | ||
| private TraceLatencyMeter traceLatencyMeter; | ||
|
|
||
| public RawSpansProcessor(Clock clock) { | ||
| this.clock = clock; | ||
| } | ||
|
|
||
| @Override | ||
| public void init(ProcessorContext<TraceIdentity, StructuredTrace> context) { | ||
| this.context = context; | ||
| this.spanStore = context.getStateStore(SPAN_STATE_STORE_NAME); | ||
| this.traceStateStore = context.getStateStore(TRACE_STATE_STORE); | ||
| this.peerIdentityToSpanMetadataStateStore = | ||
| context.getStateStore(PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE); | ||
| Config jobConfig = (Config) (context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG)); | ||
| this.peerCorrelationAgentTypeAttribute = | ||
| jobConfig.hasPath(PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG) | ||
| ? jobConfig.getString(PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG) | ||
| : null; | ||
| this.peerCorrelationEnabledCustomers = | ||
| jobConfig.hasPath(PEER_CORRELATION_ENABLED_CUSTOMERS) | ||
| ? jobConfig.getStringList(PEER_CORRELATION_ENABLED_CUSTOMERS) | ||
| : Collections.emptyList(); | ||
| this.peerCorrelationEnabledAgents = | ||
| jobConfig.hasPath(PEER_CORRELATION_ENABLED_AGENTS) | ||
| ? jobConfig.getStringList(PEER_CORRELATION_ENABLED_AGENTS) | ||
| : Collections.emptyList(); | ||
| this.groupingWindowTimeoutMs = | ||
| jobConfig.getLong(SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY) * 1000; | ||
|
|
||
|
|
@@ -96,15 +138,13 @@ public void init(ProcessorContext<TraceIdentity, StructuredTrace> context) { | |
| && jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY) <= 100) { | ||
| this.dataflowSamplingPercent = jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY); | ||
| } | ||
| this.traceLatencyMeter = new TraceLatencyMeter(dataflowSamplingPercent); | ||
|
|
||
| if (jobConfig.hasPath(INFLIGHT_TRACE_MAX_SPAN_COUNT)) { | ||
| Config subConfig = jobConfig.getConfig(INFLIGHT_TRACE_MAX_SPAN_COUNT); | ||
| subConfig | ||
| .entrySet() | ||
| .forEach( | ||
| (entry) -> { | ||
| maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey())); | ||
| }); | ||
| .forEach(entry -> maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey()))); | ||
| } | ||
|
|
||
| if (jobConfig.hasPath(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT)) { | ||
|
|
@@ -151,15 +191,23 @@ public void process(Record<TraceIdentity, RawSpan> record) { | |
| TraceIdentity key = record.key(); | ||
| RawSpan value = record.value(); | ||
| TraceState traceState = traceStateStore.get(key); | ||
| boolean firstEntry = (traceState == null); | ||
|
|
||
| if (shouldDropSpan(key, traceState)) { | ||
| return; | ||
| } | ||
|
|
||
| Event event = value.getEvent(); | ||
| // spans whose trace is not created by ByPass flow, their trace will be created in below | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: For now, it is controlled by customer_id. But, we should check if this is a mirroring agent or singleton trace.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We are checking if it's a mirroring agent: |
||
| // function call. Those spans' trace will not be created by TraceEmitPunctuator | ||
| if (isPeerServiceNameIdentificationRequired(event)) { | ||
| processSpanForPeerServiceNameIdentification(key, value, traceState, currentTimeMs); | ||
| return; | ||
|
sanket-mundra marked this conversation as resolved.
|
||
| } | ||
|
|
||
| boolean firstEntry = (traceState == null); | ||
| String tenantId = key.getTenantId(); | ||
| ByteBuffer traceId = value.getTraceId(); | ||
| ByteBuffer spanId = value.getEvent().getEventId(); | ||
| ByteBuffer spanId = event.getEventId(); | ||
| spanStore.put(new SpanIdentity(tenantId, traceId, spanId), value); | ||
|
|
||
| if (firstEntry) { | ||
|
|
@@ -196,7 +244,104 @@ public void process(Record<TraceIdentity, RawSpan> record) { | |
| .record(Duration.between(start, Instant.now()).toMillis(), TimeUnit.MILLISECONDS); | ||
| // no need to do context.forward. the punctuator will emit the trace once it's eligible to be | ||
| // emitted | ||
| return; | ||
| } | ||
|
|
||
| private void processSpanForPeerServiceNameIdentification( | ||
| TraceIdentity key, RawSpan value, TraceState traceState, long currentTimeMs) { | ||
| Event event = value.getEvent(); | ||
| String tenantId = key.getTenantId(); | ||
| ByteBuffer traceId = value.getTraceId(); | ||
| boolean firstEntry = (traceState == null); | ||
| Optional<String> maybeEnvironment = HttpSemanticConventionUtils.getEnvironmentForSpan(event); | ||
| if (SpanSemanticConventionUtils.isClientSpanForOCFormat( | ||
| event.getAttributes().getAttributeMap())) { | ||
| handleClientSpan(tenantId, event, maybeEnvironment.orElse(null)); | ||
| } else { | ||
| handleServerSpan(tenantId, event, maybeEnvironment.orElse(null)); | ||
| } | ||
|
|
||
| // create structured trace and forward | ||
| Timestamps timestamps = | ||
| traceLatencyMeter.trackEndToEndLatencyTimestamps( | ||
| currentTimeMs, firstEntry ? currentTimeMs : traceState.getTraceStartTimestamp()); | ||
| StructuredTrace trace = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We build the StructuredTrace in two places: 1) TraceEmitPunctuator and 2) the bypass condition. This seems to be a third place. What is the condition for this? It's not very clear. Will the same trace also be emitted via TraceEmitPunctuator?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, the same trace will not also be emitted via TraceEmitPunctuator, as we return in the caller function:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a comment |
||
| StructuredTraceBuilder.buildStructuredTraceFromRawSpans( | ||
| List.of(value), traceId, tenantId, timestamps); | ||
| context.forward(new Record<>(key, trace, currentTimeMs), OUTPUT_TOPIC_PRODUCER); | ||
| } | ||
|
|
||
| // put the peer service identity and corresponding service name in state store | ||
| private void handleClientSpan(String tenantId, Event event, String environment) { | ||
| Optional<String> maybeHostAddr = HttpSemanticConventionUtils.getHostIpAddress(event); | ||
| Optional<String> maybePeerAddr = HttpSemanticConventionUtils.getPeerIpAddress(event); | ||
| Optional<String> maybePeerPort = HttpSemanticConventionUtils.getPeerPort(event); | ||
| String serviceName = event.getServiceName(); | ||
| PeerIdentity peerIdentity = | ||
| PeerIdentity.newBuilder() | ||
| .setIpIdentity( | ||
| IpIdentity.newBuilder() | ||
| .setTenantId(tenantId) | ||
| .setEnvironment(environment) | ||
| .setHostAddr(maybeHostAddr.orElse(null)) | ||
| .setPeerAddr(maybePeerAddr.orElse(null)) | ||
| .setPeerPort(maybePeerPort.orElse(null)) | ||
| .build()) | ||
| .build(); | ||
| if (PeerIdentityValidator.isValid(peerIdentity)) { | ||
| this.peerIdentityToSpanMetadataStateStore.put( | ||
| peerIdentity, | ||
| SpanMetadata.newBuilder() | ||
| .setServiceName(serviceName) | ||
| .setEventId(event.getEventId()) | ||
| .build()); | ||
| } | ||
| } | ||
|
|
||
| // get the service name for that peer service identity and correlate the current span | ||
| private void handleServerSpan(String tenantId, Event event, String environment) { | ||
| Optional<String> maybePeerAddr = HttpSemanticConventionUtils.getPeerIpAddress(event); | ||
| Optional<String> maybeHostAddr = HttpSemanticConventionUtils.getHostIpAddress(event); | ||
| Optional<String> maybeHostPort = HttpSemanticConventionUtils.getHostPort(event); | ||
| PeerIdentity peerIdentity = | ||
| PeerIdentity.newBuilder() | ||
| .setIpIdentity( | ||
| IpIdentity.newBuilder() | ||
| .setTenantId(tenantId) | ||
| .setEnvironment(environment) | ||
| .setHostAddr(maybePeerAddr.orElse(null)) | ||
| .setPeerAddr(maybeHostAddr.orElse(null)) | ||
| .setPeerPort(maybeHostPort.orElse(null)) | ||
| .build()) | ||
| .build(); | ||
| if (PeerIdentityValidator.isValid(peerIdentity)) { | ||
| SpanMetadata spanMetadata = this.peerIdentityToSpanMetadataStateStore.get(peerIdentity); | ||
| if (Objects.nonNull(spanMetadata)) { | ||
| logger.debug( | ||
| "Adding {} as: {} from spanId: {} in spanId: {} with service name: {}", | ||
| PEER_SERVICE_NAME, | ||
| spanMetadata.getServiceName(), | ||
| HexUtils.getHex(spanMetadata.getEventId()), | ||
| HexUtils.getHex(event.getEventId()), | ||
| event.getServiceName()); | ||
|
|
||
| event | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we should add this to enrichedAttributes, not the attribute map. @skjindal93 I believe enriched attributes are for anything that traceable resolves and adds to the span.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. adding to enriched attributes since we changed the attribute name to |
||
| .getEnrichedAttributes() | ||
| .getAttributeMap() | ||
| .put(PEER_SERVICE_NAME, AttributeValueCreator.create(spanMetadata.getServiceName())); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private boolean isPeerServiceNameIdentificationRequired(Event event) { | ||
| if (!this.peerCorrelationEnabledCustomers.contains(event.getCustomerId()) | ||
| && !this.peerCorrelationEnabledCustomers.contains(ALL)) { | ||
| return false; | ||
| } | ||
|
|
||
| String agentType = | ||
| SpanAttributeUtils.getStringAttributeWithDefault( | ||
| event, this.peerCorrelationAgentTypeAttribute, null); | ||
| return Objects.nonNull(agentType) && this.peerCorrelationEnabledAgents.contains(agentType); | ||
| } | ||
|
|
||
| private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.