Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
Expand All @@ -53,6 +54,7 @@
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
Expand Down Expand Up @@ -144,6 +146,7 @@ public class KafkaStreams implements AutoCloseable {
private final ScheduledExecutorService rocksDBMetricsRecordingService;
private final QueryableStoreProvider queryableStoreProvider;
private final Admin adminClient;
private final StreamsMetricsImpl streamsMetrics;

GlobalStreamThread globalStreamThread;
private KafkaStreams.StateListener stateListener;
Expand Down Expand Up @@ -672,6 +675,16 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, time);
streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamsMetricsImpl is now created at client level and not on thread level anymore.

streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
ClientMetrics.addVersionMetric(streamsMetrics);
ClientMetrics.addCommitIdMetric(streamsMetrics);
ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString());
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
log.info("Kafka Streams version: {}", ClientMetrics.version());
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just going out on a limb here... should we also log the topology description here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought, we already do, but I couldn't find any output in a log file that I have locally. I would be in favour of logging the topology because it helps us during on-call. On the other hand, it may pollute the logs. Anyways, could we postpone this discussion to after the release?


Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I add the instance-level metrics.

// re-write the physical topology according to the config
internalTopologyBuilder.rewriteTopology(config);
Expand Down Expand Up @@ -712,11 +725,10 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
stateDirectory,
cacheSizePerThread,
metrics,
streamsMetrics,
time,
globalThreadId,
delegatingStateRestoreListener,
rocksDBMetricsRecordingTrigger
delegatingStateRestoreListener
);
globalThreadState = globalStreamThread.state();
}
Expand All @@ -734,14 +746,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
adminClient,
processId,
clientId,
metrics,
streamsMetrics,
time,
streamsMetadataState,
cacheSizePerThread,
stateDirectory,
delegatingStateRestoreListener,
i + 1);
threads[i].setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed anymore since the RocksDBMetricsRecordingTrigger is contained in StreamsMetricsImpl.

threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
Expand Down Expand Up @@ -928,6 +939,7 @@ private boolean close(final long timeoutMs) {

adminClient.close();

streamsMetrics.removeAllClientLevelMetrics();
metrics.close();
setState(State.NOT_RUNNING);
}, "kafka-streams-close-thread");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals.metrics;

import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.Properties;

