From 5c44efa57215153d8d8b3ca6ad50a5f878602f79 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 3 Nov 2015 21:42:23 -0800 Subject: [PATCH] KAFKA-2744: Commit source task offsets after task is completely stopped to ensure no additional messages are processed during the offset commit when stopping tasks for rebalancing. --- .../apache/kafka/copycat/runtime/WorkerSourceTask.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java index ea9e6b5ec0250..78b588c68825e 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java @@ -96,24 +96,24 @@ public void start(Properties props) { @Override public void stop() { task.stop(); - commitOffsets(); if (workThread != null) workThread.startGracefulShutdown(); } @Override public boolean awaitStop(long timeoutMs) { + boolean success = true; if (workThread != null) { try { - boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); + success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS); if (!success) workThread.forceShutdown(); - return success; } catch (InterruptedException e) { - return false; + success = false; } } - return true; + commitOffsets(); + return success; } @Override