Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
Expand Down Expand Up @@ -53,6 +54,7 @@
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
Expand Down Expand Up @@ -136,6 +138,7 @@ public class TopologyConfig extends AbstractConfig {
public final Class<?> dslStoreSuppliers;
public final Supplier<TimestampExtractor> timestampExtractorSupplier;
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier;

public TopologyConfig(final StreamsConfig globalAppConfigs) {
this(null, globalAppConfigs, new Properties());
Expand All @@ -151,6 +154,7 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo

this.applicationConfigs = globalAppConfigs;
this.topologyOverrides = topologyOverrides;
this.processingExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to use a supplier here?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Streams creates a processing exception handler per task.
See

processingExceptionHandlerSupplier.get(),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but line 288 could just be

globalAppConfigs.getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class

Not sure why we need the supplier indirection?

For the existing timestampExtractorSupplier and deserializationExceptionHandlerSupplier it is a little different, because we set up different suppliers for different cases (so using suppliers simplifies the code), but for the new one we have only one supplier and thus it seems unnecessary.

No need to change anything. We follow an established pattern, but it just was confusing to me, as it seems (strictly speaking) unnecessary to have the layer of indirection.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processingExceptionHandler has been created following the deserializationExceptionHandler. Thus a supplier has been used.

The supplier can be removed to make the code clearer if needed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Understood. -- I think it's not necessary to have a supplier, but also ok to keep the code as-is.


if (isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides)) {
maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
Expand Down Expand Up @@ -281,6 +285,7 @@ public TaskConfig getTaskConfig() {
maxBufferedSize,
timestampExtractorSupplier.get(),
deserializationExceptionHandlerSupplier.get(),
processingExceptionHandlerSupplier.get(),
eosEnabled
);
}
Expand All @@ -291,19 +296,22 @@ public static class TaskConfig {
public final int maxBufferedSize;
public final TimestampExtractor timestampExtractor;
public final DeserializationExceptionHandler deserializationExceptionHandler;
public final ProcessingExceptionHandler processingExceptionHandler;
public final boolean eosEnabled;

/**
 * Bundles the per-task configuration values resolved by {@code TopologyConfig#getTaskConfig()}.
 *
 * @param maxTaskIdleMs                   max time a task may stay idle waiting for data on all partitions
 * @param taskTimeoutMs                   timeout before a task raises an error for a timed-out operation
 * @param maxBufferedSize                 max number of records buffered per partition
 * @param timestampExtractor              extractor used to derive record timestamps
 * @param deserializationExceptionHandler handler invoked on deserialization errors
 * @param processingExceptionHandler      handler invoked on processing errors (KIP-1033)
 * @param eosEnabled                      whether exactly-once semantics are enabled
 */
private TaskConfig(final long maxTaskIdleMs,
final long taskTimeoutMs,
final int maxBufferedSize,
final TimestampExtractor timestampExtractor,
final DeserializationExceptionHandler deserializationExceptionHandler,
final ProcessingExceptionHandler processingExceptionHandler,
final boolean eosEnabled) {
this.maxTaskIdleMs = maxTaskIdleMs;
this.taskTimeoutMs = taskTimeoutMs;
this.maxBufferedSize = maxBufferedSize;
this.timestampExtractor = timestampExtractor;
this.deserializationExceptionHandler = deserializationExceptionHandler;
this.processingExceptionHandler = processingExceptionHandler;
this.eosEnabled = eosEnabled;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.processor.TaskId;

/**
* Default implementation of {@link ErrorHandlerContext} that provides access to the metadata of the record that caused the error.
*/
public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final String topic;
private final int partition;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors.internals;

import org.apache.kafka.common.KafkaException;

/**
* {@link FailedProcessingException} is the top-level exception type generated by Kafka Streams, and indicates errors have
* occurred during a {@link org.apache.kafka.streams.processor.internals.ProcessorNode ProcessorNode's} processing.
*/
public class FailedProcessingException extends KafkaException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extend StreamsException instead of not KafkaException?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Agreed!

private static final long serialVersionUID = 1L;

public FailedProcessingException(final Throwable throwable) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to catch Throwable? From my understanding, it's (1) not best practice, and (2) we only hand (correctly IMHO) Exception into the new handler, so if we would really have a Throwable at hand, we could not pass it into the handler anyway.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed that! Agreed!

super(throwable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class CorruptedRecord extends StampedRecord {

/**
 * A corrupted record has no usable timestamp, so {@link ConsumerRecord#NO_TIMESTAMP} is
 * used; the raw consumer record is passed both as the stamped value and as the
 * {@code rawRecord} now required by {@code StampedRecord}.
 */
CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
    // Diff artifact fixed: the stale two-argument super(...) call was still present
    // alongside the new three-argument call; only one super(...) call is legal.
    super(rawRecord, ConsumerRecord.NO_TIMESTAMP, rawRecord);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
record.offset(),
record.partition(),
record.topic(),
record.headers());
record.headers(),
record);
globalProcessorContext.setRecordContext(recordContext);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
deserialized.offset(),
deserialized.partition(),
deserialized.topic(),
deserialized.headers());
deserialized.headers(),
record);
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
final Record<Object, Object> toProcess = new Record<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public void process(final Record<KIn, VIn> record) {
context.offset(),
context.partition(),
context.topic(),
record.headers()
record.headers(),
processorRecordContext.rawRecord()
));
delegate.process(record.key(), record.value());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
record.headers());
record.headers(),
recordContext.rawRecord());
}

