From 74a4d463785ec8408515742b227ca8a2bcc17255 Mon Sep 17 00:00:00 2001 From: Laxman Ch Date: Fri, 29 Sep 2023 17:57:21 +0530 Subject: [PATCH 1/2] use appropriate topology names --- .../viewgenerator/service/ViewGeneratorLauncher.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java index 08662b2..4e37800 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java @@ -59,12 +59,12 @@ public StreamsBuilder buildTopology( streamsBuilder.stream( topic, Consumed.with(Serdes.String(), null).withName("source-" + topic)); inputStreams.put(topic, inputStream); - } - if (mergedStream == null) { - mergedStream = inputStream; - } else { - mergedStream = mergedStream.merge(inputStream, Named.as("merged-stream")); + if (mergedStream == null) { + mergedStream = inputStream; + } else { + mergedStream = mergedStream.merge(inputStream, Named.as("merged-stream")); + } } } From c87b613ddef1ec4312274c2646bd1d4ca7bd9424 Mon Sep 17 00:00:00 2001 From: Laxman Ch Date: Fri, 29 Sep 2023 18:40:33 +0530 Subject: [PATCH 2/2] use appropriate topology names --- view-generator-framework/build.gradle.kts | 2 +- .../service/ViewGeneratorLauncher.java | 19 ++++++++++++------- .../MultiViewGeneratorLauncherTest.java | 10 +++++++++- .../src/test/resources/log4j2.properties | 8 ++++++++ 4 files changed, 30 insertions(+), 9 deletions(-) create mode 100644 view-generator-framework/src/test/resources/log4j2.properties diff --git a/view-generator-framework/build.gradle.kts b/view-generator-framework/build.gradle.kts index fd2f492..cd8e7de 100644 --- a/view-generator-framework/build.gradle.kts +++ b/view-generator-framework/build.gradle.kts @@ -25,7 +25,7 @@ dependencies { testImplementation("org.junit-pioneer:junit-pioneer:2.0.0") testImplementation("org.mockito:mockito-core:5.2.0") testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs") - testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.20.0") } // Disabling compatibility check for the test avro definitions. diff --git a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java index 4e37800..98e768e 100644 --- a/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java +++ b/view-generator-framework/src/main/java/org/hypertrace/core/viewgenerator/service/ViewGeneratorLauncher.java @@ -50,7 +50,7 @@ public StreamsBuilder buildTopology( String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); KStream mergedStream = null; - + int mergedStreamId = 0; for (String topic : inputTopics) { KStream inputStream = (KStream) inputStreams.get(topic); @@ -59,12 +59,15 @@ public StreamsBuilder buildTopology( streamsBuilder.stream( topic, Consumed.with(Serdes.String(), null).withName("source-" + topic)); inputStreams.put(topic, inputStream); + } - if (mergedStream == null) { - mergedStream = inputStream; - } else { - mergedStream = mergedStream.merge(inputStream, Named.as("merged-stream")); - } + if (mergedStream == null) { + mergedStream = inputStream; + } else { + mergedStream = + mergedStream.merge( + inputStream, Named.as("merged-stream-" + getViewGenName() + "-" + mergedStreamId)); + mergedStreamId++; } } @@ -83,7 +86,9 @@ public StreamsBuilder buildTopology( } mergedStream - .process(() -> new ViewGenerationProcessor(getJobConfigKey())) + .process( + () -> new ViewGenerationProcessor(getJobConfigKey()), + Named.as("processor-" + getViewGenName())) .to( outputTopic, Produced.with( diff --git a/view-generator-framework/src/test/java/org/hypertrace/core/viewgenerator/MultiViewGeneratorLauncherTest.java b/view-generator-framework/src/test/java/org/hypertrace/core/viewgenerator/MultiViewGeneratorLauncherTest.java index 17fc355..6ec9321 100644 --- a/view-generator-framework/src/test/java/org/hypertrace/core/viewgenerator/MultiViewGeneratorLauncherTest.java +++ b/view-generator-framework/src/test/java/org/hypertrace/core/viewgenerator/MultiViewGeneratorLauncherTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde; import org.hypertrace.core.serviceframework.config.ConfigClientFactory; @@ -29,8 +30,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junitpioneer.jupiter.SetEnvironmentVariable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MultiViewGeneratorLauncherTest { + private static final Logger logger = + LoggerFactory.getLogger(MultiViewGeneratorLauncherTest.class); private MultiViewGeneratorLauncher underTest; private List> inputTopics = new ArrayList<>(); private TestOutputTopic spanTypeTwoOutputTopic; @@ -54,10 +59,11 @@ public void setUp() { StreamsBuilder streamsBuilder = underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + Topology testTopology = streamsBuilder.build(); Properties props = new Properties(); mergedProps.forEach(props::put); - TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + TopologyTestDriver td = new TopologyTestDriver(testTopology, props); Serde spanTypeOneSerde = new AvroSerde<>(); spanTypeOneSerde.configure(Map.of(), false); @@ -87,6 +93,8 @@ public void setUp() { "test-raw-service-type-output-topic", new StringSerde().deserializer(), rawServiceTypeSerde.deserializer()); + + logger.info("test topology: {}", testTopology.describe()); } @Test diff --git a/view-generator-framework/src/test/resources/log4j2.properties b/view-generator-framework/src/test/resources/log4j2.properties new file mode 100644 index 0000000..62c371c --- /dev/null +++ b/view-generator-framework/src/test/resources/log4j2.properties @@ -0,0 +1,8 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT