Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ PreProcessedSpan preProcessSpan(Span span) {

String tenantId = maybeTenantId.get();

if (spanFilter.shouldDropSpan(span, spanTags)) {
if (spanFilter.shouldDropSpan(span, spanTags, processTags)) {
// increment dropped counter at tenant level
tenantToSpansDroppedCount
.computeIfAbsent(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.hypertrace.core.spannormalizer.jaeger;

public class SpanDropFilter {

public static final String TAG_KEY = "tagKey";
public static final String OPERATOR = "operator";
public static final String TAG_VALUE = "tagValue";

public enum Operator {
EQ("EQ"),
NEQ("NEQ"),
EXISTS("EXISTS"),
CONTAINS("CONTAINS");

private final String value;

Operator(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}

private String tagKey;
private Operator operator;
private String tagValue;

public SpanDropFilter(String tagKey, String operator, String tagValue) {
this.tagKey = tagKey;
this.operator = Operator.valueOf(operator);
this.tagValue = tagValue;
}

public String getTagKey() {
return tagKey;
}

public Operator getOperator() {
return operator;
}

public String getTagValue() {
return tagValue;
}

@Override
public String toString() {
return "SpanDropFilter{"
+ "tagKey='"
+ tagKey
+ '\''
+ ", operator="
+ operator
+ ", tagValue='"
+ tagValue
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package org.hypertrace.core.spannormalizer.jaeger;

import static org.hypertrace.core.spannormalizer.jaeger.SpanDropFilter.OPERATOR;
import static org.hypertrace.core.spannormalizer.jaeger.SpanDropFilter.TAG_KEY;
import static org.hypertrace.core.spannormalizer.jaeger.SpanDropFilter.TAG_VALUE;

import com.google.common.util.concurrent.RateLimiter;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigList;
import io.jaegertracing.api_v2.JaegerSpanInternalModel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -35,6 +41,8 @@ public class SpanFilter {
*/
private static final String SPAN_DROP_CRITERION_CONFIG = "processor.spanDropCriterion";

private static final String SPAN_DROP_FILTERS = "processor.spanDropFilters";

public static final String ROOT_SPAN_DROP_CRITERION_CONFIG =
"processor.rootExitSpanDropCriterion";
private static final String ROOT_SPAN_ALWAYS_DROP = "alwaysDrop";
Expand All @@ -44,6 +52,8 @@ public class SpanFilter {
private static final String COLON = ":";

private List<List<Pair<String, String>>> spanDropCriterion = Collections.emptyList();
private List<List<SpanDropFilter>> spanDropFilters = Collections.emptyList();
;
private boolean alwaysDropRootSpan = false;
private List<List<Pair<String, String>>> rootSpanDropExclusionCriterion = Collections.emptyList();

Expand All @@ -55,6 +65,27 @@ public SpanFilter(Config config) {
this.spanDropCriterion = parseStringList(criterion);
}

if (config.hasPath(SPAN_DROP_FILTERS)) {
ConfigList spanDropFiltersConfig = config.getList(SPAN_DROP_FILTERS);
LOG.info("Span drop filters: {}", spanDropFiltersConfig);
this.spanDropFilters =
spanDropFiltersConfig.stream()
.map(
orFilters -> {
List<HashMap<String, String>> andFilters =
(List<HashMap<String, String>>) orFilters.unwrapped();
return andFilters.stream()
.map(
filter ->
new SpanDropFilter(
filter.get(TAG_KEY),
filter.get(OPERATOR),
filter.get(TAG_VALUE)))
.collect(Collectors.toList());
})
.collect(Collectors.toList());
}

if (config.hasPath(ROOT_SPAN_DROP_CRITERION_CONFIG)) {
Config rootSpanDropCriterionConfig = config.getConfig(ROOT_SPAN_DROP_CRITERION_CONFIG);
LOG.info("Root Span drop criterion: {}", rootSpanDropCriterionConfig);
Expand Down Expand Up @@ -88,14 +119,23 @@ private List<List<Pair<String, String>>> parseStringList(List<String> stringList
* the span should be dropped, false otherwise.
*/
public boolean shouldDropSpan(
JaegerSpanInternalModel.Span span, Map<String, JaegerSpanInternalModel.KeyValue> tags) {
JaegerSpanInternalModel.Span span,
Map<String, JaegerSpanInternalModel.KeyValue> tags,
Map<String, JaegerSpanInternalModel.KeyValue> processTags) {
if (anyCriteriaMatch(tags, spanDropCriterion)) {
Comment thread
findingrish marked this conversation as resolved.
if (DROPPED_SPANS_RATE_LIMITER.tryAcquire()) {
LOG.info("Dropping span: [{}] with drop criterion: [{}]", span, spanDropCriterion);
}
return true;
}

if (anySpanDropFiltersMatch(spanDropFilters, tags, processTags)) {
if (LOG.isDebugEnabled() && DROPPED_SPANS_RATE_LIMITER.tryAcquire()) {
LOG.debug("Dropping span: [{}] with drop filters: [{}]", span, spanDropFilters.toString());
}
return true;
}

if (isRootExitSpan(span, tags)) {
boolean anyCriteriaMatch = anyCriteriaMatch(tags, rootSpanDropExclusionCriterion);
boolean shouldDropSpan =
Expand Down Expand Up @@ -147,4 +187,37 @@ private boolean isRootExitSpan(

return SPAN_KIND_CLIENT.equals(spanKindKeyValue.getVStr());
}

private boolean anySpanDropFiltersMatch(
List<List<SpanDropFilter>> spanDropFilters,
Map<String, JaegerSpanInternalModel.KeyValue> tags,
Map<String, JaegerSpanInternalModel.KeyValue> processTags) {
return spanDropFilters.stream()
.anyMatch(
andFilters ->
andFilters.stream()
.allMatch(
filter ->
matchSpanDropFilter(filter, tags)
|| matchSpanDropFilter(filter, processTags)));
}

private boolean matchSpanDropFilter(
SpanDropFilter filter, Map<String, JaegerSpanInternalModel.KeyValue> tags) {
switch (filter.getOperator()) {
case EQ:
return tags.containsKey(filter.getTagKey())
&& StringUtils.equals(tags.get(filter.getTagKey()).getVStr(), filter.getTagValue());
Copy link
Copy Markdown
Contributor

@avinashkolluru avinashkolluru Nov 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kotharironak We may have to look into the process tags too. See below

  public Optional<String> getTenantId(
      Map<String, KeyValue> spanTags, Map<String, KeyValue> processTags) {
    return Optional.ofNullable(processTags.get(tenantIdKey))
        .or(() -> Optional.ofNullable(spanTags.get(tenantIdKey)))
        .map(KeyValue::getVStr);
  }

cc: @laxman-traceable @ravisingal

Copy link
Copy Markdown
Contributor Author

@kotharironak kotharironak Nov 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the process tags are meant for resources, do we want to drop span based on resource criteria? So, far we are dropping based on span attribute filters, and with that, we are able to filter existing scenarios, right? Are there some scenario?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ravisingal @avinashkolluru I have added the support for processTags too. So, now the filter will check in the union of two tags (processTags, spanTags).

Total Tags = Process Tags + Span Tags will be used for matching a filter.

case NEQ:
return tags.containsKey(filter.getTagKey())
&& !StringUtils.equals(tags.get(filter.getTagKey()).getVStr(), filter.getTagValue());
case CONTAINS:
return tags.containsKey(filter.getTagKey())
&& StringUtils.contains(tags.get(filter.getTagKey()).getVStr(), filter.getTagValue());
case EXISTS:
return tags.containsKey(filter.getTagKey());
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.hypertrace.core.spannormalizer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -130,5 +132,75 @@ public void whenJaegerSpansAreProcessedExpectRawSpansToBeOutput() {
KeyValue<String, LogEvents> keyValue = rawLogOutputTopic.readKeyValue();
LogEvents logEvents = keyValue.value;
Assertions.assertEquals(2, logEvents.getLogEvents().size());

// pipe in one more span which doesn't match spanDropFilters
Span span2 =
Span.newBuilder()
.setSpanId(ByteString.copyFrom("2".getBytes()))
.setTraceId(ByteString.copyFrom("trace-2".getBytes()))
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("jaeger.servicename")
.setVStr(SERVICE_NAME)
.build())
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("http.method")
.setVStr("GET")
.build())
.build();

inputTopic.pipeInput(span2);
KeyValue<TraceIdentity, RawSpan> kv1 = outputTopic.readKeyValue();
assertNotNull(kv1);
assertEquals("__default", kv1.key.getTenantId());
assertEquals(
HexUtils.getHex(ByteString.copyFrom("trace-2".getBytes()).toByteArray()),
HexUtils.getHex(kv1.key.getTraceId().array()));

// pipe in one more span which match one of spanDropFilters (http.method & http.url)
Span span3 =
Span.newBuilder()
.setSpanId(ByteString.copyFrom("3".getBytes()))
.setTraceId(ByteString.copyFrom("trace-3".getBytes()))
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("jaeger.servicename")
.setVStr(SERVICE_NAME)
.build())
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("http.method")
.setVStr("GET")
.build())
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("http.url")
.setVStr("http://xyz.com/health/check")
.build())
.build();

inputTopic.pipeInput(span3);
assertTrue(outputTopic.isEmpty());

// pipe in one more span which match one of spanDropFilters (grpc.url)
Span span4 =
Span.newBuilder()
.setSpanId(ByteString.copyFrom("3".getBytes()))
.setTraceId(ByteString.copyFrom("trace-3".getBytes()))
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("jaeger.servicename")
.setVStr(SERVICE_NAME)
.build())
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("grpc.url")
.setVStr("doesn't match with input filter set")
.build())
.build();

inputTopic.pipeInput(span4);
assertTrue(outputTopic.isEmpty());
}
}
Loading