diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5cb79a55b..69e719e4b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,7 +3,7 @@ hypertrace-entity-service = "0.8.78" hypertrace-config-service = "0.1.54" hypertrace-grpc-utils = "0.12.4" hypertrace-serviceFramework = "0.1.60" -hypertrace-kafkaStreams = "0.4.1" +hypertrace-kafkaStreams = "0.4.2" hypertrace-view-generator = "0.4.19" grpc = "1.57.2" diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index 10e6f8825..28bedefaa 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -172,16 +172,16 @@ public void process(Record record) { .setTraceId(traceId) .setSpanIds(List.of(spanId)) .build(); - traceEmitPunctuator.scheduleTask(currentTimeMs, key); + traceEmitPunctuator.scheduleTask(currentTimeMs + groupingWindowTimeoutMs, key); } else { traceState.getSpanIds().add(spanId); long prevScheduleTimestamp = traceState.getTraceEndTimestamp() + groupingWindowTimeoutMs; traceState.setTraceEndTimestamp(currentTimeMs); if (!traceEmitPunctuator.rescheduleTask( prevScheduleTimestamp, currentTimeMs + groupingWindowTimeoutMs, key)) { - logger.debug( - "Failed to proactively reschedule task on getting span for trace key {}, schedule already dropped!", - key); + logger.warn( + "Failed to proactively reschedule task on getting span for trace id {}, schedule already dropped!", + HexUtils.getHex(traceState.getTraceId())); } } diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java index 04c1d6d1c..b28a92b48 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java @@ -231,13 +231,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000); inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime); - // trace2 should have 1 span span3 - StructuredTrace trace = outputTopic.readValue(); - assertEquals(1, trace.getEventList().size()); - assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array())); - // trace1 should have 2 span span1, span2 - trace = outputTopic.readValue(); + StructuredTrace trace = outputTopic.readValue(); assertEquals(2, trace.getEventList().size()); Set traceEventIds = trace.getEventList().stream() @@ -246,6 +241,11 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir assertTrue(traceEventIds.contains("event-1")); assertTrue(traceEventIds.contains("event-2")); + // trace2 should have 1 span span3 + trace = outputTopic.readValue(); + assertEquals(1, trace.getEventList().size()); + assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array())); + inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span3, messageTime); messageTime = advanceAndSyncClockMock(messageTime, clock, 45_000); inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span5, messageTime);