From 34d93a8e09d2aaa3a83c9d76eb03660c8399ce3d Mon Sep 17 00:00:00 2001 From: Knowles Atchison Jr Date: Fri, 19 Nov 2021 09:07:06 -0500 Subject: [PATCH 1/5] KAFKA-13348 errors.tolerance "all" behavior has been updated to allow source connectors to ignore producer exceptions. The connector will receive null RecordMetadata in the commitRecord callback in lieu of the task failing unconditionally. --- .../kafka/connect/source/SourceTask.java | 9 ++--- .../connect/runtime/WorkerSourceTask.java | 7 +++- .../errors/RetryWithToleranceOperator.java | 7 ++++ .../connect/runtime/WorkerSourceTaskTest.java | 34 +++++++++++++++++++ .../RetryWithToleranceOperatorTest.java | 7 ++++ 5 files changed, 59 insertions(+), 5 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java index 225b080ee18d0..f5209e1ccab64 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java @@ -115,8 +115,9 @@ public void commitRecord(SourceRecord record) throws InterruptedException { /** *

* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is - * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. In this case - * {@code metadata} will be null. + * also called when a record is filtered by a transformation or when {@link ConnectorConfig} "errors.tolerance" is set to "all" + * and thus will never be ACK'd by a broker. + * In both cases {@code metadata} will be null. *

*

* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets @@ -128,8 +129,8 @@ public void commitRecord(SourceRecord record) throws InterruptedException { * not necessary to implement both methods. *

* - * @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation - * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered + * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception + * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored * @throws InterruptedException */ public void commitRecord(SourceRecord record, RecordMetadata metadata) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index a33821a8dde4c..e1372388bf8c8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; @@ -366,7 +367,11 @@ private boolean sendRecords() { if (e != null) { log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); - producerSendException.compareAndSet(null, e); + if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) { + commitTaskRecord(preTransformRecord, null); + } else { + producerSendException.compareAndSet(null, e); + } } else { 
submittedRecord.ack(); counter.completeRecord(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java index ce4c1e27dda09..9ef350261f8a2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java @@ -229,6 +229,13 @@ public synchronized boolean withinToleranceLimits() { } } + // For source connectors that want to skip kafka producer errors. + // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing + // to write to kafka. + public synchronized ToleranceType getErrorToleranceType() { + return errorToleranceType; + } + // Visible for testing boolean checkRetry(long startTime) { return (time.milliseconds() - startTime) < errorRetryTimeout; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index fcd657fb2c142..3134866480b7e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -222,6 +222,13 @@ private void createWorkerTask() { createWorkerTask(TargetState.STARTED); } + private void createWorkerTaskWithErrorToleration() { + workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter, + transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), + offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, + RetryWithToleranceOperatorTest.ALL_OPERATOR, 
statusBackingStore, Runnable::run); + } + private void createWorkerTask(TargetState initialState) { createWorkerTask(initialState, keyConverter, valueConverter, headerConverter); } @@ -815,6 +822,33 @@ public void testSendRecordsTaskCommitRecordFail() throws Exception { PowerMock.verifyAll(); } + @Test + public void testSourceTaskIgnoresProducerException() throws Exception { + createWorkerTaskWithErrorToleration(); + expectTopicCreation(TOPIC); + + // send two records + // record 1 will succeed + // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator + // and no ConnectException will be thrown + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + + expectSendRecordOnce(); + expectSendRecordProducerCallbackFail(); + sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class)); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); + + PowerMock.verifyAll(); + } + @Test public void testSlowTaskStart() throws Exception { final CountDownLatch startupLatch = new CountDownLatch(1); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java index 68f8afcf4c637..0b0b603ed3df4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java @@ -78,6 +78,8 @@ public class 
RetryWithToleranceOperatorTest { public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator( ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM); + public static final RetryWithToleranceOperator ALL_OPERATOR = new RetryWithToleranceOperator( + ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM); static { Map properties = new HashMap<>(); properties.put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, Objects.toString(2)); @@ -92,6 +94,11 @@ public class RetryWithToleranceOperatorTest { new ConnectorTaskId("noop-connector", -1), new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime(), "test-cluster")) ); + ALL_OPERATOR.metrics(new ErrorHandlingMetrics( + new ConnectorTaskId("errors-all-tolerate-connector", -1), + new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(properties), + new SystemTime(), "test-cluster")) + ); } @SuppressWarnings("unused") From b956c194164c7a0d30fba441964f60718d10c9a7 Mon Sep 17 00:00:00 2001 From: Knowles Atchison Jr Date: Fri, 19 Nov 2021 14:59:21 -0500 Subject: [PATCH 2/5] KAFKA-13348 Fixed related failing test after lastSendFail was removed. 
--- .../org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 3134866480b7e..6c099c1b59e3d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -844,7 +844,6 @@ public void testSourceTaskIgnoresProducerException() throws Exception { Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); PowerMock.verifyAll(); } From 22daf3630b8e1ad06b0aad43e2d73938549e0346 Mon Sep 17 00:00:00 2001 From: Knowles Atchison Jr Date: Mon, 6 Dec 2021 09:33:53 -0500 Subject: [PATCH 3/5] KAFKA-13348 Updated error tolerance in WorkerSourceTask to use existing logging infrastructure/configuration. 
--- .../kafka/connect/runtime/WorkerSourceTask.java | 7 +++++-- .../errors/RetryWithToleranceOperator.java | 17 +++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index e1372388bf8c8..2853da700424d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -365,11 +365,14 @@ private boolean sendRecords() { producerRecord, (recordMetadata, e) -> { if (e != null) { - log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); - log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) { + // executeFailed here allows the use of existing logging infrastructure/configuration + retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class, + preTransformRecord, e); commitTaskRecord(preTransformRecord, null); } else { + log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); + log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); producerSendException.compareAndSet(null, e); } } else { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java index 9ef350261f8a2..b90668a7f1a61 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java @@ -111,6 +111,23 @@ public synchronized Future executeFailed(Stage stage, Class executingCl return 
errantRecordFuture; } + public synchronized Future executeFailed(Stage stage, Class executingClass, + SourceRecord sourceRecord, + Throwable error) { + + markAsFailed(); + context.sourceRecord(sourceRecord); + context.currentContext(stage, executingClass); + context.error(error); + errorHandlingMetrics.recordFailure(); + Future errantRecordFuture = context.report(); + if (!withinToleranceLimits()) { + errorHandlingMetrics.recordError(); + throw new ConnectException("Tolerance exceeded in error handler", error); + } + return errantRecordFuture; + } + /** * Execute the recoverable operation. If the operation is already in a failed state, then simply return * with the existing failure. From 878b131a292a96dd467cd9623dab895ace743c3c Mon Sep 17 00:00:00 2001 From: Knowles Atchison Jr Date: Wed, 26 Jan 2022 17:28:19 -0500 Subject: [PATCH 4/5] KAFKA-13348 Added additional connect worker logging. Cleaned up worker task creation in test. Misc. code cleanup. --- .../connect/runtime/WorkerSourceTask.java | 6 ++--- .../errors/RetryWithToleranceOperator.java | 4 +-- .../connect/runtime/WorkerSourceTaskTest.java | 26 ++++++++++++------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 2853da700424d..8e7bedb8f0651 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -365,14 +365,14 @@ private boolean sendRecords() { producerRecord, (recordMetadata, e) -> { if (e != null) { - if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) { + log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); + log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); + if 
(retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) { // executeFailed here allows the use of existing logging infrastructure/configuration retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class, preTransformRecord, e); commitTaskRecord(preTransformRecord, null); } else { - log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); - log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); producerSendException.compareAndSet(null, e); } } else { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java index b90668a7f1a61..0e15ced99b922 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java @@ -123,7 +123,7 @@ public synchronized Future executeFailed(Stage stage, Class executingCl Future errantRecordFuture = context.report(); if (!withinToleranceLimits()) { errorHandlingMetrics.recordError(); - throw new ConnectException("Tolerance exceeded in error handler", error); + throw new ConnectException("Tolerance exceeded in Source Worker error handler", error); } return errantRecordFuture; } @@ -249,7 +249,7 @@ public synchronized boolean withinToleranceLimits() { // For source connectors that want to skip kafka producer errors. // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing // to write to kafka. 
- public synchronized ToleranceType getErrorToleranceType() { + public ToleranceType getErrorToleranceType() { return errorToleranceType; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 6c099c1b59e3d..094062432b65b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -42,6 +42,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -219,25 +220,32 @@ public void tearDown() { } private void createWorkerTask() { - createWorkerTask(TargetState.STARTED); + createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.NOOP_OPERATOR); } private void createWorkerTaskWithErrorToleration() { - workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter, - transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), - offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, - RetryWithToleranceOperatorTest.ALL_OPERATOR, statusBackingStore, Runnable::run); + createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.ALL_OPERATOR); } private void createWorkerTask(TargetState initialState) { - createWorkerTask(initialState, keyConverter, valueConverter, 
headerConverter); + createWorkerTask(initialState, RetryWithToleranceOperatorTest.NOOP_OPERATOR); } - private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { + private void createWorkerTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { + createWorkerTask(initialState, keyConverter, valueConverter, headerConverter, retryWithToleranceOperator); + } + + private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, + HeaderConverter headerConverter) { + createWorkerTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR); + } + + private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, + HeaderConverter headerConverter, RetryWithToleranceOperator retryWithToleranceOperator) { workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, - RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore, Runnable::run); + retryWithToleranceOperator, statusBackingStore, Runnable::run); } @Test @@ -837,7 +845,7 @@ public void testSourceTaskIgnoresProducerException() throws Exception { expectSendRecordOnce(); expectSendRecordProducerCallbackFail(); - sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class)); + sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull()); EasyMock.expectLastCall(); PowerMock.replayAll(); From ad098eb99ac53033fe94e07bb00b47452abe56b3 Mon Sep 17 00:00:00 2001 From: Knowles Atchison Jr Date: Thu, 27 Jan 2022 06:49:37 -0500 Subject: [PATCH 5/5] 
KAFKA-13348 Updated logging for error ignore path. --- .../org/apache/kafka/connect/runtime/WorkerSourceTask.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 8e7bedb8f0651..6c75f4a5d9d7d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -365,14 +365,16 @@ private boolean sendRecords() { producerRecord, (recordMetadata, e) -> { if (e != null) { - log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); - log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) { + log.trace("Ignoring failed record send: {} failed to send record to {}: ", + WorkerSourceTask.this, topic, e); // executeFailed here allows the use of existing logging infrastructure/configuration retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class, preTransformRecord, e); commitTaskRecord(preTransformRecord, null); } else { + log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); + log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); producerSendException.compareAndSet(null, e); } } else {