diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 11318fd3d9a79..976ecfdb21ff4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -94,6 +94,7 @@ class WorkerSinkTask extends WorkerTask { private int commitFailures; private boolean pausedForRedelivery; private boolean committing; + private boolean taskStopped; private final WorkerErrantRecordReporter workerErrantRecordReporter; public WorkerSinkTask(ConnectorTaskId id, @@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id, this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno); this.consumer = consumer; this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); + this.taskStopped = false; this.workerErrantRecordReporter = workerErrantRecordReporter; } @@ -168,6 +170,7 @@ protected void close() { } catch (Throwable t) { log.warn("Could not stop task", t); } + taskStopped = true; Utils.closeQuietly(consumer, "consumer"); Utils.closeQuietly(transformationChain, "transformation chain"); Utils.closeQuietly(retryWithToleranceOperator, "retry operator"); @@ -689,6 +692,10 @@ else if (!context.pausedPartitions().isEmpty()) @Override public void onPartitionsRevoked(Collection partitions) { + if (taskStopped) { + log.trace("Skipping partition revocation callback as task has already been stopped"); + return; + } log.debug("{} Partitions revoked", WorkerSinkTask.this); try { closePartitions(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 12a553fc95e90..3ae9b56d87cb2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -315,6 +315,56 @@ public void testPause() throws Exception { PowerMock.verifyAll(); } + @Test + public void testShutdown() throws Exception { + createTask(initialState); + + expectInitializeTask(); + expectTaskGetTopic(true); + + // first iteration + expectPollInitialAssignment(); + + // second iteration + EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap()); + expectConsumerPoll(1); + expectConversionAndTransformation(1); + sinkTask.put(EasyMock.>anyObject()); + EasyMock.expectLastCall(); + + // WorkerSinkTask::stop + consumer.wakeup(); + PowerMock.expectLastCall(); + sinkTask.stop(); + PowerMock.expectLastCall(); + + // WorkerSinkTask::close + consumer.close(); + PowerMock.expectLastCall().andAnswer(new IAnswer() { + @Override + public Object answer() throws Throwable { + rebalanceListener.getValue().onPartitionsRevoked( + asList(TOPIC_PARTITION, TOPIC_PARTITION2) + ); + return null; + } + }); + transformationChain.close(); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + workerTask.iteration(); + sinkTaskContext.getValue().requestCommit(); // Force an offset commit + workerTask.iteration(); + workerTask.stop(); + workerTask.close(); + + PowerMock.verifyAll(); + } + @Test public void testPollRedelivery() throws Exception { createTask(initialState);