From 22dc741cc2a9d07627b618e30f0d9268d9247be7 Mon Sep 17 00:00:00 2001 From: Peter Davis Date: Fri, 13 May 2016 12:58:32 -0700 Subject: [PATCH 1/3] KAFKA-3710: MemoryOffsetBackingStore shutdown ExecutorService needs to be shutdown on close, lest a zombie thread prevent clean shutdown. --- .../kafka/connect/storage/MemoryOffsetBackingStore.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java index 669e5f5a5f695..892130fadc8f6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -40,7 +40,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore { private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class); protected Map data = new HashMap<>(); - protected ExecutorService executor = Executors.newSingleThreadExecutor(); + protected ExecutorService executor; public MemoryOffsetBackingStore() { @@ -52,11 +52,15 @@ public void configure(WorkerConfig config) { @Override public synchronized void start() { + executor = Executors.newSingleThreadExecutor(); } @Override public synchronized void stop() { - // Nothing to do since this doesn't maintain any outstanding connections/data + if (executor != null) { + executor.shutdown(); + executor = null; + } } @Override From 0bd40293ceb970525aee288d07d2ebb8d451bb79 Mon Sep 17 00:00:00 2001 From: Peter Davis Date: Tue, 17 May 2016 12:55:58 -0700 Subject: [PATCH 2/3] KAFKA-3710: MemoryOffsetBackingStore shutdown. Attempt to wait for tasks to fully shutdown cleanly. --- .../connect/storage/MemoryOffsetBackingStore.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java index 892130fadc8f6..527c2bacd82dd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -59,6 +59,16 @@ public synchronized void start() { public synchronized void stop() { if (executor != null) { executor.shutdown(); + // Best effort wait for any get() and set() tasks (and caller's callbacks) to complete. + try { + executor.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (!executor.shutdownNow().isEmpty()) { + throw new ConnectException("Failed to stop MemoryOffsetBackingStore. Exiting without cleanly " + + "shutting down pending tasks and/or callbacks."); + } executor = null; } } From 3c3d010d2a22d1195739c9ec2ae8919af83be2ba Mon Sep 17 00:00:00 2001 From: Peter Davis Date: Tue, 17 May 2016 17:33:08 -0700 Subject: [PATCH 3/3] KAFKA-3710: MemoryOffsetBackingStore shutdown Remove `synchronized` and fix imports. --- .../kafka/connect/storage/MemoryOffsetBackingStore.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java index 527c2bacd82dd..e3193939c8e79 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.storage; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; import org.slf4j.Logger; @@ -30,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this @@ -51,12 +53,12 @@ public void configure(WorkerConfig config) { } @Override - public synchronized void start() { + public void start() { executor = Executors.newSingleThreadExecutor(); } @Override - public synchronized void stop() { + public void stop() { if (executor != null) { executor.shutdown(); // Best effort wait for any get() and set() tasks (and caller's callbacks) to complete.