public class ClientMetrics {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be more like "InstanceMetrics" than "client" metrics, where "client" usually means a Producer or Consumer. In Streams, we sometimes conflate "client" with "thread" because each StreamThread has exactly one Consumer.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In KIP-444, "client" is used instead of "instance". Also the tag for instance-level is "client-id" whereas for thread it will be "thread-id" with PR #7429. I agree that clients is somehow overloaded, but since Streams is also a client (that uses other clients) it seems consistent to me.

// Static utility holder: private constructor prevents instantiation.
private ClientMetrics() {}

private static final Logger log = LoggerFactory.getLogger(ClientMetrics.class);
// Metric names for the client-level (KafkaStreams-instance-level) metrics.
private static final String VERSION = "version";
private static final String COMMIT_ID = "commit-id";
private static final String APPLICATION_ID = "application-id";
private static final String TOPOLOGY_DESCRIPTION = "topology-description";
private static final String STATE = "state";
// Values read once from kafka-streams-version.properties in the static initializer below.
private static final String VERSION_FROM_FILE;
private static final String COMMIT_ID_FROM_FILE;
// Fallback reported when the version/commit-id properties cannot be loaded.
private static final String DEFAULT_VALUE = "unknown";

static {
final Properties props = new Properties();
try (InputStream resourceStream = ClientMetrics.class.getResourceAsStream(
"/kafka/kafka-streams-version.properties")) {

props.load(resourceStream);
} catch (final Exception exception) {
log.warn("Error while loading kafka-streams-version.properties", exception);
}
VERSION_FROM_FILE = props.getProperty("version", DEFAULT_VALUE).trim();
COMMIT_ID_FROM_FILE = props.getProperty("commitId", DEFAULT_VALUE).trim();
}

private static final String VERSION_DESCRIPTION = "The version of the Kafka Streams client";
private static final String COMMIT_ID_DESCRIPTION = "The version control commit ID of the Kafka Streams client";
private static final String APPLICATION_ID_DESCRIPTION = "The application ID of the Kafka Streams client";
private static final String TOPOLOGY_DESCRIPTION_DESCRIPTION =
"The description of the topology executed in the Kafka Streams client";
private static final String STATE_DESCRIPTION = "The state of the Kafka Streams client";

/**
 * Returns the Kafka Streams version read from the bundled
 * kafka-streams-version.properties resource, or "unknown" if it could not be loaded.
 */
public static String version() {
return VERSION_FROM_FILE;
}

/**
 * Returns the version-control commit ID read from the bundled
 * kafka-streams-version.properties resource, or "unknown" if it could not be loaded.
 */
public static String commitId() {
return COMMIT_ID_FROM_FILE;
}

/**
 * Registers the immutable client-level "version" metric.
 *
 * @param streamsMetrics the metrics registry of the Kafka Streams client
 */
public static void addVersionMetric(final StreamsMetricsImpl streamsMetrics) {
streamsMetrics.addClientLevelImmutableMetric(
VERSION,
VERSION_DESCRIPTION,
RecordingLevel.INFO,
VERSION_FROM_FILE
);
}

/**
 * Registers the immutable client-level "commit-id" metric.
 *
 * @param streamsMetrics the metrics registry of the Kafka Streams client
 */
public static void addCommitIdMetric(final StreamsMetricsImpl streamsMetrics) {
streamsMetrics.addClientLevelImmutableMetric(
COMMIT_ID,
COMMIT_ID_DESCRIPTION,
RecordingLevel.INFO,
COMMIT_ID_FROM_FILE
);
}

/**
 * Registers the immutable client-level "application-id" metric.
 *
 * @param streamsMetrics the metrics registry of the Kafka Streams client
 * @param applicationId  the configured application ID to report as the metric value
 */
public static void addApplicationIdMetric(final StreamsMetricsImpl streamsMetrics, final String applicationId) {
streamsMetrics.addClientLevelImmutableMetric(
APPLICATION_ID,
APPLICATION_ID_DESCRIPTION,
RecordingLevel.INFO,
applicationId
);
}

/**
 * Registers the immutable client-level "topology-description" metric.
 *
 * @param streamsMetrics      the metrics registry of the Kafka Streams client
 * @param topologyDescription the string form of the topology to report as the metric value
 */
public static void addTopologyDescriptionMetric(final StreamsMetricsImpl streamsMetrics,
final String topologyDescription) {
streamsMetrics.addClientLevelImmutableMetric(
TOPOLOGY_DESCRIPTION,
TOPOLOGY_DESCRIPTION_DESCRIPTION,
RecordingLevel.INFO,
topologyDescription
);
}

/**
 * Registers the mutable client-level "state" metric; the gauge is re-evaluated
 * on every metrics read so it tracks the current KafkaStreams.State.
 *
 * @param streamsMetrics the metrics registry of the Kafka Streams client
 * @param stateProvider  gauge that supplies the current state of the client
 */
public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
final Gauge<State> stateProvider) {
streamsMetrics.addClientLevelMutableMetric(
STATE,
STATE_DESCRIPTION,
RecordingLevel.INFO,
stateProvider
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
// Wires up metrics and the other side's window store before processing starts.
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
// NOTE(review): tags the sensor with the calling thread's name — assumes init()
// always runs on the owning StreamThread; confirm if task construction moves off-thread.
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);

otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
// Wires up metrics and initializes the KTable value getter before processing starts.
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
// NOTE(review): tags the sensor with the calling thread's name — assumes init()
// always runs on the owning StreamThread; confirm if task construction moves off-thread.
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);

