diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index 18dbd2fa6d8a6..60d701b2c843e 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.server.telemetry.ClientTelemetry;
 import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
 import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+import org.apache.kafka.shaded.io.opentelemetry.proto.common.v1.KeyValue;
 import org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData;
 import org.apache.kafka.streams.ClientInstanceIds;
 import org.apache.kafka.streams.KafkaClientSupplier;
@@ -82,6 +83,8 @@
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CONSUMER_INSTANCE_ID_TAG;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PRODUCER_INSTANCE_ID_TAG;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -163,6 +166,11 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
                             && !entry.getKey().endsWith("GlobalStreamThread-global-consumer"))
                     .map(Map.Entry::getValue)
                     .findFirst().orElseThrow();
+            // Producer ids are exposed separately from consumer ids, so look them up via producerInstanceIds().
+            final Uuid producerInstanceId = clientInstanceIds.producerInstanceIds().entrySet().stream()
+                    .filter(entry -> entry.getKey().endsWith("-producer"))
+                    .map(Map.Entry::getValue)
+                    .findFirst().orElseThrow();
             assertNotNull(adminInstanceId);
             assertNotNull(mainConsumerInstanceId);
             LOG.info("Main consumer instance id {}", mainConsumerInstanceId);
@@ -202,6 +210,8 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
                     "Never received the process id");
 
             assertEquals(expectedProcessId, TelemetryPlugin.processId);
+            assertEquals(mainConsumerInstanceId.toString(), TelemetryPlugin.consumerInstanceId);
+            assertEquals(producerInstanceId.toString(), TelemetryPlugin.producerInstanceId);
         }
     }
 
@@ -474,6 +484,8 @@ public static class TelemetryPlugin implements ClientTelemetry, MetricsReporter,
 
         public static final Map<Uuid, List<String>> SUBSCRIBED_METRICS = new ConcurrentHashMap<>();
         public static String processId;
+        public static String consumerInstanceId;
+        public static String producerInstanceId;
         public TelemetryPlugin() {
         }
 
@@ -520,8 +532,23 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client
                         .map(keyValue -> keyValue.getValue().getStringValue())
                         .findFirst();
 
