diff --git a/owasp-suppressions.xml b/owasp-suppressions.xml
index 44049cb..34b4c2f 100644
--- a/owasp-suppressions.xml
+++ b/owasp-suppressions.xml
@@ -52,7 +52,7 @@
CVE-2018-11770
CVE-2018-17190
-
+
CVE-2012-5783
CVE-2020-13956
-
+
@@ -96,18 +96,25 @@
^pkg:maven/org\.hypertrace\.core\.kafkastreams\.framework/avro\-partitioners@.*$
CVE-2023-37475
-
+
^pkg:maven/org\.codehaus\.janino/janino@.*$
CVE-2023-33546
-
+
^pkg:maven/org\.codehaus\.janino/commons\-compiler@.*$
CVE-2023-33546
+
+
+ ^pkg:maven/io\.netty/netty\-handler@.*$
+ CVE-2023-4586
+
diff --git a/view-creator-framework/build.gradle.kts b/view-creator-framework/build.gradle.kts
index 9533d76..7140324 100644
--- a/view-creator-framework/build.gradle.kts
+++ b/view-creator-framework/build.gradle.kts
@@ -11,7 +11,7 @@ tasks.test {
}
dependencies {
- implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.54")
+ implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.60")
implementation(platform("io.grpc:grpc-bom:1.57.2"))
implementation("org.apache.avro:avro:1.11.1")
implementation("org.apache.pinot:pinot-tools:0.12.1") {
@@ -52,7 +52,7 @@ dependencies {
implementation("org.apache.calcite:calcite-babel:1.34.0")
implementation("com.google.code.gson:gson:2.10.1")
implementation("org.apache.spark:spark-launcher_2.12:3.4.1")
- implementation("org.xerial.snappy:snappy-java:1.1.10.1")
+ implementation("org.xerial.snappy:snappy-java:1.1.10.4")
implementation("com.google.protobuf:protobuf-java-util:3.16.3")
implementation("org.codehaus.janino:janino:3.1.9")
}
@@ -60,13 +60,13 @@ dependencies {
implementation(platform("org.glassfish.jersey:jersey-bom:2.40"))
implementation(platform("org.jetbrains.kotlin:kotlin-bom:1.6.21"))
- compileOnly("org.projectlombok:lombok:1.18.24")
- annotationProcessor("org.projectlombok:lombok:1.18.24")
- implementation("org.slf4j:slf4j-api:1.7.36")
- implementation("com.typesafe:config:1.4.1")
+ compileOnly("org.projectlombok:lombok:1.18.26")
+ annotationProcessor("org.projectlombok:lombok:1.18.26")
+ implementation("org.slf4j:slf4j-api:2.0.5")
+ implementation("com.typesafe:config:1.4.2")
- testImplementation("org.junit.jupiter:junit-jupiter:5.9.0")
- testImplementation("org.mockito:mockito-core:4.7.0")
+ testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
+ testImplementation("org.mockito:mockito-core:5.2.0")
}
// Disabling compatibility check for the test avro definitions.
diff --git a/view-generator-framework/build.gradle.kts b/view-generator-framework/build.gradle.kts
index 531cbc3..fd2f492 100644
--- a/view-generator-framework/build.gradle.kts
+++ b/view-generator-framework/build.gradle.kts
@@ -11,21 +11,21 @@ tasks.test {
}
dependencies {
- implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.54")
- implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.3.2")
+ implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.60")
+ implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.3.7")
implementation("org.hypertrace.core.kafkastreams.framework:avro-partitioners:0.2.13")
implementation("org.apache.avro:avro:1.11.1")
implementation("com.typesafe:config:1.4.2")
implementation("com.google.guava:guava:32.0.1-jre")
// Logging
- implementation("org.slf4j:slf4j-api:1.7.36")
+ implementation("org.slf4j:slf4j-api:2.0.5")
- testImplementation("org.junit.jupiter:junit-jupiter:5.9.0")
- testImplementation("org.junit-pioneer:junit-pioneer:1.7.1")
- testImplementation("org.mockito:mockito-core:4.7.0")
+ testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
+ testImplementation("org.junit-pioneer:junit-pioneer:2.0.0")
+ testImplementation("org.mockito:mockito-core:5.2.0")
testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs")
- testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.17.2")
+ testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0")
}
// Disabling compatibility check for the test avro definitions.
diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java
index ad813ae..51bb56b 100644
--- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java
+++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java
@@ -6,12 +6,12 @@
import static org.hypertrace.core.viewgenerator.service.ViewGeneratorConstants.VIEW_GENERATORS_CONFIG;
import com.typesafe.config.Config;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.TreeSet;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
@@ -20,7 +20,7 @@
import org.hypertrace.core.serviceframework.config.ConfigUtils;
public class MultiViewGeneratorLauncher extends KafkaStreamsApp {
- private Map viewGenConfigs;
+ private final Map viewGenConfigs;
public MultiViewGeneratorLauncher(ConfigClient configClient) {
super(configClient);
@@ -61,23 +61,23 @@ public String getJobConfigKey() {
@Override
public List getInputTopics(Map properties) {
List viewGenNames = getViewGenName(properties);
- Set inputTopics = new HashSet<>();
+ Set inputTopics = new TreeSet<>();
for (String viewGen : viewGenNames) {
Config viewGenConfig = viewGenConfigs.get(viewGen);
inputTopics.addAll(viewGenConfig.getStringList(INPUT_TOPICS_CONFIG_KEY));
}
- return inputTopics.stream().collect(Collectors.toList());
+ return new ArrayList<>(inputTopics);
}
@Override
public List getOutputTopics(Map properties) {
List viewGenNames = getViewGenName(properties);
- Set outputTopics = new HashSet<>();
+ Set outputTopics = new TreeSet<>();
for (String viewGen : viewGenNames) {
Config viewGenConfig = viewGenConfigs.get(viewGen);
outputTopics.add(viewGenConfig.getString(OUTPUT_TOPIC_CONFIG_KEY));
}
- return outputTopics.stream().collect(Collectors.toList());
+ return new ArrayList<>(outputTopics);
}
private Config getJobConfig(Map properties) {
diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessTransformer.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java
similarity index 59%
rename from view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessTransformer.java
rename to view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java
index 1c1f209..9b8dfd3 100644
--- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessTransformer.java
+++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java
@@ -3,18 +3,17 @@
import static org.hypertrace.core.viewgenerator.service.ViewGeneratorConstants.*;
import com.typesafe.config.Config;
-import java.util.ArrayList;
+import java.time.Instant;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.hypertrace.core.viewgenerator.JavaCodeBasedViewGenerator;
-public class ViewGenerationProcessTransformer
- implements Transformer>> {
+public class ViewGenerationProcessor
+ implements Processor {
private final String viewGenName;
@@ -22,21 +21,17 @@ public class ViewGenerationProcessTransformer viewClass;
private JavaCodeBasedViewGenerator viewGenerator;
- private ProcessorContext context;
- private To outputTopic;
+ private ProcessorContext context;
- public ViewGenerationProcessTransformer(String viewGenName) {
+ public ViewGenerationProcessor(String viewGenName) {
this.viewGenName = viewGenName;
}
@Override
- public void init(ProcessorContext context) {
+ public void init(ProcessorContext context) {
this.context = context;
this.jobConfig = (Config) context.appConfigs().get(this.viewGenName);
- String outputTopicName = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);
- this.outputTopic = To.child(outputTopicName);
-
this.viewgenClassName = jobConfig.getString(VIEW_GENERATOR_CLASS_CONFIG_KEY);
try {
viewGenerator = createViewGenerator();
@@ -48,15 +43,14 @@ public void init(ProcessorContext context) {
}
@Override
- public List> transform(String key, IN value) {
- List> outputKVPairs = new ArrayList<>();
- List output = viewGenerator.process(value);
+ public void process(Record record) {
+ List output = viewGenerator.process(record.value());
if (output != null) {
+ long nowMillis = Instant.now().toEpochMilli();
for (OUT out : output) {
- outputKVPairs.add(KeyValue.pair(key, out));
+ context.forward(new Record<>(record.key(), out, nowMillis));
}
}
- return outputKVPairs;
}
@Override
diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java
index 1f2e4c2..08662b2 100644
--- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java
+++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java
@@ -14,6 +14,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.kafkastreams.framework.partitioner.AvroFieldValuePartitioner;
@@ -54,14 +55,16 @@ public StreamsBuilder buildTopology(
KStream inputStream = (KStream) inputStreams.get(topic);
if (inputStream == null) {
- inputStream = streamsBuilder.stream(topic, Consumed.with(Serdes.String(), null));
+ inputStream =
+ streamsBuilder.stream(
+ topic, Consumed.with(Serdes.String(), null).withName("source-" + topic));
inputStreams.put(topic, inputStream);
}
if (mergedStream == null) {
mergedStream = inputStream;
} else {
- mergedStream = mergedStream.merge(inputStream);
+ mergedStream = mergedStream.merge(inputStream, Named.as("merged-stream"));
}
}
@@ -80,11 +83,12 @@ public StreamsBuilder buildTopology(
}
mergedStream
- .flatTransform(() -> new ViewGenerationProcessTransformer(getJobConfigKey()))
+ .process(() -> new ViewGenerationProcessor(getJobConfigKey()))
.to(
outputTopic,
Produced.with(
- Serdes.String(), producerValueSerde, getPartitioner(properties, outputTopic)));
+ Serdes.String(), producerValueSerde, getPartitioner(properties, outputTopic))
+ .withName("sink-" + outputTopic));
return streamsBuilder;
}