Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.apache.kafka.shaded.io.opentelemetry.proto.common.v1.KeyValue;
import org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData;
import org.apache.kafka.streams.ClientInstanceIds;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand Down Expand Up @@ -82,6 +83,8 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CONSUMER_INSTANCE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PRODUCER_INSTANCE_ID_TAG;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -163,6 +166,10 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
&& !entry.getKey().endsWith("GlobalStreamThread-global-consumer"))
.map(Map.Entry::getValue)
.findFirst().orElseThrow();
final Uuid producerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream()
.filter(entry -> entry.getKey().endsWith("-producer"))
.map(Map.Entry::getValue)
.findFirst().orElseThrow();
assertNotNull(adminInstanceId);
assertNotNull(mainConsumerInstanceId);
LOG.info("Main consumer instance id {}", mainConsumerInstanceId);
Expand Down Expand Up @@ -202,6 +209,8 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
"Never received the process id");

assertEquals(expectedProcessId, TelemetryPlugin.processId);
assertEquals(TelemetryPlugin.consumerInstanceId, mainConsumerInstanceId.toString());
assertEquals(TelemetryPlugin.producerInstanceId, producerInstanceId.toString());
}
}

Expand Down Expand Up @@ -474,6 +483,8 @@ public static class TelemetryPlugin implements ClientTelemetry, MetricsReporter,

public static final Map<Uuid, List<String>> SUBSCRIBED_METRICS = new ConcurrentHashMap<>();
public static String processId;
public static String consumerInstanceId;
public static String producerInstanceId;
public TelemetryPlugin() {
}

Expand Down Expand Up @@ -520,8 +531,22 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client
.map(keyValue -> keyValue.getValue().getStringValue())
.findFirst();

final Map<String, String> instanceIds = data.getResourceMetricsList()
.stream()
.flatMap(rm -> rm.getScopeMetricsList().stream())
.flatMap(sm -> sm.getMetricsList().stream())
.map(metric -> metric.getGauge())
.flatMap(gauge -> gauge.getDataPointsList().stream())
.flatMap(numberDataPoint -> numberDataPoint.getAttributesList().stream())
.filter(keyValue -> keyValue.getKey().equals(CONSUMER_INSTANCE_ID_TAG) || keyValue.getKey().equals(PRODUCER_INSTANCE_ID_TAG))
.collect(Collectors.toMap(KeyValue::getKey, keyValue -> keyValue.getValue().getStringValue()));

consumerInstanceId = instanceIds.get(CONSUMER_INSTANCE_ID_TAG);
producerInstanceId = instanceIds.get(PRODUCER_INSTANCE_ID_TAG);

processIdOption.ifPresent(pid -> processId = pid);