+                // Collect the instance-id tags attached to gauge data points; on a repeated tag the first value wins.
+                final Map<String, String> instanceIds = data.getResourceMetricsList()
+                        .stream()
+                        .flatMap(rm -> rm.getScopeMetricsList().stream())
+                        .flatMap(sm -> sm.getMetricsList().stream())
+                        .map(metric -> metric.getGauge())
+                        .flatMap(gauge -> gauge.getDataPointsList().stream())
+                        .flatMap(numberDataPoint -> numberDataPoint.getAttributesList().stream())
+                        .filter(keyValue -> keyValue.getKey().equals(CONSUMER_INSTANCE_ID_TAG) || keyValue.getKey().equals(PRODUCER_INSTANCE_ID_TAG))
+                        .collect(Collectors.toMap(KeyValue::getKey, keyValue -> keyValue.getValue().getStringValue(), (first, second) -> first));
+
+                // A payload without these tags must not wipe out ids captured from an earlier payload.
+                consumerInstanceId = instanceIds.getOrDefault(CONSUMER_INSTANCE_ID_TAG, consumerInstanceId);
+                producerInstanceId = instanceIds.getOrDefault(PRODUCER_INSTANCE_ID_TAG, producerInstanceId);
+
                 processIdOption.ifPresent(pid -> processId = pid);
 
                 final Uuid clientId = payload.clientInstanceId();
                 final List<String> metricNames = data.getResourceMetricsList()
                         .stream()
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ceb2c403c3ebb..d40be19d27d96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -38,6 +38,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.ClientInstanceIds;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@@ -73,10 +74,13 @@
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
@@ -600,6 +604,7 @@ public StreamThread(final Time time,
         this.shutdownErrorHook = shutdownErrorHook;
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
         this.cacheResizer = cacheResizer;
+        this.time = time;
 
         // The following sensors are created here but their references are not stored in this object, since within
         // this object they are not recorded. The sensors are created here so that the stream threads starts with all
@@ -608,14 +613,32 @@ public StreamThread(final Time time,
         // tasks would never be added to the metrics.
         ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
         ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
 
+        // Resolve the consumer and producer instance ids so they can be attached as tags to the thread-state telemetry metric.
+        final Map<String, KafkaFuture<Uuid>> clientInstanceIds;
+        String consumerInstanceId = "";
+        String producerInstanceId = "";
+        synchronized (this) {
+            clientInstanceIds = clientInstanceIds(Duration.ofSeconds(10));
+        }
+        try {
+            consumerInstanceId = clientInstanceIds.get(getName() + "-consumer").get(30, TimeUnit.SECONDS).toString();
+            producerInstanceId = clientInstanceIds.get(getName() + "-producer").get(30, TimeUnit.SECONDS).toString();
+        } catch (java.util.concurrent.TimeoutException | ExecutionException e) {
+            throw new RuntimeException("Failed to get consumer or producer instance id", e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag; the ids keep their empty defaults and construction continues.
+            Thread.currentThread().interrupt();
+        }
         ThreadMetrics.addThreadStartTimeMetric(
             threadId,
             streamsMetrics,
             time.milliseconds()
         );
-        ThreadMetrics.addThreadStateTelemetryMetric(
+        ThreadMetrics.addThreadStateTelemetryMetricWithInstanceIds(
             threadId,
+            consumerInstanceId,
+            producerInstanceId,
             streamsMetrics,
             (metricConfig, now) -> this.state().ordinal());
         ThreadMetrics.addThreadStateMetric(
@@ -632,7 +655,6 @@ public StreamThread(final Time time,
             streamsMetrics
         );
 
-        this.time = time;
         this.topologyMetadata = topologyMetadata;
         this.topologyMetadata.registerThread(getName());
         this.logPrefix = logContext.logPrefix();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index af693d14bc331..87daead6f6e69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -117,6 +117,8 @@ public int hashCode() {
     public static final String CLIENT_ID_TAG = "client-id";
     public static final String PROCESS_ID_TAG = "process-id";
     public static final String THREAD_ID_TAG = "thread-id";
+    public static final String CONSUMER_INSTANCE_ID_TAG = "consumer-instance-id";
+    public static final String PRODUCER_INSTANCE_ID_TAG = "producer-instance-id";
     public static final String TASK_ID_TAG = "task-id";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
     public static final String TOPIC_NAME_TAG = "topic";
@@ -242,6 +244,34 @@ public <T> void addThreadLevelMutableMetric(final String name,
         }
     }
 
+    /**
+     * Registers a thread-level gauge whose tags carry the thread id together with the consumer and
+     * producer instance ids, and records the metric name under the thread's sensor prefix.
+     *
+     * @param name               name of the metric
+     * @param description        description of the metric
+     * @param threadId           id of the stream thread, used for tagging and for the bookkeeping key
+     * @param consumerInstanceId value of the {@code consumer-instance-id} tag
+     * @param producerInstanceId value of the {@code producer-instance-id} tag
+     * @param valueProvider      gauge supplying the metric value
+     */
+    public <T> void addThreadLevelMutableMetricWithInstanceIds(final String name,
+                                                               final String description,
+                                                               final String threadId,
+                                                               final String consumerInstanceId,
+                                                               final String producerInstanceId,
+                                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId, consumerInstanceId, producerInstanceId));
+        synchronized (threadLevelMetrics) {
+            threadLevelMetrics.computeIfAbsent(
+                threadSensorPrefix(threadId),
+                tid -> new LinkedList<>()
+            ).add(metricName);
+            metrics.addMetric(metricName, valueProvider);
+        }
+    }
+
     public final Sensor clientLevelSensor(final String sensorName,
                                           final RecordingLevel recordingLevel,
                                           final Sensor... parents) {
@@ -283,6 +313,24 @@ public Map<String, String> threadLevelTagMap(final String threadId) {
         return tagMap;
     }
 
+    /**
+     * Builds the tag map for a thread-level metric that also identifies the thread's clients.
+     *
+     * @param threadId           value of the {@code thread-id} tag
+     * @param consumerInstanceId value of the {@code consumer-instance-id} tag
+     * @param producerInstanceId value of the {@code producer-instance-id} tag
+     * @return a new insertion-ordered map holding the three tags
+     */
+    public Map<String, String> threadLevelTagMap(final String threadId,
+                                                 final String consumerInstanceId,
+                                                 final String producerInstanceId) {
+        final Map<String, String> tagMap = new LinkedHashMap<>();
+        tagMap.put(THREAD_ID_TAG, threadId);
+        tagMap.put(CONSUMER_INSTANCE_ID_TAG, consumerInstanceId);
+        tagMap.put(PRODUCER_INSTANCE_ID_TAG, producerInstanceId);
+        return tagMap;
+    }
+
     public final void removeAllClientLevelSensorsAndMetrics() {
         removeAllClientLevelSensors();
         removeAllClientLevelMetrics();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index a5ba7894c46ca..5eaaa194c16bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -307,5 +307,29 @@ public static void addThreadStateTelemetryMetric(final String threadId,
         );
     }
 
+    /**
+     * Registers the thread-state telemetry gauge tagged with the consumer and producer instance ids.
+     *
+     * @param threadId            id of the stream thread
+     * @param consumerInstanceId  value of the consumer instance id tag
+     * @param producerInstanceId  value of the producer instance id tag
+     * @param streamsMetrics      metrics registry the gauge is added to
+     * @param threadStateProvider gauge supplying the thread state
+     */
+    public static void addThreadStateTelemetryMetricWithInstanceIds(final String threadId,
+                                                                    final String consumerInstanceId,
+                                                                    final String producerInstanceId,
+                                                                    final StreamsMetricsImpl streamsMetrics,
+                                                                    final Gauge<Integer> threadStateProvider) {
+        streamsMetrics.addThreadLevelMutableMetricWithInstanceIds(
+            THREAD_STATE,
+            THREAD_STATE_DESCRIPTION,
+            threadId,
+            consumerInstanceId,
+            producerInstanceId,
+            threadStateProvider
+        );
+    }
+
     public static void addThreadStateMetric(final String threadId,
                                             final StreamsMetricsImpl streamsMetrics,