diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 225b080ee18d0..f5209e1ccab64 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -115,8 +115,9 @@ public void commitRecord(SourceRecord record) throws InterruptedException {
/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
- * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. In this case
- * {@code metadata} will be null.
+ * also called when a record is filtered by a transformation, or when the connector's "errors.tolerance"
+ * configuration property is set to "all", and thus will never be ACK'd by a broker.
+ * In both cases {@code metadata} will be null.
* </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
@@ -128,8 +129,8 @@ public void commitRecord(SourceRecord record) throws InterruptedException {
* not necessary to implement both methods.
* </p>
*
- * @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
- * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered
+ * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
+ * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
* @throws InterruptedException
*/
public void commitRecord(SourceRecord record, RecordMetadata metadata)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a33821a8dde4c..6c75f4a5d9d7d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -39,6 +39,7 @@
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
+import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
@@ -364,9 +365,18 @@ private boolean sendRecords() {
producerRecord,
(recordMetadata, e) -> {
if (e != null) {
- log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
- log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
- producerSendException.compareAndSet(null, e);
+ if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) {
+ log.trace("Ignoring failed record send: {} failed to send record to {}: ",
+ WorkerSourceTask.this, topic, e);
+ // executeFailed here allows the use of existing logging infrastructure/configuration
+ retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
+ preTransformRecord, e);
+ commitTaskRecord(preTransformRecord, null);
+ } else {
+ log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
+ log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+ producerSendException.compareAndSet(null, e);
+ }
} else {
submittedRecord.ack();
counter.completeRecord();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index ce4c1e27dda09..0e15ced99b922 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -111,6 +111,23 @@ public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingCl
return errantRecordFuture;
}
+ public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+ SourceRecord sourceRecord,
+ Throwable error) {
+
+ markAsFailed();
+ context.sourceRecord(sourceRecord);
+ context.currentContext(stage, executingClass);
+ context.error(error);
+ errorHandlingMetrics.recordFailure();
+ Future<Void> errantRecordFuture = context.report();
+ if (!withinToleranceLimits()) {
+ errorHandlingMetrics.recordError();
+ throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
+ }
+ return errantRecordFuture;
+ }
+
/**
* Execute the recoverable operation. If the operation is already in a failed state, then simply return
* with the existing failure.
@@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
}
}
+ // For source connectors that want to skip Kafka producer errors.
+ // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
+ // to write to Kafka.
+ public ToleranceType getErrorToleranceType() {
+ return errorToleranceType;
+ }
+
// Visible for testing
boolean checkRetry(long startTime) {
return (time.milliseconds() - startTime) < errorRetryTimeout;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index fcd657fb2c142..094062432b65b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -42,6 +42,7 @@
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -219,18 +220,32 @@ public void tearDown() {
}
private void createWorkerTask() {
- createWorkerTask(TargetState.STARTED);
+ createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
+ }
+
+ private void createWorkerTaskWithErrorToleration() {
+ createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.ALL_OPERATOR);
}
private void createWorkerTask(TargetState initialState) {
- createWorkerTask(initialState, keyConverter, valueConverter, headerConverter);
+ createWorkerTask(initialState, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
+ }
+
+ private void createWorkerTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
+ createWorkerTask(initialState, keyConverter, valueConverter, headerConverter, retryWithToleranceOperator);
}
- private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+ private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter,
+ HeaderConverter headerConverter) {
+ createWorkerTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR);
+ }
+
+ private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter,
+ HeaderConverter headerConverter, RetryWithToleranceOperator retryWithToleranceOperator) {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
- RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore, Runnable::run);
+ retryWithToleranceOperator, statusBackingStore, Runnable::run);
}
@Test
@@ -815,6 +830,32 @@ public void testSendRecordsTaskCommitRecordFail() throws Exception {
PowerMock.verifyAll();
}
+ @Test
+ public void testSourceTaskIgnoresProducerException() throws Exception {
+ createWorkerTaskWithErrorToleration();
+ expectTopicCreation(TOPIC);
+
+ // send two records
+ // record 1 will succeed
+ // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
+ // and no ConnectException will be thrown
+ SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+ SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+
+ expectSendRecordOnce();
+ expectSendRecordProducerCallbackFail();
+ sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull());
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+ Whitebox.invokeMethod(workerTask, "sendRecords");
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testSlowTaskStart() throws Exception {
final CountDownLatch startupLatch = new CountDownLatch(1);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 68f8afcf4c637..0b0b603ed3df4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -78,6 +78,8 @@ public class RetryWithToleranceOperatorTest {
public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator(
ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
+ public static final RetryWithToleranceOperator ALL_OPERATOR = new RetryWithToleranceOperator(
+ ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
static {
Map<String, String> properties = new HashMap<>();
properties.put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, Objects.toString(2));
@@ -92,6 +94,11 @@ public class RetryWithToleranceOperatorTest {
new ConnectorTaskId("noop-connector", -1),
new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime(), "test-cluster"))
);
+ ALL_OPERATOR.metrics(new ErrorHandlingMetrics(
+ new ConnectorTaskId("errors-all-tolerate-connector", -1),
+ new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(properties),
+ new SystemTime(), "test-cluster"))
+ );
}
@SuppressWarnings("unused")