diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index 5eff9437c..c587865b3 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -40,6 +40,7 @@ dependencies { implementation(project(":raw-spans-grouper:raw-spans-grouper")) implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher")) implementation(project(":hypertrace-view-generator:hypertrace-view-generator")) + implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor")) testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") testImplementation("org.mockito:mockito-core:3.8.0") @@ -61,10 +62,26 @@ tasks.processResources { tasks.register("copyServiceConfigs") { with( - createCopySpec("span-normalizer", "span-normalizer", "main", "common"), - createCopySpec("raw-spans-grouper", "raw-spans-grouper", "main", "common"), - createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "main", "common"), - createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common") + createCopySpec("span-normalizer", + "span-normalizer", + "main", + "common"), + createCopySpec("raw-spans-grouper", + "raw-spans-grouper", + "main", + "common"), + createCopySpec("hypertrace-trace-enricher", + "hypertrace-trace-enricher", + "main", + "common"), + createCopySpec("hypertrace-view-generator", + "hypertrace-view-generator", + "main", + "common"), + createCopySpec("hypertrace-metrics-processor", + "hypertrace-metrics-processor", + "main", + "common") ).into("./build/resources/main/configs/") } @@ -101,10 +118,26 @@ tasks.test { tasks.register("copyServiceConfigsTest") { with( - createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"), - createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"), - createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"), - 
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator") + createCopySpec("span-normalizer", + "span-normalizer", + "test", + "span-normalizer"), + createCopySpec("raw-spans-grouper", + "raw-spans-grouper", + "test", + "raw-spans-grouper"), + createCopySpec("hypertrace-trace-enricher", + "hypertrace-trace-enricher", + "test", + "hypertrace-trace-enricher"), + createCopySpec("hypertrace-view-generator", + "hypertrace-view-generator", + "test", + "hypertrace-view-generator"), + createCopySpec("hypertrace-metrics-processor", + "hypertrace-metrics-processor", + "test", + "hypertrace-metrics-processor") ).into("./build/resources/test/configs/") } diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java index 4ab59dedf..78e6afa58 100644 --- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java +++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java @@ -17,6 +17,7 @@ import org.hypertrace.core.serviceframework.config.ConfigUtils; import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher; +import org.hypertrace.metrics.processor.MetricsProcessor; import org.hypertrace.traceenricher.trace.enricher.TraceEnricher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,9 @@ private KafkaStreamsApp getSubTopologyInstance(String name) { case "all-views": kafkaStreamsApp = new MultiViewGeneratorLauncher(ConfigClientFactory.getClient()); break; + case "hypertrace-metrics-processor": + kafkaStreamsApp = new MetricsProcessor(ConfigClientFactory.getClient()); + break; default: throw new RuntimeException(String.format("Invalid configured sub-topology : [%s]", name)); } diff --git 
a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf index 5c9946df6..4cf7b58f0 100644 --- a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf +++ b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf @@ -3,7 +3,13 @@ main.class = org.hypertrace.ingester.HypertraceIngester service.name = hypertrace-ingester service.admin.port = 8099 -sub.topology.names = ["span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", "all-views"] +sub.topology.names = [ + "span-normalizer", + "raw-spans-grouper", + "hypertrace-trace-enricher", + "all-views", + "hypertrace-metrics-processor" +] precreate.topics = false precreate.topics = ${?PRE_CREATE_TOPICS} diff --git a/hypertrace-ingester/src/main/resources/log4j2.properties b/hypertrace-ingester/src/main/resources/log4j2.properties new file mode 100644 index 000000000..bdcf9b332 --- /dev/null +++ b/hypertrace-ingester/src/main/resources/log4j2.properties @@ -0,0 +1,29 @@ +status = error +name = PropertiesConfig + +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n + +appender.rolling.type = RollingFile +appender.rolling.name = ROLLING_FILE +appender.rolling.fileName = ${sys:service.name:-hypertrace-ingester}.log +appender.rolling.filePattern = ${sys:service.name:-hypertrace-ingester}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type = PatternLayout +appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type = Policies +appender.rolling.policies.time.type = TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval = 3600 +appender.rolling.policies.time.modulate = true 
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.rolling.policies.size.size = 20MB +appender.rolling.strategy.type = DefaultRolloverStrategy +appender.rolling.strategy.max = 5 + +rootLogger.level = INFO +rootLogger.appenderRef.stdout.ref = STDOUT +rootLogger.appenderRef.rolling.ref = ROLLING_FILE + + + diff --git a/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/build.gradle.kts new file mode 100644 index 000000000..870768c84 --- /dev/null +++ b/hypertrace-metrics-processor/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects { + group = "org.hypertrace.metrics.processor" +} \ No newline at end of file diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts new file mode 100644 index 000000000..0d318f4d0 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/build.gradle.kts @@ -0,0 +1,44 @@ +plugins { + java + application + jacoco + id("org.hypertrace.docker-java-application-plugin") + id("org.hypertrace.docker-publish-plugin") + id("org.hypertrace.jacoco-report-plugin") +} + +application { + mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") +} + +hypertraceDocker { + defaultImage { + javaApplication { + serviceName.set("${project.name}") + adminPort.set(8099) + } + } +} + +tasks.test { + useJUnitPlatform() +} + +dependencies { + // internal projects + implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api")) + + // frameworks + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26") + implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21") + + // open telemetry proto + implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha") + + // test + 
testImplementation("org.junit.jupiter:junit-jupiter:5.7.1") + testImplementation("org.mockito:mockito-core:3.8.0") + testImplementation("org.junit-pioneer:junit-pioneer:1.3.8") + testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs") +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java new file mode 100644 index 000000000..32b2a4e69 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsEnricher.java @@ -0,0 +1,22 @@ +package org.hypertrace.metrics.processor; + +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; + +public class MetricsEnricher + implements Transformer> { + + @Override + public void init(ProcessorContext context) {} + + @Override + public KeyValue transform(byte[] key, ResourceMetrics value) { + // noop enricher for now + return new KeyValue<>(key, value); + } + + @Override + public void close() {} +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java new file mode 100644 index 000000000..e16da2518 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsNormalizer.java @@ -0,0 +1,22 @@ +package org.hypertrace.metrics.processor; + +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Transformer; +import 
org.apache.kafka.streams.processor.ProcessorContext; + +public class MetricsNormalizer + implements Transformer> { + + @Override + public void init(ProcessorContext context) {} + + @Override + public KeyValue transform(byte[] key, ResourceMetrics value) { + // noop normalizer for now + return new KeyValue<>(key, value); + } + + @Override + public void close() {} +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java new file mode 100644 index 000000000..b010f76df --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/MetricsProcessor.java @@ -0,0 +1,80 @@ +package org.hypertrace.metrics.processor; + +import com.typesafe.config.Config; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; +import org.hypertrace.core.serviceframework.config.ConfigClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsProcessor extends KafkaStreamsApp { + private static final Logger logger = LoggerFactory.getLogger(MetricsProcessor.class); + public static final String INPUT_TOPIC_CONFIG_KEY = "input.topic"; + public static final String OUTPUT_TOPIC_CONFIG_KEY = "output.topic"; + private static final String METRICS_PROCESSOR_JOB_CONFIG = "metrics-processor-job-config"; + + public MetricsProcessor(ConfigClient configClient) { + super(configClient); + } + + @Override + public StreamsBuilder 
buildTopology( + Map streamsProperties, + StreamsBuilder streamsBuilder, + Map> inputStreams) { + + Config jobConfig = getJobConfig(streamsProperties); + String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY); + String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); + + // input stream + KStream inputStream = + (KStream) inputStreams.get(inputTopic); + if (inputStream == null) { + inputStream = + streamsBuilder.stream( + inputTopic, Consumed.with(Serdes.ByteArray(), new OtlpMetricsSerde())); + inputStreams.put(inputTopic, inputStream); + } + + inputStream + .transform(MetricsNormalizer::new) + .transform(MetricsEnricher::new) + .to(outputTopic, Produced.with(Serdes.ByteArray(), new OtlpMetricsSerde())); + + return streamsBuilder; + } + + @Override + public String getJobConfigKey() { + return METRICS_PROCESSOR_JOB_CONFIG; + } + + @Override + public Logger getLogger() { + return logger; + } + + @Override + public List getInputTopics(Map properties) { + Config jobConfig = getJobConfig(properties); + return List.of(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY)); + } + + @Override + public List getOutputTopics(Map properties) { + Config jobConfig = getJobConfig(properties); + return List.of(jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY)); + } + + private Config getJobConfig(Map properties) { + return (Config) properties.get(getJobConfigKey()); + } +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java new file mode 100644 index 000000000..21c82cb75 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/java/org/hypertrace/metrics/processor/OtlpMetricsSerde.java @@ -0,0 +1,45 @@ +package org.hypertrace.metrics.processor; + +import com.google.protobuf.InvalidProtocolBufferException; +import 
io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +public class OtlpMetricsSerde implements Serde { + + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return new OtlpMetricsSerde.Ser(); + } + + @Override + public Deserializer deserializer() { + return new OtlpMetricsSerde.De(); + } + + public static class Ser implements Serializer { + @Override + public byte[] serialize(String topic, ResourceMetrics data) { + return data.toByteArray(); + } + } + + public static class De implements Deserializer { + @Override + public ResourceMetrics deserialize(String topic, byte[] data) { + try { + return ResourceMetrics.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf new file mode 100644 index 000000000..4af406a7e --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/configs/common/application.conf @@ -0,0 +1,33 @@ +service.name = hypertrace-metrics-processor +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.processor.MetricsProcessor + +input.topic = "otlp-metrics" +output.topic = "enriched-otlp-metrics" + +precreate.topics = false +precreate.topics = ${?PRE_CREATE_TOPICS} + +kafka.streams.config = { + application.id = metrics-processor-job + num.stream.threads = 2 + num.stream.threads = ${?NUM_STREAM_THREADS} + + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} + + schema.registry.url = 
"http://localhost:8081" + schema.registry.url = ${?SCHEMA_REGISTRY_URL} +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-processor" + +metrics.reporter.prefix = org.hypertrace.metrics.processor.MetricsProcessor +metrics.reporter.names = ["prometheus"] +metrics.reportInterval = 60 diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties new file mode 100644 index 000000000..d91bc7bfe --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +status=error +name=PropertiesConfig +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.type=RollingFile +appender.rolling.name=ROLLING_FILE +appender.rolling.fileName=${sys:service.name:-service}.log +appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz +appender.rolling.layout.type=PatternLayout +appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n +appender.rolling.policies.type=Policies +appender.rolling.policies.time.type=TimeBasedTriggeringPolicy +appender.rolling.policies.time.interval=3600 +appender.rolling.policies.time.modulate=true +appender.rolling.policies.size.type=SizeBasedTriggeringPolicy +appender.rolling.policies.size.size=20MB +appender.rolling.strategy.type=DefaultRolloverStrategy +appender.rolling.strategy.max=5 +rootLogger.level=INFO +rootLogger.appenderRef.stdout.ref=STDOUT +rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/test/java/org/hypertrace/metrics/processor/MetricsProcessorTest.java 
b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/test/java/org/hypertrace/metrics/processor/MetricsProcessorTest.java new file mode 100644 index 000000000..f48b38576 --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/test/java/org/hypertrace/metrics/processor/MetricsProcessorTest.java @@ -0,0 +1,167 @@ +package org.hypertrace.metrics.processor; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.resource.v1.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.hypertrace.core.serviceframework.config.ConfigClientFactory; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.SetEnvironmentVariable; + +public class MetricsProcessorTest { + private MetricsProcessor underTest; + private Config underTestConfig; + + @BeforeEach + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "hypertrace-metrics-processor") + public void setUp() { + underTest = new MetricsProcessor(ConfigClientFactory.getClient()); + underTestConfig = + ConfigFactory.parseURL( + getClass() + 
.getClassLoader() + .getResource("configs/hypertrace-metrics-processor/application.conf")) + .resolve(); + } + + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "hypertrace-metrics-processor") + public void testMetricsProcessorTopology() { + // prepare stream properties + Map mergedProps = new HashMap<>(); + underTest.getBaseStreamsConfig().forEach(mergedProps::put); + underTest.getStreamsConfig(underTestConfig).forEach(mergedProps::put); + mergedProps.put(underTest.getJobConfigKey(), underTestConfig); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + // create topology test driver, and i/o topics + TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), props); + + TestInputTopic inputTopic = + topologyTestDriver.createInputTopic( + underTestConfig.getString(MetricsProcessor.INPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().serializer(), + new OtlpMetricsSerde().serializer()); + + TestOutputTopic outputTopic = + topologyTestDriver.createOutputTopic( + underTestConfig.getString(MetricsProcessor.OUTPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().deserializer(), + new OtlpMetricsSerde().deserializer()); + + // create resource metrics and add into pipeline + ResourceMetrics inputResourceMetrics = getTestMetricsGauge("num_calls", "number of calls", 20L); + inputTopic.pipeInput(inputResourceMetrics); + ResourceMetrics outputResourceMetrics = (ResourceMetrics) outputTopic.readValue(); + + // verification + Assertions.assertNotNull(outputResourceMetrics); + Assertions.assertNotNull(outputResourceMetrics.getResource()); + + Assertions.assertEquals(1, outputResourceMetrics.getInstrumentationLibraryMetricsCount()); + Assertions.assertEquals( + 1, outputResourceMetrics.getInstrumentationLibraryMetrics(0).getMetricsCount()); + + Assertions.assertEquals( + "num_calls", + 
outputResourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getName()); + + // data points verification + Gauge outGauge = + outputResourceMetrics.getInstrumentationLibraryMetrics(0).getMetrics(0).getGauge(); + Assertions.assertNotNull(outGauge); + Assertions.assertEquals(1, outGauge.getDataPointsCount()); + Assertions.assertEquals( + 1634119810000L, + TimeUnit.MILLISECONDS.convert( + outGauge.getDataPoints(0).getTimeUnixNano(), TimeUnit.NANOSECONDS)); + Assertions.assertEquals(3, outGauge.getDataPoints(0).getAttributesCount()); + Assertions.assertEquals(20L, outGauge.getDataPoints(0).getAsInt()); + } + + private ResourceMetrics getTestMetricsGauge(String metricName, String metricDesc, Long value) { + ResourceMetrics.Builder resourceMetricsBuilder = ResourceMetrics.newBuilder(); + resourceMetricsBuilder.setResource( + Resource.newBuilder() + .addAttributes( + io.opentelemetry.proto.common.v1.KeyValue.newBuilder() + .setKey("Service") + .setValue( + AnyValue.newBuilder() + .setStringValue("hypertrace-metrics-processor") + .build()) + .build())); + + io.opentelemetry.proto.metrics.v1.Metric.Builder metricBuilder = + io.opentelemetry.proto.metrics.v1.Metric.newBuilder(); + metricBuilder.setName(metricName); + metricBuilder.setDescription(metricDesc); + metricBuilder.setUnit("1"); + + NumberDataPoint.Builder numberDataPointBuilder = NumberDataPoint.newBuilder(); + List attributes = + toAttributes( + Map.of( + "tenant_id", "__default", + "service_id", "1234", + "api_id", "4567")); + numberDataPointBuilder.addAllAttributes(attributes); + numberDataPointBuilder.setTimeUnixNano( + TimeUnit.NANOSECONDS.convert( + 1634119810000L /*2021-10-13:10-10-10 GMT*/, TimeUnit.MILLISECONDS)); + numberDataPointBuilder.setAsInt(value); + + Gauge.Builder gaugeBuilder = Gauge.newBuilder(); + gaugeBuilder.addDataPoints(numberDataPointBuilder.build()); + metricBuilder.setGauge(gaugeBuilder.build()); + + resourceMetricsBuilder.addInstrumentationLibraryMetrics( + 
InstrumentationLibraryMetrics.newBuilder() + .addMetrics(metricBuilder.build()) + .setInstrumentationLibrary( + InstrumentationLibrary.newBuilder().setName("Generated").build()) + .build()); + + return resourceMetricsBuilder.build(); + } + + private List toAttributes(Map labels) { + List attributes = + labels.entrySet().stream() + .map( + k -> { + io.opentelemetry.proto.common.v1.KeyValue.Builder keyValueBuilder = + io.opentelemetry.proto.common.v1.KeyValue.newBuilder(); + keyValueBuilder.setKey(k.getKey()); + String value = k.getValue() != null ? k.getValue() : ""; + keyValueBuilder.setValue(AnyValue.newBuilder().setStringValue(value).build()); + return keyValueBuilder.build(); + }) + .collect(Collectors.toList()); + return attributes; + } +} diff --git a/hypertrace-metrics-processor/hypertrace-metrics-processor/src/test/resources/configs/hypertrace-metrics-processor/application.conf b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/test/resources/configs/hypertrace-metrics-processor/application.conf new file mode 100644 index 000000000..4af406a7e --- /dev/null +++ b/hypertrace-metrics-processor/hypertrace-metrics-processor/src/test/resources/configs/hypertrace-metrics-processor/application.conf @@ -0,0 +1,33 @@ +service.name = hypertrace-metrics-processor +service.admin.port = 8099 + +main.class = org.hypertrace.metrics.processor.MetricsProcessor + +input.topic = "otlp-metrics" +output.topic = "enriched-otlp-metrics" + +precreate.topics = false +precreate.topics = ${?PRE_CREATE_TOPICS} + +kafka.streams.config = { + application.id = metrics-processor-job + num.stream.threads = 2 + num.stream.threads = ${?NUM_STREAM_THREADS} + + bootstrap.servers = "localhost:9092" + bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} + + schema.registry.url = "http://localhost:8081" + schema.registry.url = ${?SCHEMA_REGISTRY_URL} +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} +} + +logger.names = ["file"] +logger.file.dir = "/var/logs/metrics-processor" + 
+metrics.reporter.prefix = org.hypertrace.metrics.processor.MetricsProcessor
+metrics.reporter.names = ["prometheus"]
+metrics.reportInterval = 60
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 2ca1d9e44..571660d40 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -34,6 +34,11 @@ include("span-normalizer:span-normalizer")
 include("span-normalizer:raw-span-constants")
 include("span-normalizer:span-normalizer-constants")
 
-// e2e pipeline
-include("hypertrace-ingester")
+// metrics pipeline
+include("hypertrace-metrics-processor:hypertrace-metrics-processor")
+
+// utils
 include("semantic-convention-utils")
+
+// e2e pipeline
+include("hypertrace-ingester")