Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion hypertrace-ingester/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
implementation(project(":hypertrace-view-generator:hypertrace-view-generator"))
implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor"))
implementation(project(":hypertrace-metrics-exporter:hypertrace-metrics-exporter"))
implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator"))

testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
Expand Down Expand Up @@ -86,6 +87,10 @@ tasks.register<Copy>("copyServiceConfigs") {
createCopySpec("hypertrace-metrics-exporter",
"hypertrace-metrics-exporter",
"main",
"common"),
createCopySpec("hypertrace-metrics-generator",
"hypertrace-metrics-generator",
"main",
"common")
).into("./build/resources/main/configs/")
}
Expand Down Expand Up @@ -146,7 +151,11 @@ tasks.register<Copy>("copyServiceConfigsTest") {
createCopySpec("hypertrace-metrics-exporter",
"hypertrace-metrics-exporter",
"test",
"hypertrace-metrics-exporter")
"hypertrace-metrics-exporter"),
createCopySpec("hypertrace-metrics-generator",
"hypertrace-metrics-generator",
"test",
"hypertrace-metrics-generator")
).into("./build/resources/test/configs/")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hypertrace.core.spannormalizer.SpanNormalizer;
import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher;
import org.hypertrace.metrics.exporter.MetricsExporterService;
import org.hypertrace.metrics.generator.MetricsGenerator;
import org.hypertrace.metrics.processor.MetricsProcessor;
import org.hypertrace.traceenricher.trace.enricher.TraceEnricher;
import org.slf4j.Logger;
Expand All @@ -31,12 +32,21 @@ public class HypertraceIngester extends KafkaStreamsApp {
private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config";

private Map<String, Pair<String, KafkaStreamsApp>> jobNameToSubTopology = new HashMap<>();
MetricsExporterService metricsExporterService;
private MetricsExporterService metricsExporterService;
boolean metricsPipelineEnabled;

public HypertraceIngester(ConfigClient configClient) {
super(configClient);
metricsExporterService = new MetricsExporterService(configClient);
metricsExporterService.setConfig(getSubJobConfig("hypertrace-metrics-exporter"));
Config config = getAppConfig();
metricsPipelineEnabled =
config.hasPath("metrics.pipeline.enable")
? config.getBoolean("metrics.pipeline.enable")
: false;
if (metricsPipelineEnabled) {
String metricsPipelineExporter = config.getString("metrics.pipeline.exporter");
metricsExporterService = new MetricsExporterService(configClient);
metricsExporterService.setConfig(getSubJobConfig(metricsPipelineExporter));
}
}

private KafkaStreamsApp getSubTopologyInstance(String name) {
Expand All @@ -57,6 +67,9 @@ private KafkaStreamsApp getSubTopologyInstance(String name) {
case "hypertrace-metrics-processor":
kafkaStreamsApp = new MetricsProcessor(ConfigClientFactory.getClient());
break;
case "hypertrace-metrics-generator":
kafkaStreamsApp = new MetricsGenerator(ConfigClientFactory.getClient());
break;
default:
throw new RuntimeException(String.format("Invalid configured sub-topology : [%s]", name));
}
Expand All @@ -69,34 +82,50 @@ public StreamsBuilder buildTopology(
StreamsBuilder streamsBuilder,
Map<String, KStream<?, ?>> inputStreams) {

// build for trace pipeline
List<String> subTopologiesNames = getSubTopologiesNames(properties);

for (String subTopologyName : subTopologiesNames) {
LOGGER.info("Building sub topology :{}", subTopologyName);
streamsBuilder = buildSubTopology(subTopologyName, properties, streamsBuilder, inputStreams);
}

// create an instance and retains is reference to be used later in other methods
KafkaStreamsApp subTopology = getSubTopologyInstance(subTopologyName);
jobNameToSubTopology.put(
subTopologyName, Pair.of(subTopology.getJobConfigKey(), subTopology));

// need to use its own copy as input/output topics are different
Config subTopologyJobConfig = getSubJobConfig(subTopologyName);
Map<String, Object> flattenSubTopologyConfig =
subTopology.getStreamsConfig(subTopologyJobConfig);
flattenSubTopologyConfig.put(subTopology.getJobConfigKey(), subTopologyJobConfig);

// add specific job properties
addProperties(properties, flattenSubTopologyConfig);
streamsBuilder = subTopology.buildTopology(properties, streamsBuilder, inputStreams);

// retain per job key and its topology
jobNameToSubTopology.put(
subTopologyName, Pair.of(subTopology.getJobConfigKey(), subTopology));
// build for metrics pipeline
if (metricsPipelineEnabled) {
List<String> metricsSubTopologiesNames = getMetricsPipelineSubTopologiesNames(properties);
for (String subTopologyName : metricsSubTopologiesNames) {
LOGGER.info("Building metrics pipeline sub topology :{}", subTopologyName);
streamsBuilder =
buildSubTopology(subTopologyName, properties, streamsBuilder, inputStreams);
}
}

return streamsBuilder;
}

private StreamsBuilder buildSubTopology(
String subTopologyName,
Map<String, Object> properties,
StreamsBuilder streamsBuilder,
Map<String, KStream<?, ?>> inputStreams) {
// create an instance and retains is reference to be used later in other methods
KafkaStreamsApp subTopology = getSubTopologyInstance(subTopologyName);
jobNameToSubTopology.put(subTopologyName, Pair.of(subTopology.getJobConfigKey(), subTopology));

// need to use its own copy as input/output topics are different
Config subTopologyJobConfig = getSubJobConfig(subTopologyName);
Map<String, Object> flattenSubTopologyConfig =
subTopology.getStreamsConfig(subTopologyJobConfig);
flattenSubTopologyConfig.put(subTopology.getJobConfigKey(), subTopologyJobConfig);

// add specific job properties
addProperties(properties, flattenSubTopologyConfig);
streamsBuilder = subTopology.buildTopology(properties, streamsBuilder, inputStreams);

// retain per job key and its topology
jobNameToSubTopology.put(subTopologyName, Pair.of(subTopology.getJobConfigKey(), subTopology));
return streamsBuilder;
}

@Override
public String getJobConfigKey() {
return HYPERTRACE_INGESTER_JOB_CONFIG;
Expand Down Expand Up @@ -125,25 +154,35 @@ public List<String> getOutputTopics(Map<String, Object> properties) {
@Override
protected void doInit() {
super.doInit();
this.metricsExporterService.doInit();
if (metricsPipelineEnabled) {
this.metricsExporterService.doInit();
}
}

@Override
protected void doStart() {
super.doStart();
this.metricsExporterService.doStart();
if (metricsPipelineEnabled) {
this.metricsExporterService.doStart();
}
}

@Override
protected void doStop() {
super.doStop();
this.metricsExporterService.doStop();
if (metricsPipelineEnabled) {
this.metricsExporterService.doStop();
}
}

private List<String> getSubTopologiesNames(Map<String, Object> properties) {
return getJobConfig(properties).getStringList("sub.topology.names");
}

private List<String> getMetricsPipelineSubTopologiesNames(Map<String, Object> properties) {
return getJobConfig(properties).getStringList("metrics.pipeline.sub.topology.names");
}

private Config getSubJobConfig(String jobName) {
return configClient.getConfig(
jobName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,20 @@ sub.topology.names = [
"span-normalizer",
"raw-spans-grouper",
"hypertrace-trace-enricher",
"all-views",
"hypertrace-metrics-processor"
"all-views"
]

metrics.pipeline.sub.topology.names = [
"hypertrace-metrics-processor",
"hypertrace-metrics-generator"
]


metrics.pipeline.exporter = "hypertrace-metrics-exporter"

metrics.pipeline.enable = true
metrics.pipeline.enable = ${?METRICS_PIPELINE_ENABLE}

precreate.topics = false
precreate.topics = ${?PRE_CREATE_TOPICS}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ main.class = org.hypertrace.metrics.exporter.MetricsExporterService

input.topic = "enriched-otlp-metrics"

precreate.topics = false
precreate.topics = ${?PRE_CREATE_TOPICS}

buffer.config {
max.queue.size = 5000
max.batch.size = 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class MetricsGenerator extends KafkaStreamsApp {
public static final String METRICS_GENERATOR_JOB_CONFIG = "metrics-generator-job-config";
public static final String METRICS_IDENTITY_STORE = "metric-identity-store";
public static final String METRICS_IDENTITY_VALUE_STORE = "metric-identity-value-Store";
public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer";
public static final String OUTPUT_TOPIC_METRICS_PRODUCER = "output-topic-metrics-producer";

public MetricsGenerator(ConfigClient configClient) {
super(configClient);
Expand Down Expand Up @@ -76,7 +76,7 @@ public StreamsBuilder buildTopology(

Produced<byte[], ResourceMetrics> outputTopicProducer =
Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde());
outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER);
outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_METRICS_PRODUCER);

inputStream
.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import static org.hypertrace.metrics.generator.MetricsConstants.STATUS_CODE;
import static org.hypertrace.metrics.generator.MetricsConstants.TENANT_ID_ATTR;
import static org.hypertrace.metrics.generator.MetricsGenerator.METRICS_GENERATOR_JOB_CONFIG;
import static org.hypertrace.metrics.generator.MetricsGenerator.OUTPUT_TOPIC_PRODUCER;
import static org.hypertrace.metrics.generator.MetricsGenerator.OUTPUT_TOPIC_METRICS_PRODUCER;

import com.typesafe.config.Config;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void init(ProcessorContext context) {
this.metricsStore =
(KeyValueStore<MetricIdentity, Metric>)
context.getStateStore(MetricsGenerator.METRICS_IDENTITY_VALUE_STORE);
this.outputTopicProducer = To.child(OUTPUT_TOPIC_PRODUCER);
this.outputTopicProducer = To.child(OUTPUT_TOPIC_METRICS_PRODUCER);

Config jobConfig = (Config) (context.appConfigs().get(METRICS_GENERATOR_JOB_CONFIG));
this.metricAggregationTimeMs = jobConfig.getLong(METRIC_AGGREGATION_TIME_MS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ main.class = org.hypertrace.metrics.generator.MetricsGenerator

input.topic = "raw-service-view-events"
output.topic = "otlp-metrics"

input.class = org.hypertrace.viewgenerator.api.RawServiceView

precreate.topics = false
precreate.topics = ${?PRE_CREATE_TOPICS}

Expand Down