From 4874275bf2fcecd0476bf7c6b18073d34ee27b9d Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 9 Nov 2015 15:40:50 -0800 Subject: [PATCH] KAFKA-2786: Only respond to SinkTask onPartitionsRevoked after the WorkerSinkTask has finished starting up. --- .../apache/kafka/connect/runtime/WorkerSinkTask.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index a4d409303e236..643b10e25acfb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -191,7 +191,7 @@ public void commitOffsets(boolean sync, final int seqno) { try { task.flush(offsets); } catch (Throwable t) { - log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t); + log.error("Commit of {} offsets failed due to exception while flushing:", this, t); log.error("Rewinding offsets to last committed offsets"); for (Map.Entry entry : lastCommittedOffsets.entrySet()) { log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); @@ -288,7 +288,7 @@ private void deliverMessages() { pausedForRedelivery = false; } } catch (RetriableException e) { - log.error("RetriableException from SinkTask {}: {}", id, e); + log.error("RetriableException from SinkTask {}:", id, e); // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data, // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc. pausedForRedelivery = true; @@ -361,8 +361,10 @@ public void onPartitionsAssigned(Collection partitions) { @Override public void onPartitionsRevoked(Collection partitions) { - task.onPartitionsRevoked(partitions); - commitOffsets(true, -1); + if (started) { + task.onPartitionsRevoked(partitions); + commitOffsets(true, -1); + } // Make sure we don't have any leftover data since offsets will be reset to committed positions messageBatch.clear(); }