From 202b90a2840eee859b6e58e78385a8db8d42798d Mon Sep 17 00:00:00 2001 From: kishansairam9 Date: Wed, 18 Oct 2023 14:09:53 +0530 Subject: [PATCH 1/2] fix: bug in proactive rescheduling not accounting for future timeout --- .../hypertrace/core/rawspansgrouper/RawSpansProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index 7c457d05d..10e6f8825 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -175,12 +175,12 @@ public void process(Record record) { traceEmitPunctuator.scheduleTask(currentTimeMs, key); } else { traceState.getSpanIds().add(spanId); - long prevScheduleTimestamp = traceState.getTraceEndTimestamp(); + long prevScheduleTimestamp = traceState.getTraceEndTimestamp() + groupingWindowTimeoutMs; traceState.setTraceEndTimestamp(currentTimeMs); if (!traceEmitPunctuator.rescheduleTask( prevScheduleTimestamp, currentTimeMs + groupingWindowTimeoutMs, key)) { logger.debug( - "Failed to reschedule task on getting span for trace key {}, schedule already dropped!", + "Failed to proactively reschedule task on getting span for trace key {}, schedule already dropped!", key); } } From 5c6f27f195df684d200231f5b678b5d329d4d331 Mon Sep 17 00:00:00 2001 From: kishansairam9 Date: Wed, 18 Oct 2023 16:20:58 +0530 Subject: [PATCH 2/2] update --- gradle/libs.versions.toml | 2 +- .../core/rawspansgrouper/RawSpansProcessor.java | 8 ++++---- .../core/rawspansgrouper/RawSpansGrouperTest.java | 12 ++++++------ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5cb79a55b..69e719e4b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,7 +3,7 @@ hypertrace-entity-service = "0.8.78" hypertrace-config-service = "0.1.54" hypertrace-grpc-utils = "0.12.4" hypertrace-serviceFramework = "0.1.60" -hypertrace-kafkaStreams = "0.4.1" +hypertrace-kafkaStreams = "0.4.2" hypertrace-view-generator = "0.4.19" grpc = "1.57.2" diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index 10e6f8825..28bedefaa 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -172,16 +172,16 @@ public void process(Record record) { .setTraceId(traceId) .setSpanIds(List.of(spanId)) .build(); - traceEmitPunctuator.scheduleTask(currentTimeMs, key); + traceEmitPunctuator.scheduleTask(currentTimeMs + groupingWindowTimeoutMs, key); } else { traceState.getSpanIds().add(spanId); long prevScheduleTimestamp = traceState.getTraceEndTimestamp() + groupingWindowTimeoutMs; traceState.setTraceEndTimestamp(currentTimeMs); if (!traceEmitPunctuator.rescheduleTask( prevScheduleTimestamp, currentTimeMs + groupingWindowTimeoutMs, key)) { - logger.debug( - "Failed to proactively reschedule task on getting span for trace key {}, schedule already dropped!", - key); + logger.warn( + "Failed to proactively reschedule task on getting span for trace id {}, schedule already dropped!", + HexUtils.getHex(traceState.getTraceId())); } } diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java index 04c1d6d1c..b28a92b48 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java @@ -231,13 +231,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000); inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime); - // trace2 should have 1 span span3 - StructuredTrace trace = outputTopic.readValue(); - assertEquals(1, trace.getEventList().size()); - assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array())); - // trace1 should have 2 span span1, span2 - trace = outputTopic.readValue(); + StructuredTrace trace = outputTopic.readValue(); assertEquals(2, trace.getEventList().size()); Set traceEventIds = trace.getEventList().stream() @@ -246,6 +241,11 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir assertTrue(traceEventIds.contains("event-1")); assertTrue(traceEventIds.contains("event-2")); + // trace2 should have 1 span span3 + trace = outputTopic.readValue(); + assertEquals(1, trace.getEventList().size()); + assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array())); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span3, messageTime); messageTime = advanceAndSyncClockMock(messageTime, clock, 45_000); inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span5, messageTime);