Skip to content
This repository was archived by the owner on Nov 12, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions owasp-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<cve>CVE-2018-11770</cve>
<cve>CVE-2018-17190</cve>
</suppress>
<suppress until="2023-07-30Z">
<suppress until="2023-12-31Z">
<notes><![CDATA[
Same series of issues that also impacts jackson, vendor disputed and not currently fixed
https://github.com/janino-compiler/janino/issues/201
Expand All @@ -68,7 +68,7 @@
<cve>CVE-2012-5783</cve>
<cve>CVE-2020-13956</cve>
</suppress>
<suppress until="2023-08-30Z">
<suppress until="2023-12-31Z">
<notes><![CDATA[
Not yet fixed in quartz. file name: quartz-2.3.2.jar
]]></notes>
Expand Down Expand Up @@ -96,18 +96,25 @@
<packageUrl regex="true">^pkg:maven/org\.hypertrace\.core\.kafkastreams\.framework/avro\-partitioners@.*$</packageUrl>
<cve>CVE-2023-37475</cve>
</suppress>
<suppress until="2023-08-30Z">
<suppress until="2023-12-31Z">
<notes><![CDATA[
file name: janino-3.1.9.jar. Pinot is yet to upgrade to latest version. Revisit.
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.codehaus\.janino/janino@.*$</packageUrl>
<cve>CVE-2023-33546</cve>
</suppress>
<suppress until="2023-08-30Z">
<suppress until="2023-12-31Z">
<notes><![CDATA[
file name: commons-compiler-3.1.9.jar. Pinot is yet to upgrade to latest version. Revisit.
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.codehaus\.janino/commons\-compiler@.*$</packageUrl>
<cve>CVE-2023-33546</cve>
</suppress>
<suppress>
<notes><![CDATA[
file name: netty-handler-4.1.94.Final.jar. Pinot is yet to upgrade to latest version. Revisit.
]]></notes>
<packageUrl regex="true">^pkg:maven/io\.netty/netty\-handler@.*$</packageUrl>
<cve>CVE-2023-4586</cve>
</suppress>
</suppressions>
16 changes: 8 additions & 8 deletions view-creator-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ tasks.test {
}

dependencies {
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.54")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.60")
implementation(platform("io.grpc:grpc-bom:1.57.2"))
implementation("org.apache.avro:avro:1.11.1")
implementation("org.apache.pinot:pinot-tools:0.12.1") {
Expand Down Expand Up @@ -52,21 +52,21 @@ dependencies {
implementation("org.apache.calcite:calcite-babel:1.34.0")
implementation("com.google.code.gson:gson:2.10.1")
implementation("org.apache.spark:spark-launcher_2.12:3.4.1")
implementation("org.xerial.snappy:snappy-java:1.1.10.1")
implementation("org.xerial.snappy:snappy-java:1.1.10.4")
implementation("com.google.protobuf:protobuf-java-util:3.16.3")
implementation("org.codehaus.janino:janino:3.1.9")
}
implementation(platform("io.netty:netty-bom:4.1.94.Final"))
implementation(platform("org.glassfish.jersey:jersey-bom:2.40"))
implementation(platform("org.jetbrains.kotlin:kotlin-bom:1.6.21"))

compileOnly("org.projectlombok:lombok:1.18.24")
annotationProcessor("org.projectlombok:lombok:1.18.24")
implementation("org.slf4j:slf4j-api:1.7.36")
implementation("com.typesafe:config:1.4.1")
compileOnly("org.projectlombok:lombok:1.18.26")
annotationProcessor("org.projectlombok:lombok:1.18.26")
implementation("org.slf4j:slf4j-api:2.0.5")
Copy link
Copy Markdown
Contributor

@satish-mittal satish-mittal Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this change, hypertrace/hypertrace-ingestor transitively loads slf4j-api:2.0.5 via dependency on hypertrace/view-generator-framework, but internally it still uses log4j-slf4j-impl instead of slf4j2. That led to failure of integration tests in QS which loads container "hypertrace/hypertrace-view-generator:main".

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raised PR to fix it: hypertrace/hypertrace-ingester#425

implementation("com.typesafe:config:1.4.2")

testImplementation("org.junit.jupiter:junit-jupiter:5.9.0")
testImplementation("org.mockito:mockito-core:4.7.0")
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
testImplementation("org.mockito:mockito-core:5.2.0")
}

// Disabling compatibility check for the test avro definitions.
Expand Down
14 changes: 7 additions & 7 deletions view-generator-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ tasks.test {
}

dependencies {
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.54")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.3.2")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.60")
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.3.7")
implementation("org.hypertrace.core.kafkastreams.framework:avro-partitioners:0.2.13")
implementation("org.apache.avro:avro:1.11.1")
implementation("com.typesafe:config:1.4.2")
implementation("com.google.guava:guava:32.0.1-jre")

// Logging
implementation("org.slf4j:slf4j-api:1.7.36")
implementation("org.slf4j:slf4j-api:2.0.5")

testImplementation("org.junit.jupiter:junit-jupiter:5.9.0")
testImplementation("org.junit-pioneer:junit-pioneer:1.7.1")
testImplementation("org.mockito:mockito-core:4.7.0")
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
testImplementation("org.junit-pioneer:junit-pioneer:2.0.0")
testImplementation("org.mockito:mockito-core:5.2.0")
testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.17.2")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0")
}

// Disabling compatibility check for the test avro definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import static org.hypertrace.core.viewgenerator.service.ViewGeneratorConstants.VIEW_GENERATORS_CONFIG;

import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.TreeSet;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
Expand All @@ -20,7 +20,7 @@
import org.hypertrace.core.serviceframework.config.ConfigUtils;

public class MultiViewGeneratorLauncher extends KafkaStreamsApp {
private Map<String, Config> viewGenConfigs;
private final Map<String, Config> viewGenConfigs;

public MultiViewGeneratorLauncher(ConfigClient configClient) {
super(configClient);
Expand Down Expand Up @@ -61,23 +61,23 @@ public String getJobConfigKey() {
@Override
public List<String> getInputTopics(Map<String, Object> properties) {
List<String> viewGenNames = getViewGenName(properties);
Set<String> inputTopics = new HashSet<>();
Set<String> inputTopics = new TreeSet<>();
for (String viewGen : viewGenNames) {
Config viewGenConfig = viewGenConfigs.get(viewGen);
inputTopics.addAll(viewGenConfig.getStringList(INPUT_TOPICS_CONFIG_KEY));
}
return inputTopics.stream().collect(Collectors.toList());
return new ArrayList<>(inputTopics);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - why this change? List.copyOf (unless we're intentionally making this mutable)

That said, we should make this deterministic and avoid the hash set here.

(same comment below)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Intellij suggestion

avoid the hash set here

set needed as input topic across all viewgens is same and we need to add only one source per input topic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set needed as input topic across all viewgens is same and we need to add only one source per input topic.

So we have a uniqueness requirement, not a set it sounds like. Using a hash set means that different runs with the same input will return their outputs in a different order. Even if that's not impactful today, it's a bad practice and makes writing tests annoying. Suggest any of

  • use a list, then run distinct on it
  • use an insertion-ordered set (e.g. guava's or linkedhashset I believe)
  • use streams e.g.
return getViewGenName(properties)
  .stream()
  .map(viewGenConfigs::get)
  .map(config -> config.getStringList(INPUT_TOPICS_CONFIG_KEY))
  .flatMap(Collection::stream)
  .distinct()
  .collect(Collectors.toUnmodifiableList());

Copy link
Copy Markdown
Contributor Author

@laxmanchekka laxmanchekka Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not a TreeSet?
Set for me ensures uniqueness and TreeSet ensures sorted ordering (for tests thingy you mentioned).

Looks simpler and much readable than the streams snippet in the suggestion.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks simpler and much readable than the streams snippet in the suggestion.

matter of opinion! but yes, any ordered set will do for repeatability, doesn't need to be insertion ordered - I was just trying to match the perceived intent.

}

@Override
public List<String> getOutputTopics(Map<String, Object> properties) {
List<String> viewGenNames = getViewGenName(properties);
Set<String> outputTopics = new HashSet<>();
Set<String> outputTopics = new TreeSet<>();
for (String viewGen : viewGenNames) {
Config viewGenConfig = viewGenConfigs.get(viewGen);
outputTopics.add(viewGenConfig.getString(OUTPUT_TOPIC_CONFIG_KEY));
}
return outputTopics.stream().collect(Collectors.toList());
return new ArrayList<>(outputTopics);
}

private Config getJobConfig(Map<String, Object> properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,35 @@
import static org.hypertrace.core.viewgenerator.service.ViewGeneratorConstants.*;

import com.typesafe.config.Config;
import java.util.ArrayList;
import java.time.Instant;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.hypertrace.core.viewgenerator.JavaCodeBasedViewGenerator;

public class ViewGenerationProcessTransformer<IN extends SpecificRecord, OUT extends GenericRecord>
implements Transformer<String, IN, List<KeyValue<String, OUT>>> {
public class ViewGenerationProcessor<IN extends SpecificRecord, OUT extends GenericRecord>
implements Processor<String, IN, String, OUT> {

private final String viewGenName;

private Config jobConfig;
private String viewgenClassName;
private Class<OUT> viewClass;
private JavaCodeBasedViewGenerator<IN, OUT> viewGenerator;
private ProcessorContext context;
private To outputTopic;
private ProcessorContext<String, OUT> context;

public ViewGenerationProcessTransformer(String viewGenName) {
public ViewGenerationProcessor(String viewGenName) {
this.viewGenName = viewGenName;
}

@Override
public void init(ProcessorContext context) {
public void init(ProcessorContext<String, OUT> context) {
this.context = context;
this.jobConfig = (Config) context.appConfigs().get(this.viewGenName);

String outputTopicName = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY);
this.outputTopic = To.child(outputTopicName);

this.viewgenClassName = jobConfig.getString(VIEW_GENERATOR_CLASS_CONFIG_KEY);
try {
viewGenerator = createViewGenerator();
Expand All @@ -48,15 +43,14 @@ public void init(ProcessorContext context) {
}

@Override
public List<KeyValue<String, OUT>> transform(String key, IN value) {
List<KeyValue<String, OUT>> outputKVPairs = new ArrayList<>();
List<OUT> output = viewGenerator.process(value);
public void process(Record<String, IN> record) {
List<OUT> output = viewGenerator.process(record.value());
if (output != null) {
long nowMillis = Instant.now().toEpochMilli();
for (OUT out : output) {
outputKVPairs.add(KeyValue.pair(key, out));
context.forward(new Record<>(record.key(), out, nowMillis));
Comment thread
aaron-steinfeld marked this conversation as resolved.
}
}
return outputKVPairs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp;
import org.hypertrace.core.kafkastreams.framework.partitioner.AvroFieldValuePartitioner;
Expand Down Expand Up @@ -54,14 +55,16 @@ public StreamsBuilder buildTopology(
KStream<String, Object> inputStream = (KStream<String, Object>) inputStreams.get(topic);

if (inputStream == null) {
inputStream = streamsBuilder.stream(topic, Consumed.with(Serdes.String(), null));
inputStream =
streamsBuilder.stream(
topic, Consumed.with(Serdes.String(), null).withName("source-" + topic));
inputStreams.put(topic, inputStream);
}

if (mergedStream == null) {
mergedStream = inputStream;
} else {
mergedStream = mergedStream.merge(inputStream);
mergedStream = mergedStream.merge(inputStream, Named.as("merged-stream"));
}
}

Expand All @@ -80,11 +83,12 @@ public StreamsBuilder buildTopology(
}

mergedStream
.flatTransform(() -> new ViewGenerationProcessTransformer(getJobConfigKey()))
.process(() -> new ViewGenerationProcessor(getJobConfigKey()))
.to(
outputTopic,
Produced.with(
Serdes.String(), producerValueSerde, getPartitioner(properties, outputTopic)));
Serdes.String(), producerValueSerde, getPartitioner(properties, outputTopic))
.withName("sink-" + outputTopic));
return streamsBuilder;
}

Expand Down