From 1c0638bae8435a98586e8a66f48a26440abb926c Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 13 Jul 2023 11:56:30 +0800 Subject: [PATCH 1/2] [fix][io] Not restart instance when kafka source poll exception. --- .../pulsar/io/kafka/KafkaAbstractSource.java | 14 +++++------ .../kafka/source/KafkaAbstractSourceTest.java | 23 +++++++++---------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 012e4143744e8..77bc8f83e7797 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -190,14 +190,12 @@ public void start() { }); runnerThread.setUncaughtExceptionHandler( (t, e) -> { - new Thread(() -> { - LOG.error("[{}] Error while consuming records", t.getName(), e); - try { - this.close(); - } catch (Exception ex) { - LOG.error("[{}] Close kafka source error", t.getName(), e); - } - }, "Kafka Source Close Task Thread").start(); + LOG.error("[{}] Error while consuming records", t.getName(), e); + try { + notifyError((Exception) e); + } catch (Exception ex) { + LOG.error("[{}] Close kafka source error", t.getName(), e); + } }); runnerThread.setName("Kafka Source Thread"); runnerThread.start(); diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 402727f4ec015..6c67f3c57bc87 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -31,7 +31,6 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; -import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -158,9 +157,16 @@ public final void loadFromSaslYamlFileTest() throws IOException { assertEquals(config.getSslTruststorePassword(), "cert_pwd"); } - @Test - public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception { + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Uncaught exception") + public final void whenUnexpectedExceptionThrownOnReadTest() throws Exception { KafkaAbstractSource source = new DummySource(); + + KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig(); + kafkaSourceConfig.setTopic("test-topic"); + Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig"); + kafkaSourceConfigField.setAccessible(true); + kafkaSourceConfigField.set(source, kafkaSourceConfig); + Consumer consumer = mock(Consumer.class); Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer) .subscribe(Mockito.any(Collection.class)); @@ -168,16 +174,9 @@ public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Excep Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer"); consumerField.setAccessible(true); consumerField.set(source, consumer); - source.start(); - - Field runningField = KafkaAbstractSource.class.getDeclaredField("running"); - runningField.setAccessible(true); - - Awaitility.await().untilAsserted(() -> { - Assert.assertFalse((boolean) runningField.get(source)); - Assert.assertNull(consumerField.get(source)); - }); + // will throw RuntimeException. + source.read(); } private File getFile(String name) { From 7d8f05caa21fcadd0b3d74f786fb510548bfcbf8 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 13 Jul 2023 16:15:56 +0800 Subject: [PATCH 2/2] Refactor kafka source connector start logic. --- .../pulsar/io/kafka/KafkaAbstractSource.java | 51 ++++++++----------- .../kafka/source/KafkaAbstractSourceTest.java | 28 ++++++++-- 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 77bc8f83e7797..3d4612c039f36 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -27,7 +27,6 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -133,7 +132,6 @@ public void open(Map config, SourceContext sourceContext) throws throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex); } this.start(); - running = true; } protected Properties beforeCreateConsumer(Properties props) { @@ -158,45 +156,36 @@ public void close() throws InterruptedException { @SuppressWarnings("unchecked") public void start() { + LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); + consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); runnerThread = new Thread(() -> { - LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic()); - consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); LOG.info("Kafka source started."); while (running) { - ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); - CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()]; - int index = 0; - for (ConsumerRecord consumerRecord : consumerRecords) { - KafkaRecord record = buildRecord(consumerRecord); - if (LOG.isDebugEnabled()) { - LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); + try { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); + CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()]; + int index = 0; + for (ConsumerRecord consumerRecord : consumerRecords) { + KafkaRecord record = buildRecord(consumerRecord); + if (LOG.isDebugEnabled()) { + LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); + } + consume(record); + futures[index] = record.getCompletableFuture(); + index++; } - consume(record); - futures[index] = record.getCompletableFuture(); - index++; - } - if (!kafkaSourceConfig.isAutoCommitEnabled()) { - try { + if (!kafkaSourceConfig.isAutoCommitEnabled()) { CompletableFuture.allOf(futures).get(); consumer.commitSync(); - } catch (InterruptedException ex) { - break; - } catch (ExecutionException ex) { - LOG.error("Error while processing records", ex); - break; } + } catch (Exception e) { + LOG.error("Error while processing records", e); + notifyError(e); + break; } } }); - runnerThread.setUncaughtExceptionHandler( - (t, e) -> { - LOG.error("[{}] Error while consuming records", t.getName(), e); - try { - notifyError((Exception) e); - } catch (Exception ex) { - LOG.error("[{}] Close kafka source error", t.getName(), e); - } - }); + running = true; runnerThread.setName("Kafka Source Thread"); runnerThread.start(); } diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 6c67f3c57bc87..6911ec2a6bfa3 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.lang.reflect.Field; @@ -157,8 +158,8 @@ public final void loadFromSaslYamlFileTest() throws IOException { assertEquals(config.getSslTruststorePassword(), "cert_pwd"); } - @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Uncaught exception") - public final void whenUnexpectedExceptionThrownOnReadTest() throws Exception { + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Subscribe exception") + public final void throwExceptionBySubscribe() throws Exception { KafkaAbstractSource source = new DummySource(); KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig(); @@ -168,9 +169,30 @@ public final void whenUnexpectedExceptionThrownOnReadTest() throws Exception { kafkaSourceConfigField.set(source, kafkaSourceConfig); Consumer consumer = mock(Consumer.class); - Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer) + Mockito.doThrow(new RuntimeException("Subscribe exception")).when(consumer) .subscribe(Mockito.any(Collection.class)); + Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer"); + consumerField.setAccessible(true); + consumerField.set(source, consumer); + // will throw RuntimeException. + source.start(); + } + + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Pool exception") + public final void throwExceptionByPoll() throws Exception { + KafkaAbstractSource source = new DummySource(); + + KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig(); + kafkaSourceConfig.setTopic("test-topic"); + Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig"); + kafkaSourceConfigField.setAccessible(true); + kafkaSourceConfigField.set(source, kafkaSourceConfig); + + Consumer consumer = mock(Consumer.class); + Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer) + .poll(Mockito.any(Duration.class)); + Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer"); consumerField.setAccessible(true); consumerField.set(source, consumer);