if (childName == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,36 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;

public class ProcessorNode<KIn, VIn, KOut, VOut> {

private final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
private final List<ProcessorNode<KOut, VOut, ?, ?>> children;
private final Map<String, ProcessorNode<KOut, VOut, ?, ?>> childByName;

Expand All @@ -40,12 +54,15 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
private final String name;

public final Set<String> stateStores;
private ProcessingExceptionHandler processingExceptionHandler;

private InternalProcessorContext<KOut, VOut> internalProcessorContext;
private String threadId;

private boolean closed = true;

private Sensor droppedRecordsSensor;

public ProcessorNode(final String name) {
this(name, (Processor<KIn, VIn, KOut, VOut>) null, null);
}
Expand Down Expand Up @@ -98,6 +115,10 @@ public void init(final InternalProcessorContext<KOut, VOut> context) {
try {
threadId = Thread.currentThread().getName();
internalProcessorContext = context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId,
internalProcessorContext.taskId().toString(),
internalProcessorContext.metrics());

if (processor != null) {
processor.init(context);
}
Expand All @@ -115,6 +136,11 @@ public void init(final InternalProcessorContext<KOut, VOut> context) {
closed = false;
}

/**
 * Initializes this node and installs the handler that {@code process()} consults when
 * user processing code throws an exception (decides FAIL vs. drop-and-continue).
 *
 * @param context                    the processor context used to initialize this node
 * @param processingExceptionHandler handler invoked on processing errors
 */
public void init(final InternalProcessorContext<KOut, VOut> context, final ProcessingExceptionHandler processingExceptionHandler) {
init(context);
this.processingExceptionHandler = processingExceptionHandler;
}

public void close() {
throwIfClosed();

Expand Down Expand Up @@ -174,6 +200,32 @@ public void process(final Record<KIn, VIn> record) {
keyClass,
valueClass),
e);
} catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) {
// Rethrow exceptions that should not be handled here
throw e;
} catch (final Exception e) {
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
internalProcessorContext.topic(),
internalProcessorContext.partition(),
internalProcessorContext.offset(),
internalProcessorContext.headers(),
internalProcessorContext.recordContext().rawRecord().key(),
internalProcessorContext.recordContext().rawRecord().value(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId());

final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler
.handle(errorHandlerContext, record, e);

if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the streaming pipeline" +
" continue after a processing error, please set the " +
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
throw new FailedProcessingException(e);
} else {
droppedRecordsSensor.record();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log here? How do we do this elsewhere? I know that the default handlers log already (so might be redundant), but a custom handler might not log...? Just curious.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also thought about logging here. But since we had quite some issues with log pressure due to dropped records, I thought, it would be better to let the handler log.

}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
Expand All @@ -37,17 +38,28 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
private final String topic;
private final int partition;
private final Headers headers;
private final ConsumerRecord<byte[], byte[]> rawRecord;

/**
 * Creates a record context without an originating raw record; {@code rawRecord()} will
 * return {@code null}. Kept for callers that have no access to the raw consumer record.
 */
public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers) {
this(timestamp, offset, partition, topic, headers, null);
}

/**
 * Creates a record context.
 *
 * @param headers   record headers; must not be {@code null}
 * @param rawRecord the originating raw consumer record; may be {@code null} when the
 *                  caller does not have it (e.g. contexts rebuilt via deserialize())
 */
public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers,
final ConsumerRecord<byte[], byte[]> rawRecord) {
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.rawRecord = rawRecord;
}

@Override
Expand Down Expand Up @@ -75,6 +87,10 @@ public Headers headers() {
return headers;
}

/**
 * @return the raw consumer record this context originated from; may be {@code null}
 *         when the context was built without one (5-arg constructor or deserialize())
 */
public ConsumerRecord<byte[], byte[]> rawRecord() {
return rawRecord;
}

public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
Expand Down Expand Up @@ -173,7 +189,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
headers = new RecordHeaders(headerArr);
}

return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private void updateHead() {
droppedRecordsSensor.record();
continue;
}
headRecord = new StampedRecord(deserialized, timestamp);
headRecord = new StampedRecord(deserialized, timestamp, raw);
headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public void process(final Record<KIn, VIn> record) {
context.offset(),
context.partition(),
context.topic(),
record.headers()
record.headers(),
context.recordContext().rawRecord()
);

final String topic = topicExtractor.extract(key, value, contextForExtraction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.kafka.common.header.Headers;

public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {
private final ConsumerRecord<byte[], byte[]> rawRecord;

/**
 * Creates a timestamped record.
 *
 * @param record    the (possibly deserialized) record to stamp
 * @param timestamp the timestamp assigned to the record
 * @param rawRecord the raw (serialized) consumer record the stamped record came from,
 *                  kept so error handlers can access the original bytes
 */
// Diff artifact fixed: the stale two-argument constructor signature line was left in
// place above the new three-argument one, which is not valid Java; only the new
// signature is kept.
public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp, final ConsumerRecord<byte[], byte[]> rawRecord) {
    super(record, timestamp);
    this.rawRecord = rawRecord;
}

public String topic() {
Expand All @@ -49,6 +51,20 @@ public Headers headers() {
return value.headers();
}

public ConsumerRecord<byte[], byte[]> rawRecord() {
return rawRecord;
}

@Override
public boolean equals(final Object other) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we add this overload? (Same for hashCode() below). -- Seems unnecessary.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have already had that. See #16093 (comment)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thanks. "Interesting" -- or should I rather say "annoying" 😁

return super.equals(other);
}

// Delegates to the superclass; kept in sync with the equals() override above, which
// also delegates (rawRecord does not participate in equality or hashing).
@Override
public int hashCode() {
return super.hashCode();
}

@Override
public String toString() {
return value.toString() + ", timestamp = " + timestamp;
Expand Down
Loading