diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java index 669e5f5a5f695..e3193939c8e79 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.storage; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; import org.slf4j.Logger; @@ -30,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this @@ -40,7 +42,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore { private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class); protected Map data = new HashMap<>(); - protected ExecutorService executor = Executors.newSingleThreadExecutor(); + protected ExecutorService executor; public MemoryOffsetBackingStore() { @@ -51,12 +53,26 @@ public void configure(WorkerConfig config) { } @Override - public synchronized void start() { + public void start() { + executor = Executors.newSingleThreadExecutor(); } @Override - public synchronized void stop() { - // Nothing to do since this doesn't maintain any outstanding connections/data + public void stop() { + if (executor != null) { + executor.shutdown(); + // Best effort wait for any get() and set() tasks (and caller's callbacks) to complete. + try { + executor.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (!executor.shutdownNow().isEmpty()) { + throw new ConnectException("Failed to stop MemoryOffsetBackingStore. Exiting without cleanly " + + "shutting down pending tasks and/or callbacks."); + } + executor = null; + } } @Override