From d9972ceb055472652d6c2bd1b072a049f6383ec3 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 19 Oct 2021 16:54:01 +0800 Subject: [PATCH] KAFKA-13370: add errors when commit offsets failed and add tests --- .../connect/runtime/WorkerSourceTask.java | 18 +- .../connect/runtime/WorkerSourceTaskTest.java | 176 +++++++++++++----- 2 files changed, 141 insertions(+), 53 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 7307ec6a4e272..e7f38ed90d74c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -514,9 +514,11 @@ public boolean commitOffsets() { // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain, // we can stop flushing immediately if (isCancelled() || timeoutMs <= 0) { - log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size()); + log.error("{} Failed to flush, task is cancelled, or timed out while waiting for producer " + + "to flush outstanding {} messages", this, outstandingMessages.size()); finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); + recordCommitFailure(time.milliseconds() - started, + new TimeoutException("Task is cancelled or timed out while waiting for flushing messages")); return false; } this.wait(timeoutMs); @@ -526,7 +528,7 @@ public boolean commitOffsets() { // to stop immediately log.error("{} Interrupted while flushing messages, offsets will not be committed", this); finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); + recordCommitFailure(time.milliseconds() - started, e); return false; } } @@ -550,16 +552,16 @@ public boolean commitOffsets() { // Now we can actually flush the offsets to user storage. Future flushFuture = offsetWriter.doFlush((error, result) -> { if (error != null) { + // Very rare case: offsets were unserializable, and unable to store any data log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error); + finishFailedFlush(); + recordCommitFailure(time.milliseconds() - started, error); } else { log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this); } }); - // Very rare case: offsets were unserializable and we finished immediately, unable to store - // any data + // failed on doFlush, return false immediately if (flushFuture == null) { - finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); return false; } try { @@ -581,7 +583,7 @@ public boolean commitOffsets() { } catch (TimeoutException e) { log.error("{} Timed out waiting to flush offsets to storage", this); finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); + recordCommitFailure(time.milliseconds() - started, e); return false; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 640c1d8259e20..5834c26bb85e5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -129,6 +129,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { // is used in the right place. private static final byte[] SERIALIZED_KEY = "converted-key".getBytes(); private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes(); + private static final long STOP_TIME_OUT = 10000; private ExecutorService executor = Executors.newSingleThreadExecutor(); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); @@ -257,7 +258,7 @@ public void testStartPaused() throws Exception { assertTrue(pauseLatch.await(5, TimeUnit.SECONDS)); workerTask.stop(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); @@ -308,7 +309,7 @@ public void testPause() throws Exception { assertTrue(count.get() - priorCount <= 1); workerTask.stop(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); @@ -347,10 +348,10 @@ public void testPollsInBackground() throws Exception { assertTrue(awaitLatch(pollLatch)); workerTask.stop(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); - assertPollMetrics(10); + assertPollMetrics(10, true, true); PowerMock.verifyAll(); } @@ -389,10 +390,10 @@ public void testFailureInPoll() throws Exception { assertTrue(awaitLatch(pollLatch)); //Failure in poll should trigger automatic stop of the worker - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); - assertPollMetrics(0); + assertPollMetrics(0, true, true); PowerMock.verifyAll(); } @@ -437,10 +438,10 @@ public void testFailureInPollAfterCancel() throws Exception { assertTrue(awaitLatch(pollLatch)); workerTask.cancel(); workerCancelLatch.countDown(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); - assertPollMetrics(0); + assertPollMetrics(0, true, true); PowerMock.verifyAll(); } @@ -482,10 +483,10 @@ public void testFailureInPollAfterStop() throws Exception { assertTrue(awaitLatch(pollLatch)); workerTask.stop(); workerStopLatch.countDown(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); - assertPollMetrics(0); + assertPollMetrics(0, true, true); PowerMock.verifyAll(); } @@ -523,10 +524,10 @@ public void testPollReturnsNoRecords() throws Exception { assertTrue(awaitLatch(pollLatch)); assertTrue(workerTask.commitOffsets()); workerTask.stop(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); - assertPollMetrics(0); + assertPollMetrics(0, true, true); PowerMock.verifyAll(); } @@ -566,10 +567,10 @@ public void testCommit() throws Exception { assertTrue(awaitLatch(pollLatch)); assertTrue(workerTask.commitOffsets()); workerTask.stop(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); - assertPollMetrics(1); + assertPollMetrics(1, true, true); PowerMock.verifyAll(); } @@ -588,7 +589,7 @@ public void testCommitFailure() throws Exception { // We'll wait for some data, then trigger a flush final CountDownLatch pollLatch = expectPolls(1); - expectOffsetFlush(true); + expectOffsetFlush(false); expectTopicCreation(TOPIC); @@ -607,12 +608,67 @@ public void testCommitFailure() throws Exception { Future taskFuture = executor.submit(workerTask); assertTrue(awaitLatch(pollLatch)); - assertTrue(workerTask.commitOffsets()); + // the commitOffsets will return false since flushFuture will have exception + assertFalse(workerTask.commitOffsets()); workerTask.stop(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); taskFuture.get(); - assertPollMetrics(1); + assertPollMetrics(1, false, true); + + PowerMock.verifyAll(); + } + + @SuppressWarnings("unchecked") + @Test + public void testCommitWithOutStandingMsgTimeoutFailure() throws Exception { + // Test that the task commits properly when prompted + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + // We'll wait for some data, then trigger a flush + final CountDownLatch produceLatch = new CountDownLatch(1); + final CountDownLatch pollLatch = expectPolls(1, new AtomicInteger(0), produceLatch); + + EasyMock.expect(offsetWriter.beginFlush()).andReturn(true); + offsetWriter.cancelFlush(); + PowerMock.expectLastCall(); + + expectTopicCreation(TOPIC); + + sourceTask.stop(); + EasyMock.expectLastCall(); + + EasyMock.expect(offsetWriter.beginFlush()).andReturn(true); + offsetWriter.cancelFlush(); + PowerMock.expectLastCall(); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future taskFuture = executor.submit(workerTask); + + assertTrue(awaitLatch(pollLatch)); + assertTrue(awaitLatch(produceLatch)); + + // the commitOffsets will return false since outStandingMessages are pending and timed out + assertFalse(workerTask.commitOffsets()); + workerTask.stop(); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); + + taskFuture.get(); + assertPollMetrics(1, false, false); PowerMock.verifyAll(); } @@ -845,7 +901,7 @@ public void testSlowTaskStart() throws Exception { assertTrue(awaitLatch(startupLatch)); workerTask.stop(); finishStartupLatch.countDown(); - assertTrue(workerTask.awaitStop(1000)); + assertTrue(workerTask.awaitStop(STOP_TIME_OUT)); workerTaskFuture.get(); @@ -930,7 +986,7 @@ public void testHeaders() throws Exception { expectTopicCreation(TOPIC); - Capture> sent = expectSendRecord(TOPIC, true, false, true, true, true, headers); + Capture> sent = expectSendRecord(TOPIC, true, false, true, true, true, headers, null); PowerMock.replayAll(); @@ -968,8 +1024,8 @@ public void testHeadersWithCustomConverter() throws Exception { expectTopicCreation(TOPIC); - Capture> sentRecordA = expectSendRecord(TOPIC, false, false, true, true, false, null); - Capture> sentRecordB = expectSendRecord(TOPIC, false, false, true, true, false, null); + Capture> sentRecordA = expectSendRecord(TOPIC, false, false, true, true, false, null, null); + Capture> sentRecordB = expectSendRecord(TOPIC, false, false, true, true, false, null, null); PowerMock.replayAll(); @@ -1114,7 +1170,7 @@ public void testSendRecordsTopicDescribeRetriesMidway() throws Exception { // Second round expectTopicCreation(OTHER_TOPIC); - expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders()); + expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders(), null); PowerMock.replayAll(); @@ -1159,7 +1215,7 @@ public void testSendRecordsTopicCreateRetriesMidway() throws Exception { // Second round expectTopicCreation(OTHER_TOPIC); - expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders()); + expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders(), null); PowerMock.replayAll(); @@ -1338,6 +1394,10 @@ private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger count) } private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException { + return expectPolls(minimum, count, null); + } + + private CountDownLatch expectPolls(int minimum, final AtomicInteger count, final CountDownLatch produceLatch) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(minimum); // Note that we stub these to allow any number of calls because the thread will continue to // run. The count passed in + latch returned just makes sure we get *at least* that number of @@ -1350,7 +1410,7 @@ private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throw return RECORDS; }); // Fallout of the poll() call - expectSendRecordAnyTimes(); + expectSendRecordAnyTimes(produceLatch); return latch; } @@ -1376,24 +1436,28 @@ private Capture> expectSendRecordAnyTimes() throw return expectSendRecordTaskCommitRecordSucceed(true, false); } + private Capture> expectSendRecordAnyTimes(CountDownLatch producerLatch) throws InterruptedException { + return expectSendRecord(TOPIC, true, false, true, true, true, emptyHeaders(), producerLatch); + } + private Capture> expectSendRecordOnce(boolean isRetry) throws InterruptedException { return expectSendRecordTaskCommitRecordSucceed(false, isRetry); } private Capture> expectSendRecordProducerCallbackFail() throws InterruptedException { - return expectSendRecord(TOPIC, false, false, false, false, true, emptyHeaders()); + return expectSendRecord(TOPIC, false, false, false, false, true, emptyHeaders(), null); } private Capture> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, isRetry, true, true, true, emptyHeaders()); + return expectSendRecord(TOPIC, anyTimes, isRetry, true, true, true, emptyHeaders(), null); } private Capture> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, isRetry, true, false, true, emptyHeaders()); + return expectSendRecord(TOPIC, anyTimes, isRetry, true, false, true, emptyHeaders(), null); } private Capture> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, isRetry, succeed, true, true, emptyHeaders()); + return expectSendRecord(TOPIC, anyTimes, isRetry, succeed, true, true, emptyHeaders(), null); } private Capture> expectSendRecord( @@ -1403,7 +1467,8 @@ private Capture> expectSendRecord( boolean sendSuccess, boolean commitSuccess, boolean isMockedConverters, - Headers headers + Headers headers, + CountDownLatch producerLatch ) throws InterruptedException { if (isMockedConverters) { expectConvertHeadersAndKeyValue(topic, anyTimes, headers); @@ -1428,12 +1493,17 @@ private Capture> expectSendRecord( EasyMock.capture(producerCallbacks))); IAnswer> expectResponse = () -> { synchronized (producerCallbacks) { - for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { - if (sendSuccess) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, - 0L, 0, 0), null); - } else { - cb.onCompletion(null, new TopicAuthorizationException("foo")); + // when producerLatch is set, we won't invoke callback since we want to make outstandingMessages queued + if (producerLatch != null) { + producerLatch.countDown(); + } else { + for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { + if (sendSuccess) { + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, + 0L, 0, 0), null); + } else { + cb.onCompletion(null, new TopicAuthorizationException("foo")); + } } } producerCallbacks.reset(); @@ -1549,7 +1619,7 @@ private void expectOffsetFlush(boolean succeed) throws Exception { } } - private void assertPollMetrics(int minimumPollCountExpected) { + private void assertPollMetrics(int minimumPollCountExpected, boolean isCommitSucceeded, boolean isWriteCompleted) { MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup(); MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup(); double pollRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-poll-rate"); @@ -1565,12 +1635,14 @@ private void assertPollMetrics(int minimumPollCountExpected) { double writeRate = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-rate"); double writeTotal = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-write-total"); - if (minimumPollCountExpected > 0) { - assertTrue(writeRate > 0.0d); - } else { - assertTrue(writeRate == 0.0d); + if (isWriteCompleted) { + if (minimumPollCountExpected > 0) { + assertTrue(writeRate > 0.0d); + } else { + assertTrue(writeRate == 0.0d); + } + assertTrue(writeTotal >= minimumPollCountExpected); } - assertTrue(writeTotal >= minimumPollCountExpected); double pollBatchTimeMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-max-time-ms"); double pollBatchTimeAvg = metrics.currentMetricValueAsDouble(sourceTaskGroup, "poll-batch-avg-time-ms"); @@ -1580,9 +1652,23 @@ private void assertPollMetrics(int minimumPollCountExpected) { assertTrue(Double.isNaN(pollBatchTimeAvg) || pollBatchTimeAvg > 0.0d); double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count"); double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max"); - assertEquals(0, activeCount, 0.000001d); - if (minimumPollCountExpected > 0) { - assertEquals(RECORDS.size(), activeCountMax, 0.000001d); + + if (isWriteCompleted) { + assertEquals(0, activeCount, 0.000001d); + if (minimumPollCountExpected > 0) { + assertEquals(RECORDS.size(), activeCountMax, 0.000001d); + } + } + + double failurePercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-failure-percentage"); + double successPercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-success-percentage"); + + if (!isCommitSucceeded) { + assertTrue(failurePercentage > 0); + assertTrue(successPercentage == 0); + } else { + assertTrue(failurePercentage == 0); + assertTrue(successPercentage > 0); } }