Merged
@@ -235,7 +235,7 @@ private <K, V> void forward(final ProcessorNode<K, V> child,
setCurrentNode(child);
child.process(key, value);
if (child.isTerminalNode()) {
-streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());
Contributor Author: This previously relied on a lookup of the actual current system time. I thought we decided to use the cached system time. Can you set me straight, @ableegoldman?

Member: Oh, hm, I thought we decided to push the stateful-node-level metrics to TRACE so we could get the actual time at each node without a (potential) performance hit. But with the INFO-level metrics it would be ok, since we're only updating it twice per process.
But maybe I'm misremembering... I suppose ideally we could run some benchmarks for both cases and see if it really makes a difference...

Contributor Author: Ok, but right now this is an INFO-level metric, right?

Contributor Author: This will probably need to get refactored when you do the second PR.

Member: Yeah. I'm just not 100% sure we all agreed it was alright to get the actual system time even for the task-level metrics... so we should probably stick with the cached time for now.

}
}
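The change above passes the caller's cached `currentSystemTimeMs()` into `maybeRecordE2ELatency` instead of letting the task fetch fresh system time per record. A minimal sketch of the computation and the trade-off under discussion (plain Java, hypothetical names, not the real Kafka API):

```java
// Hypothetical sketch: e2e latency is the gap between the record's embedded
// timestamp and "now". Passing in a cached "now" avoids one
// System.currentTimeMillis() call per record, at the cost of a slightly
// stale measurement.
public class E2ELatencySketch {
    static long e2eLatencyMs(long recordTimestampMs, long nowMs) {
        return nowMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long recordTs = 1_000L;   // timestamp carried by the record
        long cachedNow = 1_250L;  // cached once per processing loop iteration
        long freshNow = System.currentTimeMillis(); // what the old code fetched per record
        System.out.println(e2eLatencyMs(recordTs, cachedNow)); // prints 250
        System.out.println(e2eLatencyMs(recordTs, freshNow) >= 0);
    }
}
```

The cached variant trades a small accuracy loss for fewer syscalls, which is exactly the benchmark question raised in the thread.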

@@ -44,6 +44,7 @@ public class StandbyTask extends AbstractTask implements Task {
private final Sensor closeTaskSensor;
private final boolean eosEnabled;
private final InternalProcessorContext processorContext;
+private final StreamsMetricsImpl streamsMetrics;

private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;

@@ -52,29 +53,30 @@ public class StandbyTask extends AbstractTask implements Task {
* @param partitions input topic partitions, used for thread metadata only
* @param topology the instance of {@link ProcessorTopology}
* @param config the {@link StreamsConfig} specified by the user
-* @param metrics the {@link StreamsMetrics} created by the thread
+* @param streamsMetrics the {@link StreamsMetrics} created by the thread
* @param stateMgr the {@link ProcessorStateManager} for this task
* @param stateDirectory the {@link StateDirectory} created by the thread
*/
StandbyTask(final TaskId id,
final Set<TopicPartition> partitions,
final ProcessorTopology topology,
final StreamsConfig config,
-final StreamsMetricsImpl metrics,
+final StreamsMetricsImpl streamsMetrics,
final ProcessorStateManager stateMgr,
final StateDirectory stateDirectory,
final ThreadCache cache,
final InternalProcessorContext processorContext) {
super(id, topology, stateDirectory, stateMgr, partitions);
this.processorContext = processorContext;
+this.streamsMetrics = streamsMetrics;
processorContext.transitionToStandby(cache);

final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", id);
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());

-closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
+closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
eosEnabled = StreamThread.eosEnabled(config);
}

@@ -174,18 +176,21 @@ public void postCommit() {

@Override
public void closeClean() {
+streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
Contributor Author: Standby tasks don't currently register any sensors, but I personally prefer to be defensive and idempotently ensure we remove any sensors while closing.

Member: Sounds good. But why do it both here and in closeDirty vs doing so in close(clean)?

Contributor Author: I'd like to inline close(boolean), but am resisting the urge... This is a compromise ;)

Member: Fair enough. I thought the answer might be something like that...

close(true);
log.info("Closed clean");
}

@Override
public void closeDirty() {
+streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
close(false);
log.info("Closed dirty");
}

@Override
public void closeAndRecycleState() {
+streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
if (state() == State.SUSPENDED) {
stateMgr.recycle();
} else {
@@ -37,7 +37,6 @@
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@@ -89,6 +88,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final Map<TopicPartition, Long> consumedOffsets;
private final PunctuationQueue streamTimePunctuationQueue;
private final PunctuationQueue systemTimePunctuationQueue;
+private final StreamsMetricsImpl streamsMetrics;

private long processTimeMs = 0L;

@@ -135,6 +135,7 @@ public StreamTask(final TaskId id,
eosEnabled = StreamThread.eosEnabled(config);

final String threadId = Thread.currentThread().getName();
+this.streamsMetrics = streamsMetrics;
closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
final String taskId = id.toString();
if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
@@ -148,18 +149,18 @@ public StreamTask(final TaskId id,
punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);

for (final String terminalNode : topology.terminalNodes()) {
for (final String terminalNodeName : topology.terminalNodes()) {
e2eLatencySensors.put(
terminalNode,
ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, terminalNode, RecordingLevel.INFO, streamsMetrics)
terminalNodeName,
TaskMetrics.e2ELatencySensor(threadId, taskId, terminalNodeName, RecordingLevel.INFO, streamsMetrics)
Contributor Author: Fixes the sensor leak by simply registering these as task-level sensors. Note the node name is still provided to scope the sensors themselves.

);
}

for (final ProcessorNode<?, ?> sourceNode : topology.sources()) {
-final String processorId = sourceNode.name();
+final String sourceNodeName = sourceNode.name();
e2eLatencySensors.put(
-processorId,
-ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, processorId, RecordingLevel.INFO, streamsMetrics)
+sourceNodeName,
+TaskMetrics.e2ELatencySensor(threadId, taskId, sourceNodeName, RecordingLevel.INFO, streamsMetrics)
);
}
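The loops above now register the per-node e2e latency sensors through `TaskMetrics`, i.e. under the task's sensor scope, which is what closes the leak: one task-level cleanup call removes them all when the task closes. A rough model of that ownership scheme (names and structure are illustrative, not the real `StreamsMetricsImpl` API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of task-scoped sensor ownership: sensors registered
// under a task key (with the node name folded into the sensor name) are all
// removed by a single removeAllTaskLevelSensors call, even across repeated
// closes of the same task.
public class TaskScopedSensors {
    private final Map<String, List<String>> sensorsByTask = new HashMap<>();

    public String registerE2ESensor(String taskId, String nodeName) {
        String sensorName = nodeName + "-record-e2e-latency";
        sensorsByTask.computeIfAbsent(taskId, k -> new ArrayList<>()).add(sensorName);
        return sensorName;
    }

    // Idempotent: safe to call from closeClean, closeDirty, and recycle paths.
    public List<String> removeAllTaskLevelSensors(String taskId) {
        List<String> removed = sensorsByTask.remove(taskId);
        return removed == null ? Collections.emptyList() : removed;
    }
}
```

Under the old node-level registration, nothing swept these sensors on task close, so they leaked; tying them to the task key makes the existing task cleanup sufficient.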

@@ -462,12 +463,14 @@ private Map<TopicPartition, Long> extractPartitionTimes() {

@Override
public void closeClean() {
+streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
Contributor Author: We previously relied on the task manager to remove these sensors before calling close, but forgot to do it before recycling. In retrospect, it's better to do it within the same class that creates the sensors to begin with.

Member: Agreed, we should clean up anything we created in the same class.

close(true);
log.info("Closed clean");
}

@Override
public void closeDirty() {
+streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
close(false);
log.info("Closed dirty");
}
@@ -480,6 +483,7 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,

@Override
public void closeAndRecycleState() {
+streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
switch (state()) {
case SUSPENDED:
stateMgr.recycle();
@@ -917,11 +921,7 @@ public boolean maybePunctuateSystemTime() {
return punctuated;
}

-void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
-    maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
-}
-
-private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
if (e2eLatencySensor == null) {
throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
@@ -671,15 +671,9 @@ private void completeTaskCloseClean(final Task task) {

// Note: this MUST be called *before* actually closing the task
private void cleanupTask(final Task task) {
-// 1. remove the input partitions from the materialized map;
-// 2. remove the task metrics from the metrics registry
-
for (final TopicPartition inputPartition : task.inputPartitions()) {
partitionToTask.remove(inputPartition);
}
-
-final String threadId = Thread.currentThread().getName();
-streamsMetrics.removeAllTaskLevelSensors(threadId, task.id().toString());
}

void shutdown(final boolean clean) {
@@ -29,7 +29,6 @@
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor;

public class ProcessorNodeMetrics {
private ProcessorNodeMetrics() {}
@@ -99,15 +98,6 @@ private ProcessorNodeMetrics() {}
private static final String LATE_RECORD_DROP_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + LATE_RECORD_DROP_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

-private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
-private static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
-    "end-to-end latency of a record, measuring by comparing the record timestamp with the "
-        + "system time when it has been fully processed by the node";
-private static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
-private static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
-private static final String RECORD_E2E_LATENCY_P99_DESCRIPTION = "The 99th percentile " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
-private static final String RECORD_E2E_LATENCY_P90_DESCRIPTION = "The 90th percentile " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
-
public static Sensor suppressionEmitSensor(final String threadId,
final String taskId,
final String processorNodeId,
@@ -299,26 +289,6 @@ public static Sensor processorAtSourceSensorOrForwardSensor(final String threadI
return processAtSourceSensor(threadId, taskId, processorNodeId, streamsMetrics);
}

-public static Sensor recordE2ELatencySensor(final String threadId,
-                                            final String taskId,
-                                            final String processorNodeId,
-                                            final RecordingLevel recordingLevel,
-                                            final StreamsMetricsImpl streamsMetrics) {
-    final Sensor sensor = streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, RECORD_E2E_LATENCY, recordingLevel);
-    final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
-    addMinAndMaxAndP99AndP90ToSensor(
-        sensor,
-        PROCESSOR_NODE_LEVEL_GROUP,
-        tagMap,
-        RECORD_E2E_LATENCY,
-        RECORD_E2E_LATENCY_MIN_DESCRIPTION,
-        RECORD_E2E_LATENCY_MAX_DESCRIPTION,
-        RECORD_E2E_LATENCY_P99_DESCRIPTION,
-        RECORD_E2E_LATENCY_P90_DESCRIPTION
-    );
-    return sensor;
-}
-
private static Sensor throughputAndLatencySensorWithParent(final String threadId,
final String taskId,
final String processorNodeId,
Expand All @@ -337,7 +307,7 @@ private static Sensor throughputAndLatencySensorWithParent(final String threadId
descriptionOfCount,
descriptionOfAvgLatency,
descriptionOfMaxLatency,
-RecordingLevel.DEBUG,
+recordingLevel,
Contributor Author: We erroneously ignored the provided recordingLevel and set it to DEBUG. It didn't manifest because this method happens to always be called with a recordingLevel of DEBUG anyway.

streamsMetrics
);
return throughputAndLatencySensor(
Expand All @@ -349,7 +319,7 @@ private static Sensor throughputAndLatencySensorWithParent(final String threadId
descriptionOfCount,
descriptionOfAvgLatency,
descriptionOfMaxLatency,
-RecordingLevel.DEBUG,
+recordingLevel,
streamsMetrics,
parentSensor
);
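For context on why the hardcoded level mattered in principle: a sensor only records when its own level is enabled by the configured recording level, so silently pinning the level to DEBUG would override whatever the caller asked for. A minimal model of that gating (this mirrors, but does not reproduce, Kafka's `Sensor.RecordingLevel` semantics):

```java
// Simplified model (not the real org.apache.kafka.common.metrics API):
// a sensor records only when its level is at or coarser than the level
// enabled in the metrics config. Hardcoding DEBUG would therefore mute a
// sensor the caller intended to record at INFO when the config is INFO.
public class RecordingLevelSketch {
    enum Level { INFO, DEBUG, TRACE } // coarse -> fine

    static boolean shouldRecord(Level configured, Level sensorLevel) {
        // The sensor records iff its level is enabled by the configured one.
        return sensorLevel.ordinal() <= configured.ordinal();
    }

    public static void main(String[] args) {
        System.out.println(shouldRecord(Level.INFO, Level.DEBUG));  // prints false
        System.out.println(shouldRecord(Level.DEBUG, Level.DEBUG)); // prints true
    }
}
```

As the comment notes, the bug was latent only because every current caller passes DEBUG anyway.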
@@ -28,9 +28,6 @@
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
-import org.apache.kafka.common.metrics.stats.Percentile;
-import org.apache.kafka.common.metrics.stats.Percentiles;
-import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
@@ -47,9 +44,9 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import java.util.Optional;

public class StreamsMetricsImpl implements StreamsMetrics {

@@ -154,9 +151,6 @@ public int hashCode() {
public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
public static final String RATE_DESCRIPTION_SUFFIX = " per second";

-private static final int PERCENTILES_SIZE_IN_BYTES = 100 * 1000; // 100 kB
-private static double MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000d; // maximum latency is 10 days; values above that will be pinned
-
public StreamsMetricsImpl(final Metrics metrics, final String clientId, final String builtInMetricsVersion) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null");
@@ -650,14 +644,12 @@ public static void addAvgAndMaxToSensor(final Sensor sensor,
);
}

-public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
-                                                    final String group,
-                                                    final Map<String, String> tags,
-                                                    final String operation,
-                                                    final String descriptionOfMin,
-                                                    final String descriptionOfMax,
-                                                    final String descriptionOfP99,
-                                                    final String descriptionOfP90) {
+public static void addMinAndMaxToSensor(final Sensor sensor,
+                                        final String group,
+                                        final Map<String, String> tags,
+                                        final String operation,
+                                        final String descriptionOfMin,
+                                        final String descriptionOfMax) {
sensor.add(
new MetricName(
operation + MIN_SUFFIX,
Expand All @@ -675,27 +667,6 @@ public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
tags),
new Max()
);

-sensor.add(
Contributor Author: Dropped the percentiles metric.

Member: GitHub won't let me comment on these lines, but we should remove the two percentiles-necessitated constants above (PERCENTILES_SIZE_IN_BYTES and MAXIMUM_E2E_LATENCY).

Contributor Author: Ah, missed those. Thanks!

-    new Percentiles(
-        PERCENTILES_SIZE_IN_BYTES,
-        MAXIMUM_E2E_LATENCY,
-        BucketSizing.LINEAR,
-        new Percentile(
-            new MetricName(
-                operation + P99_SUFFIX,
-                group,
-                descriptionOfP99,
-                tags),
-            99),
-        new Percentile(
-            new MetricName(
-                operation + P90_SUFFIX,
-                group,
-                descriptionOfP90,
-                tags),
-            90))
-);
}
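With the percentiles gone, the e2e latency sensor tracks only a running minimum and maximum. A toy stand-in for what the registered `Min`/`Max` stats compute (not the real `org.apache.kafka.common.metrics.stats` classes, which also support windowing):

```java
// Toy equivalent of registering Min and Max stats on one sensor: each
// recorded value updates both aggregates in O(1) with no histogram buffer,
// which is why dropping the percentiles also drops the 100 kB buffer
// constant removed above.
public class MinMaxStat {
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;

    public void record(double value) {
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    public double min() { return min; }
    public double max() { return max; }
}
```

Percentiles, by contrast, need a bucketed histogram sized up front (the removed `PERCENTILES_SIZE_IN_BYTES` and `MAXIMUM_E2E_LATENCY` constants), which is the memory/accuracy cost the PR backs out of.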

public static void addAvgAndMaxLatencyToSensor(final Sensor sensor,
@@ -24,11 +24,13 @@
import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addMinAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;

public class TaskMetrics {
@@ -86,6 +88,13 @@ private TaskMetrics() {}
private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
"from consumer and not yet processed for this active task";

+private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+private static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
+    "end-to-end latency of a record, measuring by comparing the record timestamp with the "
+        + "system time when it has been fully processed by the node";
+private static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
+private static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX;
public static Sensor processLatencySensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
@@ -133,6 +142,25 @@ public static Sensor activeBufferedRecordsSensor(final String threadId,
return sensor;
}

+public static Sensor e2ELatencySensor(final String threadId,
+                                      final String taskId,
+                                      final String processorNodeId,
+                                      final RecordingLevel recordingLevel,
+                                      final StreamsMetricsImpl streamsMetrics) {
+    final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY;
+    final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, recordingLevel);
+    final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
+    addMinAndMaxToSensor(
+        sensor,
+        PROCESSOR_NODE_LEVEL_GROUP,
Member (on lines +152 to +155): @vvcephei I'm not familiar enough with the metrics classification to know if this will be an issue or just an oddity, but we now have allegedly task-level metrics with processor-node-level tags/grouping. It's kind of a "task metric in implementation, processor node metric in interface" -- might be confusing for us but should be alright for users, yeah?

Member: We give it the task sensor prefix, which becomes part of the full sensor name, rather than the processor node prefix.

+        tagMap,
+        RECORD_E2E_LATENCY,
+        RECORD_E2E_LATENCY_MIN_DESCRIPTION,
+        RECORD_E2E_LATENCY_MAX_DESCRIPTION
+    );
+    return sensor;
+}
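The naming compromise discussed above can be made concrete: the sensor name is task-scoped (so task-level cleanup finds it) while the metric tags keep the processor-node grouping users see. A hypothetical rendering of that scheme -- the name format and tag keys below are assumptions for illustration, not copied from `StreamsMetricsImpl`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of "task metric in implementation, processor
// node metric in interface": the full sensor name carries the task prefix,
// while the tag map still identifies the individual processor node.
public class E2ESensorNaming {
    static String taskScopedSensorName(String threadId, String taskId, String nodeName) {
        // Assumed format; the real prefix is built inside StreamsMetricsImpl.
        return threadId + ".task." + taskId + "." + nodeName + "-record-e2e-latency";
    }

    static Map<String, String> nodeLevelTags(String threadId, String taskId, String nodeName) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("thread-id", threadId);          // assumed tag keys
        tags.put("task-id", taskId);
        tags.put("processor-node-id", nodeName);
        return tags;
    }
}
```

Because the task id is part of the sensor name, `removeAllTaskLevelSensors` can find every per-node latency sensor; because the node id is in the tags, the exported metric still reads as node-scoped to users.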

public static Sensor punctuateSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {