Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ hypertrace-grpc-client-utils ={ module = "org.hypertrace.core.grpcutils:grpc-cli
hypertrace-grpc-client-rxUtils = { module = "org.hypertrace.core.grpcutils:grpc-client-rx-utils", version.ref = "hypertrace-grpc-utils"}
hypertrace-serviceFramework-metrics = { module = "org.hypertrace.core.serviceframework:platform-metrics", version.ref = "hypertrace-serviceFramework" }
hypertrace-serviceFramework-framework = { module = "org.hypertrace.core.serviceframework:platform-service-framework", version.ref = "hypertrace-serviceFramework" }
hypertrace-data-model = { module ="org.hypertrace.core.datamodel:data-model", version = "0.1.30" }
hypertrace-data-model = { module ="org.hypertrace.core.datamodel:data-model", version = "0.1.31" }
hypertrace-kafkaStreams-framework = { module = "org.hypertrace.core.kafkastreams.framework:kafka-streams-framework", version.ref = "hypertrace-kafkaStreams" }
hypertrace-kafkaStreams-avroPartitioners = { module = "org.hypertrace.core.kafkastreams.framework:avro-partitioners", version.ref = "hypertrace-kafkaStreams" }
hypertrace-kafkaStreams-weightedGroupPartitioners = { module = "org.hypertrace.core.kafkastreams.framework:weighted-group-partitioner", version.ref = "hypertrace-kafkaStreams" }
Expand Down Expand Up @@ -52,7 +52,7 @@ uadetector-resources = { module = "net.sf.uadetector:uadetector-resources",versi

reactivex-rxjava3 = { module = "io.reactivex.rxjava3:rxjava", version = "3.0.11" }

json-json = { module = "org.json:json", version = "20230618" }
json-json = { module = "org.json:json", version = "20231013" }

#grpc dependency
grpc-netty = { module = "io.grpc:grpc-netty", version.ref = "grpc" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ static boolean isDifferentTrace(StructuredTrace cachedTrace, StructuredTrace tra
}

static boolean isStructuredTraceChanged(StructuredTrace cachedTrace, StructuredTrace trace) {
return isDifferentTrace(cachedTrace, trace)
|| isTraceEventsChanged(cachedTrace, trace)
|| isTraceEntitiesChanged(cachedTrace, trace);
return isDifferentTrace(cachedTrace, trace) || isTraceEventsChanged(cachedTrace, trace);
Comment thread
aman-bansal marked this conversation as resolved.
}

/** Check if the events or theirs edges has changed */
Expand All @@ -41,19 +39,4 @@ static boolean isTraceEventsChanged(StructuredTrace cachedTrace, StructuredTrace
}
return false;
}

/** Check if the entities or theirs edges has changed */
static boolean isTraceEntitiesChanged(StructuredTrace cachedTrace, StructuredTrace trace) {

// trace entities internally changed (full trace comparison is costly, so we are doing only with
// required fields)
if (isDifferentTrace(cachedTrace, trace)
|| cachedTrace.getEntityList().size() != trace.getEntityList().size()
|| cachedTrace.getEntityEdgeList().size() != trace.getEntityEdgeList().size()) {
LOG.debug(
"Cached and Input trace are not same. Reason: they are having different size either for entities");
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.LoggerFactory;

public class StructuredTraceGraphBuilder {

private static final Logger LOG = LoggerFactory.getLogger(StructuredTraceGraphBuilder.class);

private static final ThreadLocal<StructuredTraceGraph> cachedGraphThreadLocal =
Expand All @@ -17,14 +18,9 @@ public class StructuredTraceGraphBuilder {
public static StructuredTraceGraph buildGraph(StructuredTrace trace) {
StructuredTrace cachedTrace = cachedTraceThreadLocal.get();
StructuredTraceGraph cachedGraph = cachedGraphThreadLocal.get();
boolean shouldRebuildTraceEventsGraph =
GraphBuilderUtil.isTraceEventsChanged(cachedTrace, trace);
boolean shouldRebuildTraceEntitiesGraph =
GraphBuilderUtil.isTraceEntitiesChanged(cachedTrace, trace);

if (null == cachedGraph
|| GraphBuilderUtil.isDifferentTrace(cachedTrace, trace)
|| (shouldRebuildTraceEventsGraph && shouldRebuildTraceEntitiesGraph)) {
|| GraphBuilderUtil.isTraceEventsChanged(cachedTrace, trace)) {
Instant start = Instant.now();
StructuredTraceGraph graph = new StructuredTraceGraph(trace);
if (LOG.isDebugEnabled()) {
Expand All @@ -39,65 +35,25 @@ public static StructuredTraceGraph buildGraph(StructuredTrace trace) {
return graph;
}

if (shouldRebuildTraceEventsGraph || shouldRebuildTraceEntitiesGraph) {
Instant start = Instant.now();
if (shouldRebuildTraceEventsGraph) {
cachedGraph.reCreateTraceEventsGraph(trace);
} else {
cachedGraph.reCreateTraceEntitiesGraph(trace);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Time taken in building TraceEventsGraph, duration_millis:{} for tenantId:{}",
Duration.between(start, Instant.now()).toMillis(),
trace.getCustomerId());
}
cachedTraceThreadLocal.set(StructuredTrace.newBuilder(trace).build());
cachedGraphThreadLocal.set(cachedGraph);
debugGraph("Case: Partially building the graph.", cachedGraph, trace);
return cachedGraph;
}

debugGraph("Case: Not building the graph.", cachedGraphThreadLocal.get(), trace);
return cachedGraph;
}

private static void debugGraph(
String logPrefix, StructuredTraceGraph graph, StructuredTrace trace) {
if (null != graph
&& (null == graph.getTraceEntitiesGraph() || null == graph.getTraceEventsGraph())) {
if (null != graph && null == graph.getTraceEventsGraph()) {
LOG.info(
logPrefix
+ "StructuredTraceGraph is not built correctly, trace {}, Is events graph non-null: {}."
+ " Is entities graph non-null: {}",
+ "StructuredTraceGraph is not built correctly, trace {}, Is events graph non-null:"
+ " {}.",
trace,
(null != graph.getTraceEventsGraph()),
(null != graph.getTraceEntitiesGraph()));
(null != graph.getTraceEventsGraph()));

// build the graph again and check
StructuredTraceGraph tempGraph = new StructuredTraceGraph(trace);
LOG.info(
logPrefix
+ "Recreating StructuredTraceGraph. Is events graph non-null: {}."
+ " Is entities graph non-null: {}",
(null != tempGraph.getTraceEventsGraph()),
(null != tempGraph.getTraceEntitiesGraph()));

tempGraph.reCreateTraceEventsGraph(trace);
LOG.info(
logPrefix
+ "Recreating events graph. Is events graph non-null: {}."
+ " Is entities graph non-null: {}",
(null != tempGraph.getTraceEventsGraph()),
(null != tempGraph.getTraceEntitiesGraph()));

tempGraph.reCreateTraceEntitiesGraph(trace);
LOG.info(
logPrefix
+ "Recreating entities graph. Is events graph non-null: {}."
+ " Is entities graph non-null: {}",
(null != tempGraph.getTraceEventsGraph()),
(null != tempGraph.getTraceEntitiesGraph()));
logPrefix + "Recreating StructuredTraceGraph. Is events graph non-null: {}.",
(null != tempGraph.getTraceEventsGraph()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public void testIsTraceEventsChanged() {
when(underTestTrace.getEventEdgeList()).thenReturn(List.of(eventEdge1, eventEdge2));

Assertions.assertTrue(GraphBuilderUtil.isTraceEventsChanged(cachedTrace, underTestTrace));
Assertions.assertFalse(GraphBuilderUtil.isTraceEntitiesChanged(cachedTrace, underTestTrace));
}

@Test
Expand Down Expand Up @@ -148,6 +147,5 @@ public void testIsTraceEntitiesChanged() {
when(underTestTrace.getEventEdgeList()).thenReturn(List.of(eventEdge1, eventEdge2));

Assertions.assertFalse(GraphBuilderUtil.isTraceEventsChanged(cachedTrace, underTestTrace));
Assertions.assertTrue(GraphBuilderUtil.isTraceEntitiesChanged(cachedTrace, underTestTrace));
}
}