valueGetter.init(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void init(final ProcessorContext context) {
internalProcessorContext = (InternalProcessorContext) context;
metrics = (StreamsMetricsImpl) context.metrics();
lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);

store = (SessionStore<K, Agg>) context.getStateStore(storeName);
tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void init(final ProcessorContext context) {
metrics = internalProcessorContext.metrics();

lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(
windowStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>>
// Wires up metrics and initializes the other table's value getter before processing starts.
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
// NOTE(review): tags the sensor with the calling thread's name — assumes init()
// always runs on the owning StreamThread; confirm if task construction moves off-thread.
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
valueGetter.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<
// Wires up metrics and initializes the other table's value getter before processing starts.
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
// NOTE(review): tags the sensor with the calling thread's name — assumes init()
// always runs on the owning StreamThread; confirm if task construction moves off-thread.
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
valueGetter.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change
// Wires up metrics and initializes the other table's value getter before processing starts.
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
// NOTE(review): tags the sensor with the calling thread's name — assumes init()
// always runs on the owning StreamThread; confirm if task construction moves off-thread.
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
valueGetter.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change
// Wires up metrics and initializes the other table's value getter before processing starts.
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
// NOTE(review): tags the sensor with the calling thread's name — assumes init()
// always runs on the owning StreamThread; confirm if task construction moves off-thread.
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
valueGetter.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private class KTableSourceProcessor extends AbstractProcessor<K, V> {
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
if (queryableName != null) {
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ private final class KTableKTableJoinProcessor extends AbstractProcessor<KO, Chan
// Wires up the skipped-records sensor and resolves the state store before processing starts.
public void init(final ProcessorContext context) {
super.init(context);
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
// NOTE(review): tags the sensor with the calling thread's name — assumes init()
// always runs on the owning StreamThread; confirm if task construction moves off-thread.
skippedRecordsSensor =
ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), internalProcessorContext.metrics());
store = internalProcessorContext.getStateStore(storeBuilder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void init(final ProcessorContext context) {
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;

metrics = internalProcessorContext.metrics();
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(metrics);
skippedRecordsSensor = ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), metrics);
store = internalProcessorContext.getStateStore(storeBuilder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ private Sensors() {}

public static Sensor lateRecordDropSensor(final InternalProcessorContext context) {
final StreamsMetricsImpl metrics = context.metrics();
final String threadId = Thread.currentThread().getName();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this always right? It would set the threadId as the name of the thread that constructed the sensor, but is this always the StreamThread? Even if it is today, it wouldn't be outrageous to think in the future that another thread might build a task and then pass it to the thread to execute. Such a refactoring would break this logic, but it would be very subtle.

In contrast, the old code here explicitly passed the thread name via the context, so it would never be set incorrectly just by changing the way that the code is invoked. Perhaps we could preserve this property, for example by passing the thread name as a method argument.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checked the code path, passing the thread name via GlobalStreamThread / StreamThread -> StreamTask -> ProcessorContextImpl maybe fine.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvcephei, you are right. This is not general enough. On thread-level -- for instance -- sensors are created by the main thread and then used in the stream thread. Here, instead of passing it through InternalProcessorContext, I would pass it as a separate parameter to lateRecordDropSensor(). According to KIP-444 late-record-drop will be superseded with dropped-records and lateRecordDropSensor() will most probably disappear. Actually, since it is currently not a problem at all because the calling thread is the stream thread, I would leave it as it is.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sounds fine to me, as long as we're aware of the risk.


final Sensor sensor = metrics.nodeLevelSensor(
threadId,
context.taskId().toString(),
context.currentNode().name(),
LATE_RECORD_DROP,
Expand All @@ -47,23 +50,33 @@ public static Sensor lateRecordDropSensor(final InternalProcessorContext context
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
sensor,
PROCESSOR_NODE_METRICS_GROUP,
metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
metrics.tagMap(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the other PR is merged, this part can be leveraged / rebased, just a reminder :)

threadId,
"task-id",
context.taskId().toString(),
PROCESSOR_NODE_ID_TAG,
context.currentNode().name()
),
LATE_RECORD_DROP
);
return sensor;
}

public static Sensor recordLatenessSensor(final InternalProcessorContext context) {
final StreamsMetricsImpl metrics = context.metrics();
final String threadId = Thread.currentThread().getName();

final Sensor sensor = metrics.taskLevelSensor(
threadId,
context.taskId().toString(),
"record-lateness",
Sensor.RecordingLevel.DEBUG
);

final Map<String, String> tags = metrics.tagMap(
"task-id", context.taskId().toString()
threadId,
"task-id",
context.taskId().toString()
);
sensor.add(
new MetricName(
Expand All @@ -86,17 +99,22 @@ public static Sensor recordLatenessSensor(final InternalProcessorContext context

public static Sensor suppressionEmitSensor(final InternalProcessorContext context) {
final StreamsMetricsImpl metrics = context.metrics();
final String threadId = Thread.currentThread().getName();

final Sensor sensor = metrics.nodeLevelSensor(
threadId,
context.taskId().toString(),
context.currentNode().name(),
"suppression-emit",
Sensor.RecordingLevel.DEBUG
);

final Map<String, String> tags = metrics.tagMap(
"task-id", context.taskId().toString(),
PROCESSOR_NODE_ID_TAG, context.currentNode().name()
threadId,
"task-id",
context.taskId().toString(),
PROCESSOR_NODE_ID_TAG,
context.currentNode().name()
);

sensor.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Map<TopicPartition, Long> initialize() {
source,
deserializationExceptionHandler,
logContext,
ThreadMetrics.skipRecordSensor(processorContext.metrics())
ThreadMetrics.skipRecordSensor(Thread.currentThread().getName(), processorContext.metrics())
)
);
}
Expand Down
Loading