diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index a4d409303e236..643b10e25acfb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -191,7 +191,7 @@ public void commitOffsets(boolean sync, final int seqno) { try { task.flush(offsets); } catch (Throwable t) { - log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t); + log.error("Commit of {} offsets failed due to exception while flushing:", this, t); log.error("Rewinding offsets to last committed offsets"); for (Map.Entry entry : lastCommittedOffsets.entrySet()) { log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset()); @@ -288,7 +288,7 @@ private void deliverMessages() { pausedForRedelivery = false; } } catch (RetriableException e) { - log.error("RetriableException from SinkTask {}: {}", id, e); + log.error("RetriableException from SinkTask {}:", id, e); // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data, // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc. pausedForRedelivery = true; @@ -361,8 +361,10 @@ public void onPartitionsAssigned(Collection partitions) { @Override public void onPartitionsRevoked(Collection partitions) { - task.onPartitionsRevoked(partitions); - commitOffsets(true, -1); + if (started) { + task.onPartitionsRevoked(partitions); + commitOffsets(true, -1); + } // Make sure we don't have any leftover data since offsets will be reset to committed positions messageBatch.clear(); }