From b544bbb69467289e8351cb376ab3957f5ac631c5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 30 Jun 2023 21:36:12 +0800 Subject: [PATCH 1/7] [fix][io] Close the kafka source connector close stuck --- .../pulsar/io/kafka/KafkaAbstractSource.java | 17 +++++++++++------ .../kafka/source/KafkaAbstractSourceTest.java | 7 ++++++- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 8d2cbd8e74e14..e18fca7c6cde3 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -28,6 +28,8 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -64,6 +66,7 @@ public abstract class KafkaAbstractSource extends PushSource { private volatile boolean running = false; private KafkaSourceConfig kafkaSourceConfig; private Thread runnerThread; + private final Executor executor = Executors.newSingleThreadExecutor(); @Override public void open(Map config, SourceContext sourceContext) throws Exception { @@ -190,12 +193,14 @@ public void start() { }); runnerThread.setUncaughtExceptionHandler( (t, e) -> { - LOG.error("[{}] Error while consuming records", t.getName(), e); - try { - this.close(); - } catch (InterruptedException ex) { - // The interrupted exception is thrown by the runnerThread itself. Ignore it. - } + executor.execute(() -> { + LOG.error("[{}] Error while consuming records", t.getName(), e); + try { + this.close(); + } catch (Exception ex) { + LOG.error("[{}] Close kafka source error", t.getName(), e); + } + }); }); runnerThread.setName("Kafka Source Thread"); runnerThread.start(); diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index bc06c3e1935b4..770955bbcc6e8 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; +import io.netty.handler.codec.spdy.DefaultSpdyGoAwayFrame; import java.util.Collection; import java.util.Collections; import java.lang.reflect.Field; @@ -31,6 +32,7 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -173,7 +175,10 @@ public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Excep Field runningField = KafkaAbstractSource.class.getDeclaredField("running"); runningField.setAccessible(true); - Assert.assertFalse((boolean) runningField.get(source)); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse((boolean) runningField.get(source)); + Assert.assertNull(consumerField.get(source)); + }); } private File getFile(String name) { From f16ac9529e1935d01b9d129f17c95773441af6d7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 30 Jun 2023 21:55:31 +0800 Subject: [PATCH 2/7] make the executor to static --- pulsar-io/kafka/pom.xml | 6 ++++++ .../org/apache/pulsar/io/kafka/KafkaAbstractSource.java | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml index 608a3c21591a8..28971ac282af1 100644 --- a/pulsar-io/kafka/pom.xml +++ b/pulsar-io/kafka/pom.xml @@ -109,6 +109,12 @@ test + + org.awaitility + awaitility + test + + diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index e18fca7c6cde3..b0134dfbe8ae8 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.kafka; import io.jsonwebtoken.io.Encoders; +import io.netty.util.concurrent.DefaultThreadFactory; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -66,7 +67,8 @@ public abstract class KafkaAbstractSource extends PushSource { private volatile boolean running = false; private KafkaSourceConfig kafkaSourceConfig; private Thread runnerThread; - private final Executor executor = Executors.newSingleThreadExecutor(); + private final static Executor executor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory("Kafka_source_close_task_thread")); @Override public void open(Map config, SourceContext sourceContext) throws Exception { From 2556d66768d710b26cee6e7faa0e72368ebf9df4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 30 Jun 2023 21:56:14 +0800 Subject: [PATCH 3/7] rename thread --- .../java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index b0134dfbe8ae8..6e0474cd839ce 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -68,7 +68,7 @@ public abstract class KafkaAbstractSource extends PushSource { private KafkaSourceConfig kafkaSourceConfig; private Thread runnerThread; private final static Executor executor = Executors.newSingleThreadExecutor( - new DefaultThreadFactory("Kafka_source_close_task_thread")); + new DefaultThreadFactory("Kafka Source Close Task Thread")); @Override public void open(Map config, SourceContext sourceContext) throws Exception { From ab4eb210e94a1056108c5d86b5ab528eedc0c1b2 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 30 Jun 2023 21:59:02 +0800 Subject: [PATCH 4/7] rename executor --- .../java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 6e0474cd839ce..15a9530e00fc3 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -67,7 +67,7 @@ public abstract class KafkaAbstractSource extends PushSource { private volatile boolean running = false; private KafkaSourceConfig kafkaSourceConfig; private Thread runnerThread; - private final static Executor executor = Executors.newSingleThreadExecutor( + private final static Executor EXECUTOR = Executors.newSingleThreadExecutor( new DefaultThreadFactory("Kafka Source Close Task Thread")); @Override @@ -195,7 +195,7 @@ public void start() { }); runnerThread.setUncaughtExceptionHandler( (t, e) -> { - executor.execute(() -> { + EXECUTOR.execute(() -> { LOG.error("[{}] Error while consuming records", t.getName(), e); try { this.close(); From 04b95c64911c6732b132154a1961c58031b6b546 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 30 Jun 2023 22:01:39 +0800 Subject: [PATCH 5/7] remove executor --- .../org/apache/pulsar/io/kafka/KafkaAbstractSource.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 15a9530e00fc3..012e4143744e8 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -19,7 +19,6 @@ package org.apache.pulsar.io.kafka; import io.jsonwebtoken.io.Encoders; -import io.netty.util.concurrent.DefaultThreadFactory; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -29,8 +28,6 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -67,8 +64,6 @@ public abstract class KafkaAbstractSource extends PushSource { private volatile boolean running = false; private KafkaSourceConfig kafkaSourceConfig; private Thread runnerThread; - private final static Executor EXECUTOR = Executors.newSingleThreadExecutor( - new DefaultThreadFactory("Kafka Source Close Task Thread")); @Override public void open(Map config, SourceContext sourceContext) throws Exception { @@ -195,14 +190,14 @@ public void start() { }); runnerThread.setUncaughtExceptionHandler( (t, e) -> { - EXECUTOR.execute(() -> { + new Thread(() -> { LOG.error("[{}] Error while consuming records", t.getName(), e); try { this.close(); } catch (Exception ex) { LOG.error("[{}] Close kafka source error", t.getName(), e); } - }); + }, "Kafka Source Close Task Thread").start(); }); runnerThread.setName("Kafka Source Thread"); runnerThread.start(); From 4d30ef4e79bd7ee1e5b2aa4eb2e375c730fccf30 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 30 Jun 2023 22:20:08 +0800 Subject: [PATCH 6/7] fix checkstyle --- .../apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 770955bbcc6e8..4f2783d43fd0c 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -20,7 +20,6 @@ import com.google.common.collect.ImmutableMap; -import io.netty.handler.codec.spdy.DefaultSpdyGoAwayFrame; import java.util.Collection; import java.util.Collections; import java.lang.reflect.Field; @@ -30,7 +29,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.io.core.SourceContext; -import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; import org.awaitility.Awaitility; import org.mockito.Mockito; From e64e835a8376a6ac882acf220df5602a8ccf03e6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 30 Jun 2023 23:14:12 +0800 Subject: [PATCH 7/7] fix improts --- .../apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 4f2783d43fd0c..402727f4ec015 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; import org.awaitility.Awaitility; import org.mockito.Mockito;