diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 41c3bccee..5cb79a55b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,7 +3,7 @@ hypertrace-entity-service = "0.8.78" hypertrace-config-service = "0.1.54" hypertrace-grpc-utils = "0.12.4" hypertrace-serviceFramework = "0.1.60" -hypertrace-kafkaStreams = "0.4.0" +hypertrace-kafkaStreams = "0.4.1" hypertrace-view-generator = "0.4.19" grpc = "1.57.2" diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java index aaa89e693..597ef2dba 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java @@ -86,7 +86,7 @@ StreamsBuilder buildTopologyWithClock( Stores.persistentKeyValueStore(SPAN_STATE_STORE_NAME), keySerde, valueSerde) .withCachingEnabled(); - StoreBuilder> traceEmitPunctuatorStoreBuilder = + StoreBuilder>> traceEmitPunctuatorStoreBuilder = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME), Serdes.Long(), diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java index 885b023f4..7c457d05d 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java @@ -23,7 +23,6 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -112,7 +111,7 @@ public void init(ProcessorContext context) { defaultMaxSpanCountLimit = jobConfig.getLong(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT); } - KeyValueStore> traceEmitPunctuatorStore = + KeyValueStore> traceEmitPunctuatorStore = context.getStateStore(TRACE_EMIT_PUNCTUATOR_STORE_NAME); traceEmitPunctuator = new TraceEmitPunctuator( diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java index 28b6f81cb..65bf563ba 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java @@ -77,7 +77,7 @@ class TraceEmitPunctuator extends AbstractThrottledPunctuator { TraceEmitPunctuator( ThrottledPunctuatorConfig throttledPunctuatorConfig, - KeyValueStore> throttledPunctuatorStore, + KeyValueStore> throttledPunctuatorStore, ProcessorContext context, KeyValueStore spanStore, KeyValueStore traceStateStore, diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java index eb4f39030..6262d1ac9 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuatorTest.java @@ -11,7 +11,6 @@ import java.nio.ByteBuffer; import java.util.List; -import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; import org.hypertrace.core.datamodel.Event; @@ -44,7 +43,6 @@ public void setUp() { when(context.keySerde()).thenReturn(avroSerde); spanStore = mock(KeyValueStore.class); traceStateStore = mock(KeyValueStore.class); - To outputTopicProducer = mock(To.class); emitCallback = new TraceEmitPunctuator( mock(ThrottledPunctuatorConfig.class),