From 21cf54b98ba4fd3d3e8565bb62e4cf1f3c059489 Mon Sep 17 00:00:00 2001 From: Laxman Ch Date: Wed, 27 Sep 2023 20:45:32 +0530 Subject: [PATCH 1/3] name kafka streams topology nodes appropriately --- .../service/MultiViewGeneratorLauncher.java | 8 ++--- ...rmer.java => ViewGenerationProcessor.java} | 32 ++++++++----------- .../service/ViewGeneratorLauncher.java | 12 ++++--- 3 files changed, 25 insertions(+), 27 deletions(-) rename view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/{ViewGenerationProcessTransformer.java => ViewGenerationProcessor.java} (59%) diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java index ad813ae..4208925 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java @@ -6,12 +6,12 @@ import static org.hypertrace.core.viewgenerator.service.ViewGeneratorConstants.VIEW_GENERATORS_CONFIG; import com.typesafe.config.Config; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; @@ -20,7 +20,7 @@ import org.hypertrace.core.serviceframework.config.ConfigUtils; public class MultiViewGeneratorLauncher extends KafkaStreamsApp { - private Map viewGenConfigs; + private final Map viewGenConfigs; public MultiViewGeneratorLauncher(ConfigClient configClient) { super(configClient); @@ -66,7 +66,7 @@ public List getInputTopics(Map properties) { Config viewGenConfig = viewGenConfigs.get(viewGen); inputTopics.addAll(viewGenConfig.getStringList(INPUT_TOPICS_CONFIG_KEY)); } - return inputTopics.stream().collect(Collectors.toList()); + return new ArrayList<>(inputTopics); } @Override @@ -77,7 +77,7 @@ public List getOutputTopics(Map properties) { Config viewGenConfig = viewGenConfigs.get(viewGen); outputTopics.add(viewGenConfig.getString(OUTPUT_TOPIC_CONFIG_KEY)); } - return outputTopics.stream().collect(Collectors.toList()); + return new ArrayList<>(outputTopics); } private Config getJobConfig(Map properties) { diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessTransformer.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java similarity index 59% rename from view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessTransformer.java rename to view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java index 1c1f209..adfe09b 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessTransformer.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java @@ -3,18 +3,17 @@ import static org.hypertrace.core.viewgenerator.service.ViewGeneratorConstants.*; import com.typesafe.config.Config; -import java.util.ArrayList; +import java.time.Instant; import java.util.List; import org.apache.avro.generic.GenericRecord; import org.apache.avro.specific.SpecificRecord; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.hypertrace.core.viewgenerator.JavaCodeBasedViewGenerator; -public class ViewGenerationProcessTransformer - implements Transformer>> { +public class ViewGenerationProcessor + implements Processor { private final String viewGenName; @@ -22,21 +21,17 @@ public class ViewGenerationProcessTransformer viewClass; private JavaCodeBasedViewGenerator viewGenerator; - private ProcessorContext context; - private To outputTopic; + private ProcessorContext context; - public ViewGenerationProcessTransformer(String viewGenName) { + public ViewGenerationProcessor(String viewGenName) { this.viewGenName = viewGenName; } @Override - public void init(ProcessorContext context) { + public void init(ProcessorContext context) { this.context = context; this.jobConfig = (Config) context.appConfigs().get(this.viewGenName); - String outputTopicName = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); - this.outputTopic = To.child(outputTopicName); - this.viewgenClassName = jobConfig.getString(VIEW_GENERATOR_CLASS_CONFIG_KEY); try { viewGenerator = createViewGenerator(); @@ -48,15 +43,14 @@ public void init(ProcessorContext context) { } @Override - public List> transform(String key, IN value) { - List> outputKVPairs = new ArrayList<>(); - List output = viewGenerator.process(value); + public void process(Record record) { + long nowMillis = Instant.now().toEpochMilli(); + List output = viewGenerator.process(record.value()); if (output != null) { for (OUT out : output) { - outputKVPairs.add(KeyValue.pair(key, out)); + context.forward(new Record<>(record.key(), out, nowMillis)); } } - return outputKVPairs; } @Override diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java index 1f2e4c2..08662b2 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java @@ -14,6 +14,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; import org.hypertrace.core.kafkastreams.framework.partitioner.AvroFieldValuePartitioner; @@ -54,14 +55,16 @@ public StreamsBuilder buildTopology( KStream inputStream = (KStream) inputStreams.get(topic); if (inputStream == null) { - inputStream = streamsBuilder.stream(topic, Consumed.with(Serdes.String(), null)); + inputStream = + streamsBuilder.stream( + topic, Consumed.with(Serdes.String(), null).withName("source-" + topic)); inputStreams.put(topic, inputStream); } if (mergedStream == null) { mergedStream = inputStream; } else { - mergedStream = mergedStream.merge(inputStream); + mergedStream = mergedStream.merge(inputStream, Named.as("merged-stream")); } } @@ -80,11 +83,12 @@ public StreamsBuilder buildTopology( } mergedStream - .flatTransform(() -> new ViewGenerationProcessTransformer(getJobConfigKey())) + .process(() -> new ViewGenerationProcessor(getJobConfigKey())) .to( outputTopic, Produced.with( - Serdes.String(), producerValueSerde, getPartitioner(properties, outputTopic))); + Serdes.String(), producerValueSerde, getPartitioner(properties, outputTopic)) + .withName("sink-" + outputTopic)); return streamsBuilder; } From 5137f365546aa9e9391a6b6471e0468ec099b646 Mon Sep 17 00:00:00 2001 From: Laxman Ch Date: Wed, 27 Sep 2023 22:16:20 +0530 Subject: [PATCH 2/3] lib upgrades + review comments fix --- owasp-suppressions.xml | 15 +++++++++++---- view-creator-framework/build.gradle.kts | 16 ++++++++-------- view-generator-framework/build.gradle.kts | 14 +++++++------- .../service/MultiViewGeneratorLauncher.java | 5 +++-- .../service/ViewGenerationProcessor.java | 2 +- 5 files changed, 30 insertions(+), 22 deletions(-) diff --git a/owasp-suppressions.xml b/owasp-suppressions.xml index 44049cb..34b4c2f 100644 --- a/owasp-suppressions.xml +++ b/owasp-suppressions.xml @@ -52,7 +52,7 @@ CVE-2018-11770 CVE-2018-17190 - + CVE-2012-5783 CVE-2020-13956 - + @@ -96,18 +96,25 @@ ^pkg:maven/org\.hypertrace\.core\.kafkastreams\.framework/avro\-partitioners@.*$ CVE-2023-37475 - + ^pkg:maven/org\.codehaus\.janino/janino@.*$ CVE-2023-33546 - + ^pkg:maven/org\.codehaus\.janino/commons\-compiler@.*$ CVE-2023-33546 + + + ^pkg:maven/io\.netty/netty\-handler@.*$ + CVE-2023-4586 + diff --git a/view-creator-framework/build.gradle.kts b/view-creator-framework/build.gradle.kts index 9533d76..7140324 100644 --- a/view-creator-framework/build.gradle.kts +++ b/view-creator-framework/build.gradle.kts @@ -11,7 +11,7 @@ tasks.test { } dependencies { - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.54") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.60") implementation(platform("io.grpc:grpc-bom:1.57.2")) implementation("org.apache.avro:avro:1.11.1") implementation("org.apache.pinot:pinot-tools:0.12.1") { @@ -52,7 +52,7 @@ dependencies { implementation("org.apache.calcite:calcite-babel:1.34.0") implementation("com.google.code.gson:gson:2.10.1") implementation("org.apache.spark:spark-launcher_2.12:3.4.1") - implementation("org.xerial.snappy:snappy-java:1.1.10.1") + implementation("org.xerial.snappy:snappy-java:1.1.10.4") implementation("com.google.protobuf:protobuf-java-util:3.16.3") implementation("org.codehaus.janino:janino:3.1.9") } @@ -60,13 +60,13 @@ dependencies { implementation(platform("org.glassfish.jersey:jersey-bom:2.40")) implementation(platform("org.jetbrains.kotlin:kotlin-bom:1.6.21")) - compileOnly("org.projectlombok:lombok:1.18.24") - annotationProcessor("org.projectlombok:lombok:1.18.24") - implementation("org.slf4j:slf4j-api:1.7.36") - implementation("com.typesafe:config:1.4.1") + compileOnly("org.projectlombok:lombok:1.18.26") + annotationProcessor("org.projectlombok:lombok:1.18.26") + implementation("org.slf4j:slf4j-api:2.0.5") + implementation("com.typesafe:config:1.4.2") - testImplementation("org.junit.jupiter:junit-jupiter:5.9.0") - testImplementation("org.mockito:mockito-core:4.7.0") + testImplementation("org.junit.jupiter:junit-jupiter:5.9.2") + testImplementation("org.mockito:mockito-core:5.2.0") } // Disabling compatibility check for the test avro definitions. diff --git a/view-generator-framework/build.gradle.kts b/view-generator-framework/build.gradle.kts index 531cbc3..fd2f492 100644 --- a/view-generator-framework/build.gradle.kts +++ b/view-generator-framework/build.gradle.kts @@ -11,21 +11,21 @@ tasks.test { } dependencies { - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.54") - implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.3.2") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.60") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.3.7") implementation("org.hypertrace.core.kafkastreams.framework:avro-partitioners:0.2.13") implementation("org.apache.avro:avro:1.11.1") implementation("com.typesafe:config:1.4.2") implementation("com.google.guava:guava:32.0.1-jre") // Logging - implementation("org.slf4j:slf4j-api:1.7.36") + implementation("org.slf4j:slf4j-api:2.0.5") - testImplementation("org.junit.jupiter:junit-jupiter:5.9.0") - testImplementation("org.junit-pioneer:junit-pioneer:1.7.1") - testImplementation("org.mockito:mockito-core:4.7.0") + testImplementation("org.junit.jupiter:junit-jupiter:5.9.2") + testImplementation("org.junit-pioneer:junit-pioneer:2.0.0") + testImplementation("org.mockito:mockito-core:5.2.0") testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs") - testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.17.2") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0") } // Disabling compatibility check for the test avro definitions. diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java index 4208925..6b1cc5a 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; @@ -61,7 +62,7 @@ public String getJobConfigKey() { @Override public List getInputTopics(Map properties) { List viewGenNames = getViewGenName(properties); - Set inputTopics = new HashSet<>(); + Set inputTopics = new TreeSet<>(); for (String viewGen : viewGenNames) { Config viewGenConfig = viewGenConfigs.get(viewGen); inputTopics.addAll(viewGenConfig.getStringList(INPUT_TOPICS_CONFIG_KEY)); @@ -72,7 +73,7 @@ public List getInputTopics(Map properties) { @Override public List getOutputTopics(Map properties) { List viewGenNames = getViewGenName(properties); - Set outputTopics = new HashSet<>(); + Set outputTopics = new TreeSet<>(); for (String viewGen : viewGenNames) { Config viewGenConfig = viewGenConfigs.get(viewGen); outputTopics.add(viewGenConfig.getString(OUTPUT_TOPIC_CONFIG_KEY)); diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java index adfe09b..9b8dfd3 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGenerationProcessor.java @@ -44,9 +44,9 @@ public void init(ProcessorContext context) { @Override public void process(Record record) { - long nowMillis = Instant.now().toEpochMilli(); List output = viewGenerator.process(record.value()); if (output != null) { + long nowMillis = Instant.now().toEpochMilli(); for (OUT out : output) { context.forward(new Record<>(record.key(), out, nowMillis)); } From dd9984e9f063c0d66ac18427193cd2f5a7ff6511 Mon Sep 17 00:00:00 2001 From: Laxman Ch Date: Wed, 27 Sep 2023 22:19:19 +0530 Subject: [PATCH 3/3] fix spotless --- .../core/viewgenerator/service/MultiViewGeneratorLauncher.java | 1 - 1 file changed, 1 deletion(-) diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java index 6b1cc5a..51bb56b 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/MultiViewGeneratorLauncher.java @@ -8,7 +8,6 @@ import com.typesafe.config.Config; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set;