diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index 46ca4d351..bc507d6f6 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -42,6 +42,7 @@ dependencies { implementation(project(":hypertrace-view-generator:hypertrace-view-generator")) implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor")) implementation(project(":hypertrace-metrics-exporter:hypertrace-metrics-exporter")) + implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator")) testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") @@ -86,6 +87,10 @@ tasks.register("copyServiceConfigs") { createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "main", + "common"), + createCopySpec("hypertrace-metrics-generator", + "hypertrace-metrics-generator", + "main", "common") ).into("./build/resources/main/configs/") } @@ -146,7 +151,11 @@ tasks.register("copyServiceConfigsTest") { createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "test", - "hypertrace-metrics-exporter") + "hypertrace-metrics-exporter"), + createCopySpec("hypertrace-metrics-generator", + "hypertrace-metrics-generator", + "test", + "hypertrace-metrics-generator") ).into("./build/resources/test/configs/") } diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java index 3151c8979..ede314b3f 100644 --- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java +++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java @@ -18,6 +18,7 @@ import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher; import org.hypertrace.metrics.exporter.MetricsExporterService; +import org.hypertrace.metrics.generator.MetricsGenerator; import org.hypertrace.metrics.processor.MetricsProcessor; import org.hypertrace.traceenricher.trace.enricher.TraceEnricher; import org.slf4j.Logger; @@ -31,12 +32,21 @@ public class HypertraceIngester extends KafkaStreamsApp { private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config"; private Map> jobNameToSubTopology = new HashMap<>(); - MetricsExporterService metricsExporterService; + private MetricsExporterService metricsExporterService; + boolean metricsPipelineEnabled; public HypertraceIngester(ConfigClient configClient) { super(configClient); - metricsExporterService = new MetricsExporterService(configClient); - metricsExporterService.setConfig(getSubJobConfig("hypertrace-metrics-exporter")); + Config config = getAppConfig(); + metricsPipelineEnabled = + config.hasPath("metrics.pipeline.enable") + ? config.getBoolean("metrics.pipeline.enable") + : false; + if (metricsPipelineEnabled) { + String metricsPipelineExporter = config.getString("metrics.pipeline.exporter"); + metricsExporterService = new MetricsExporterService(configClient); + metricsExporterService.setConfig(getSubJobConfig(metricsPipelineExporter)); + } } private KafkaStreamsApp getSubTopologyInstance(String name) { @@ -57,6 +67,9 @@ private KafkaStreamsApp getSubTopologyInstance(String name) { case "hypertrace-metrics-processor": kafkaStreamsApp = new MetricsProcessor(ConfigClientFactory.getClient()); break; + case "hypertrace-metrics-generator": + kafkaStreamsApp = new MetricsGenerator(ConfigClientFactory.getClient()); + break; default: throw new RuntimeException(String.format("Invalid configured sub-topology : [%s]", name)); } @@ -69,34 +82,50 @@ public StreamsBuilder buildTopology( StreamsBuilder streamsBuilder, Map> inputStreams) { + // build for trace pipeline List subTopologiesNames = getSubTopologiesNames(properties); - for (String subTopologyName : subTopologiesNames) { LOGGER.info("Building sub topology :{}", subTopologyName); + streamsBuilder = buildSubTopology(subTopologyName, properties, streamsBuilder, inputStreams); + } - // create an instance and retains is reference to be used later in other methods - KafkaStreamsApp subTopology = getSubTopologyInstance(subTopologyName); - jobNameToSubTopology.put( - subTopologyName, Pair.of(subTopology.getJobConfigKey(), subTopology)); - - // need to use its own copy as input/output topics are different - Config subTopologyJobConfig = getSubJobConfig(subTopologyName); - Map flattenSubTopologyConfig = - subTopology.getStreamsConfig(subTopologyJobConfig); - flattenSubTopologyConfig.put(subTopology.getJobConfigKey(), subTopologyJobConfig); - - // add specific job properties - addProperties(properties, flattenSubTopologyConfig); - streamsBuilder = subTopology.buildTopology(properties, streamsBuilder, inputStreams); - - // retain per job key and its topology - jobNameToSubTopology.put( - subTopologyName, Pair.of(subTopology.getJobConfigKey(), subTopology)); + // build for metrics pipeline + if (metricsPipelineEnabled) { + List metricsSubTopologiesNames = getMetricsPipelineSubTopologiesNames(properties); + for (String subTopologyName : metricsSubTopologiesNames) { + LOGGER.info("Building metrics pipeline sub topology :{}", subTopologyName); + streamsBuilder = + buildSubTopology(subTopologyName, properties, streamsBuilder, inputStreams); + } } return streamsBuilder; } + private StreamsBuilder buildSubTopology( + String subTopologyName, + Map properties, + StreamsBuilder streamsBuilder, + Map> inputStreams) { + // create an instance and retains is reference to be used later in other methods + KafkaStreamsApp subTopology = getSubTopologyInstance(subTopologyName); + jobNameToSubTopology.put(subTopologyName, Pair.of(subTopology.getJobConfigKey(), subTopology)); + + // need to use its own copy as input/output topics are different + Config subTopologyJobConfig = getSubJobConfig(subTopologyName); + Map flattenSubTopologyConfig = + subTopology.getStreamsConfig(subTopologyJobConfig); + flattenSubTopologyConfig.put(subTopology.getJobConfigKey(), subTopologyJobConfig); + + // add specific job properties + addProperties(properties, flattenSubTopologyConfig); + streamsBuilder = subTopology.buildTopology(properties, streamsBuilder, inputStreams); + + // retain per job key and its topology + jobNameToSubTopology.put(subTopologyName, Pair.of(subTopology.getJobConfigKey(), subTopology)); + return streamsBuilder; + } + @Override public String getJobConfigKey() { return HYPERTRACE_INGESTER_JOB_CONFIG; @@ -125,25 +154,35 @@ public List getOutputTopics(Map properties) { @Override protected void doInit() { super.doInit(); - this.metricsExporterService.doInit(); + if (metricsPipelineEnabled) { + this.metricsExporterService.doInit(); + } } @Override protected void doStart() { super.doStart(); - this.metricsExporterService.doStart(); + if (metricsPipelineEnabled) { + this.metricsExporterService.doStart(); + } } @Override protected void doStop() { super.doStop(); - this.metricsExporterService.doStop(); + if (metricsPipelineEnabled) { + this.metricsExporterService.doStop(); + } } private List getSubTopologiesNames(Map properties) { return getJobConfig(properties).getStringList("sub.topology.names"); } + private List getMetricsPipelineSubTopologiesNames(Map properties) { + return getJobConfig(properties).getStringList("metrics.pipeline.sub.topology.names"); + } + private Config getSubJobConfig(String jobName) { return configClient.getConfig( jobName, diff --git a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf index 4cf7b58f0..a874fe408 100644 --- a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf +++ b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf @@ -7,10 +7,20 @@ sub.topology.names = [ "span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", - "all-views", - "hypertrace-metrics-processor" + "all-views" ] +metrics.pipeline.sub.topology.names = [ + "hypertrace-metrics-processor", + "hypertrace-metrics-generator" +] + + +metrics.pipeline.exporter = "hypertrace-metrics-exporter" + +metrics.pipeline.enable = true +metrics.pipeline.enable = ${?METRICS_PIPELINE_ENABLE} + precreate.topics = false precreate.topics = ${?PRE_CREATE_TOPICS} diff --git a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf index 16a968d23..04e97846b 100644 --- a/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf +++ b/hypertrace-metrics-exporter/hypertrace-metrics-exporter/src/main/resources/configs/common/application.conf @@ -5,6 +5,9 @@ main.class = org.hypertrace.metrics.exporter.MetricsExporterService input.topic = "enriched-otlp-metrics" +precreate.topics = false +precreate.topics = ${?PRE_CREATE_TOPICS} + buffer.config { max.queue.size = 5000 max.batch.size = 1000 diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java index e85cf4ab3..d4f3a2193 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsGenerator.java @@ -34,7 +34,7 @@ public class MetricsGenerator extends KafkaStreamsApp { public static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config"; public static final String METRICS_IDENTITY_STORE = "metric-identity-store"; public static final String METRICS_IDENTITY_VALUE_STORE = "metric-identity-value-Store"; - public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer"; + public static final String OUTPUT_TOPIC_METRICS_PRODUCER = "output-topic-metrics-producer"; public MetricsGenerator(ConfigClient configClient) { super(configClient); @@ -76,7 +76,7 @@ public StreamsBuilder buildTopology( Produced outputTopicProducer = Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde()); - outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER); + outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_METRICS_PRODUCER); inputStream .transform( diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java index ccfa78b16..5c2f7c70d 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/java/org/hypertrace/metrics/generator/MetricsProcessor.java @@ -12,7 +12,7 @@ import static org.hypertrace.metrics.generator.MetricsConstants.STATUS_CODE; import static org.hypertrace.metrics.generator.MetricsConstants.TENANT_ID_ATTR; import static org.hypertrace.metrics.generator.MetricsGenerator.METRICS_GENERATOR_JOB_CONFIG; -import static org.hypertrace.metrics.generator.MetricsGenerator.OUTPUT_TOPIC_PRODUCER; +import static org.hypertrace.metrics.generator.MetricsGenerator.OUTPUT_TOPIC_METRICS_PRODUCER; import com.typesafe.config.Config; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; @@ -62,7 +62,7 @@ public void init(ProcessorContext context) { this.metricsStore = (KeyValueStore) context.getStateStore(MetricsGenerator.METRICS_IDENTITY_VALUE_STORE); - this.outputTopicProducer = To.child(OUTPUT_TOPIC_PRODUCER); + this.outputTopicProducer = To.child(OUTPUT_TOPIC_METRICS_PRODUCER); Config jobConfig = (Config) (context.appConfigs().get(METRICS_GENERATOR_JOB_CONFIG)); this.metricAggregationTimeMs = jobConfig.getLong(METRIC_AGGREGATION_TIME_MS); diff --git a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf index dc1336237..e12c1b455 100644 --- a/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf +++ b/hypertrace-metrics-generator/hypertrace-metrics-generator/src/main/resources/configs/common/application.conf @@ -5,7 +5,9 @@ main.class = org.hypertrace.metrics.generator.MetricsGenerator input.topic = "raw-service-view-events" output.topic = "otlp-metrics" + input.class = org.hypertrace.viewgenerator.api.RawServiceView + precreate.topics = false precreate.topics = ${?PRE_CREATE_TOPICS}