diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index 608a3c21591a8..28971ac282af1 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -109,6 +109,12 @@
test
+
+ org.awaitility
+ awaitility
+ test
+
+
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 8d2cbd8e74e14..012e4143744e8 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -190,12 +190,14 @@ public void start() {
});
runnerThread.setUncaughtExceptionHandler(
(t, e) -> {
- LOG.error("[{}] Error while consuming records", t.getName(), e);
- try {
- this.close();
- } catch (InterruptedException ex) {
- // The interrupted exception is thrown by the runnerThread itself. Ignore it.
- }
+ new Thread(() -> {
+ LOG.error("[{}] Error while consuming records", t.getName(), e);
+ try {
+ this.close();
+ } catch (Exception ex) {
+ LOG.error("[{}] Close kafka source error", t.getName(), e);
+ }
+ }, "Kafka Source Close Task Thread").start();
});
runnerThread.setName("Kafka Source Thread");
runnerThread.start();
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index bc06c3e1935b4..402727f4ec015 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -31,6 +31,7 @@
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -173,7 +174,10 @@ public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Excep
Field runningField = KafkaAbstractSource.class.getDeclaredField("running");
runningField.setAccessible(true);
- Assert.assertFalse((boolean) runningField.get(source));
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse((boolean) runningField.get(source));
+ Assert.assertNull(consumerField.get(source));
+ });
}
private File getFile(String name) {