Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ public void commitRecord(SourceRecord record) throws InterruptedException {
/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
* also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. In this case
* {@code metadata} will be null.
 * also called when a record is filtered out by a transformation, or when {@link ConnectorConfig} {@code "errors.tolerance"} is
 * set to {@code "all"} and the record failed to be produced — in either situation the record will never be ACK'd by a broker.
 * In both cases {@code metadata} will be null.
* </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
Expand All @@ -128,8 +129,8 @@ public void commitRecord(SourceRecord record) throws InterruptedException {
* not necessary to implement both methods.
* </p>
*
* @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
* @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered
* @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
* @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
* @throws InterruptedException
*/
public void commitRecord(SourceRecord record, RecordMetadata metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
Expand Down Expand Up @@ -364,9 +365,18 @@ private boolean sendRecords() {
producerRecord,
(recordMetadata, e) -> {
if (e != null) {
log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
producerSendException.compareAndSet(null, e);
if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
log.trace("Ignoring failed record send: {} failed to send record to {}: ",
WorkerSourceTask.this, topic, e);
// executeFailed here allows the use of existing logging infrastructure/configuration
retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
preTransformRecord, e);
commitTaskRecord(preTransformRecord, null);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a debug/trace log in this path?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously it was suggested to have the tolerance operator handle via the logging report. I would personally find it useful to have it in the connect log regardless of tolerance error logging configuration. I've moved the error/debug log lines to above the tolerance check to log in all instances.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not be logging at ERROR level for every single record if we aren't failing the task unless the user has explicitly enabled this by setting errors.log.enable to true in their connector config.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the existing trace and error log lines in the else block.
My suggestion is to add a line at the debug or trace level in the if block so users can know if an error is ignored.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My misunderstanding, thank you both for the feedback. Update made.

} else {
log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
producerSendException.compareAndSet(null, e);
}
} else {
submittedRecord.ack();
counter.completeRecord();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,23 @@ public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingCl
return errantRecordFuture;
}

/**
 * Report a failure for a {@link SourceRecord} that could not be produced to Kafka, honoring the
 * configured error tolerance.
 *
 * <p>Marks this operation as failed, attaches the record, stage, executing class and error to the
 * processing context, increments the failure metric, and reports the errant record (e.g. to any
 * configured error reporters) via {@code context.report()}.
 *
 * @param stage          the {@link Stage} at which the failure occurred
 * @param executingClass the class that was executing when the failure occurred
 * @param sourceRecord   the source record that failed to be produced
 * @param error          the error raised by the producer
 * @return a future tracking the reporting of the errant record
 * @throws ConnectException if the configured error tolerance has been exceeded; the original
 *                          error is attached as the cause
 */
public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                               SourceRecord sourceRecord,
                                               Throwable error) {

    // Order matters: the failure must be recorded in the context before reporting,
    // and the tolerance check must happen after the failure has been counted.
    markAsFailed();
    context.sourceRecord(sourceRecord);
    context.currentContext(stage, executingClass);
    context.error(error);
    errorHandlingMetrics.recordFailure();
    Future<Void> errantRecordFuture = context.report();
    if (!withinToleranceLimits()) {
        errorHandlingMetrics.recordError();
        throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
    }
    return errantRecordFuture;
}

/**
* Execute the recoverable operation. If the operation is already in a failed state, then simply return
* with the existing failure.
Expand Down Expand Up @@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
}
}

/**
 * Exposes the configured {@link ToleranceType} so that source tasks can decide whether a Kafka
 * producer failure should be skipped. {@code withinToleranceLimits()} cannot be used for that
 * decision, because no failure may have been recorded before the producer write to Kafka failed.
 *
 * @return the configured error tolerance type
 */
public ToleranceType getErrorToleranceType() {
    return this.errorToleranceType;
}

// Visible for testing
boolean checkRetry(long startTime) {
return (time.milliseconds() - startTime) < errorRetryTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
Expand Down Expand Up @@ -219,18 +220,32 @@ public void tearDown() {
}

// Creates a started worker task that tolerates no errors (the common-case default for tests).
// NOTE: the scraped diff retained both the pre- and post-change delegation lines here, which
// would invoke the overload twice; only the post-change line (NOOP_OPERATOR variant) is kept.
private void createWorkerTask() {
    createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
}

private void createWorkerTaskWithErrorToleration() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse the createWorkerTask() method just below by passing a RetryWithToleranceOperator argument instead of creating the WorkerSourceTask object here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I have refactored the constructors to be cleaner with various parameter lists.

createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.ALL_OPERATOR);
}

// Creates a worker task in the given initial state with the default (no-tolerance) operator.
// NOTE: the scraped diff retained both the pre- and post-change delegation lines here; only
// the post-change line (NOOP_OPERATOR variant) is kept, matching the refactored overloads.
private void createWorkerTask(TargetState initialState) {
    createWorkerTask(initialState, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
}

// Creates a worker task in the given initial state with the supplied error-tolerance operator,
// using the test's default key/value/header converters.
private void createWorkerTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
    createWorkerTask(initialState, keyConverter, valueConverter, headerConverter, retryWithToleranceOperator);
}

private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter,
HeaderConverter headerConverter) {
createWorkerTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
}

private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter,
HeaderConverter headerConverter, RetryWithToleranceOperator retryWithToleranceOperator) {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore, Runnable::run);
retryWithToleranceOperator, statusBackingStore, Runnable::run);
}

@Test
Expand Down Expand Up @@ -815,6 +830,32 @@ public void testSendRecordsTaskCommitRecordFail() throws Exception {
PowerMock.verifyAll();
}

// Verifies that when the task is configured with "errors.tolerance" = all, a producer failure
// for one record does not raise a ConnectException from sendRecords, and commitRecord is still
// invoked for the dropped record with a null RecordMetadata.
@Test
public void testSourceTaskIgnoresProducerException() throws Exception {
    createWorkerTaskWithErrorToleration();
    expectTopicCreation(TOPIC);

    // send two records
    // record 1 will succeed
    // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
    // and no ConnectException will be thrown
    SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
    SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);


    expectSendRecordOnce();
    expectSendRecordProducerCallbackFail();
    // Expect commitRecord to be called with a null RecordMetadata for the dropped record.
    sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    // Inject the records directly and drive the private sendRecords() method.
    Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
    Whitebox.invokeMethod(workerTask, "sendRecords");

    PowerMock.verifyAll();
}

@Test
public void testSlowTaskStart() throws Exception {
final CountDownLatch startupLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class RetryWithToleranceOperatorTest {

// Shared operator constructed with tolerance type NONE: errors are not tolerated.
public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator(
    ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
// Shared operator constructed with tolerance type ALL: every error is tolerated
// (used by tests that exercise the "errors.tolerance" = all code path).
public static final RetryWithToleranceOperator ALL_OPERATOR = new RetryWithToleranceOperator(
    ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
static {
Map<String, String> properties = new HashMap<>();
properties.put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, Objects.toString(2));
Expand All @@ -92,6 +94,11 @@ public class RetryWithToleranceOperatorTest {
new ConnectorTaskId("noop-connector", -1),
new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime(), "test-cluster"))
);
ALL_OPERATOR.metrics(new ErrorHandlingMetrics(
new ConnectorTaskId("errors-all-tolerate-connector", -1),
new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(properties),
new SystemTime(), "test-cluster"))
);
}

@SuppressWarnings("unused")
Expand Down