final Uuid clientId = payload.clientInstanceId();
final List<String> metricNames = data.getResourceMetricsList()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.ClientInstanceIds;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
Expand Down Expand Up @@ -73,10 +74,13 @@
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
Expand Down Expand Up @@ -600,6 +604,7 @@ public StreamThread(final Time time,
this.shutdownErrorHook = shutdownErrorHook;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;
this.time = time;

// The following sensors are created here but their references are not stored in this object, since within
// this object they are not recorded. The sensors are created here so that the stream threads starts with all
Expand All @@ -608,14 +613,30 @@ public StreamThread(final Time time,
// tasks would never be added to the metrics.
ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
final Map<String, KafkaFuture<Uuid>> clientInstanceIds;
String consumerInstanceId = "";
String producerInstanceId = "";
synchronized (this) {
clientInstanceIds = clientInstanceIds(Duration.ofSeconds(10));
}
try {
consumerInstanceId = clientInstanceIds.get(getName() + "-consumer").get(30, TimeUnit.SECONDS).toString();
producerInstanceId = clientInstanceIds.get(getName() + "-producer").get(30, TimeUnit.SECONDS).toString();
} catch (java.util.concurrent.TimeoutException | ExecutionException e) {
throw new RuntimeException("Failed to get consumer instance id", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

ThreadMetrics.addThreadStartTimeMetric(
threadId,
streamsMetrics,
time.milliseconds()
);
ThreadMetrics.addThreadStateTelemetryMetric(
ThreadMetrics.addThreadStateTelemetryMetricWithInstanceIds(
threadId,
consumerInstanceId,
producerInstanceId,
streamsMetrics,
(metricConfig, now) -> this.state().ordinal());
ThreadMetrics.addThreadStateMetric(
Expand All @@ -632,7 +653,7 @@ public StreamThread(final Time time,
streamsMetrics
);

this.time = time;

this.topologyMetadata = topologyMetadata;
this.topologyMetadata.registerThread(getName());
this.logPrefix = logContext.logPrefix();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public int hashCode() {
public static final String CLIENT_ID_TAG = "client-id";
public static final String PROCESS_ID_TAG = "process-id";
public static final String THREAD_ID_TAG = "thread-id";
public static final String CONSUMER_INSTANCE_ID_TAG = "consumer-instance-id";
public static final String PRODUCER_INSTANCE_ID_TAG = "producer-instance-id";
public static final String TASK_ID_TAG = "task-id";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
public static final String TOPIC_NAME_TAG = "topic";
Expand Down Expand Up @@ -242,6 +244,23 @@ public <T> void addThreadLevelMutableMetric(final String name,
}
}

/**
 * Registers a mutable thread-level gauge whose tags carry, in addition to the
 * thread id, the client instance ids of the thread's consumer and producer.
 *
 * @param name               metric name
 * @param description        human-readable description of the metric
 * @param threadId           id of the stream thread owning the metric
 * @param consumerInstanceId client instance id of the thread's consumer
 * @param producerInstanceId client instance id of the thread's producer
 * @param valueProvider      gauge supplying the metric value
 * @param <T>                type of the gauge value
 */
public <T> void addThreadLevelMutableMetricWithInstanceIds(final String name,
                                                           final String description,
                                                           final String threadId,
                                                           final String consumerInstanceId,
                                                           final String producerInstanceId,
                                                           final Gauge<T> valueProvider) {
    final Map<String, String> tags =
        threadLevelTagMap(threadId, consumerInstanceId, producerInstanceId);
    final MetricName metricName =
        metrics.metricName(name, THREAD_LEVEL_GROUP, description, tags);
    // Record the metric under the thread's sensor prefix so it is removed together
    // with the other thread-level metrics when the thread is cleaned up.
    synchronized (threadLevelMetrics) {
        final String prefix = threadSensorPrefix(threadId);
        threadLevelMetrics.computeIfAbsent(prefix, key -> new LinkedList<>()).add(metricName);
        metrics.addMetric(metricName, valueProvider);
    }
}

public final Sensor clientLevelSensor(final String sensorName,
final RecordingLevel recordingLevel,
final Sensor... parents) {
Expand Down Expand Up @@ -283,6 +302,16 @@ public Map<String, String> threadLevelTagMap(final String threadId) {
return tagMap;
}

/**
 * Builds the tag map for a thread-level metric that additionally carries the
 * client instance ids of the thread's consumer and producer.
 *
 * @param threadId           id of the stream thread
 * @param consumerInstanceId client instance id of the thread's consumer
 * @param producerInstanceId client instance id of the thread's producer
 * @return tag map in insertion order: thread id, consumer instance id, producer instance id
 */
public Map<String, String> threadLevelTagMap(final String threadId,
                                             final String consumerInstanceId,
                                             final String producerInstanceId) {
    // LinkedHashMap keeps the tag order deterministic, matching the other tag-map helpers.
    final Map<String, String> tags = new LinkedHashMap<>();
    tags.put(THREAD_ID_TAG, threadId);
    tags.put(CONSUMER_INSTANCE_ID_TAG, consumerInstanceId);
    tags.put(PRODUCER_INSTANCE_ID_TAG, producerInstanceId);
    return tags;
}

public final void removeAllClientLevelSensorsAndMetrics() {
removeAllClientLevelSensors();
removeAllClientLevelMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,21 @@ public static void addThreadStateTelemetryMetric(final String threadId,
);
}

/**
 * Adds the thread-state telemetry gauge for the given thread, tagging it with the
 * client instance ids of the thread's consumer and producer so broker-side
 * telemetry can correlate thread state with the owning clients.
 *
 * @param threadId            id of the stream thread
 * @param consumerInstanceId  client instance id of the thread's consumer
 * @param producerInstanceId  client instance id of the thread's producer
 * @param streamsMetrics      metrics registry to register the gauge with
 * @param threadStateProvider gauge yielding the ordinal of the thread's current state
 */
public static void addThreadStateTelemetryMetricWithInstanceIds(final String threadId,
                                                                final String consumerInstanceId,
                                                                final String producerInstanceId,
                                                                final StreamsMetricsImpl streamsMetrics,
                                                                final Gauge<Integer> threadStateProvider) {
    // Note: this parameter was previously misnamed "processorInstanceId"; it carries
    // the producer's client instance id and is forwarded into the producer slot below.
    streamsMetrics.addThreadLevelMutableMetricWithInstanceIds(
        THREAD_STATE,
        THREAD_STATE_DESCRIPTION,
        threadId,
        consumerInstanceId,
        producerInstanceId,
        threadStateProvider
    );
}

public static void addThreadStateMetric(final String threadId,
final StreamsMetricsImpl streamsMetrics,
final Gauge<StreamThread.State> threadStateProvider) {
Expand Down