diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 012e4143744e8..3d4612c039f36 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -27,7 +27,6 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -133,7 +132,6 @@ public void open(Map config, SourceContext sourceContext) throws throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex); } this.start(); - running = true; } protected Properties beforeCreateConsumer(Properties props) { @@ -158,47 +156,36 @@ public void close() throws InterruptedException { @SuppressWarnings("unchecked") public void start() { + LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); + consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); runnerThread = new Thread(() -> { - LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic()); - consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); LOG.info("Kafka source started."); while (running) { - ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); - CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()]; - int index = 0; - for (ConsumerRecord consumerRecord : consumerRecords) { - KafkaRecord record = buildRecord(consumerRecord); - if (LOG.isDebugEnabled()) { - LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); + try { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); + CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()]; + int index = 0; + for (ConsumerRecord consumerRecord : consumerRecords) { + KafkaRecord record = buildRecord(consumerRecord); + if (LOG.isDebugEnabled()) { + LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); + } + consume(record); + futures[index] = record.getCompletableFuture(); + index++; } - consume(record); - futures[index] = record.getCompletableFuture(); - index++; - } - if (!kafkaSourceConfig.isAutoCommitEnabled()) { - try { + if (!kafkaSourceConfig.isAutoCommitEnabled()) { CompletableFuture.allOf(futures).get(); consumer.commitSync(); - } catch (InterruptedException ex) { - break; - } catch (ExecutionException ex) { - LOG.error("Error while processing records", ex); - break; } + } catch (Exception e) { + LOG.error("Error while processing records", e); + notifyError(e); + break; } } }); - runnerThread.setUncaughtExceptionHandler( - (t, e) -> { - new Thread(() -> { - LOG.error("[{}] Error while consuming records", t.getName(), e); - try { - this.close(); - } catch (Exception ex) { - LOG.error("[{}] Close kafka source error", t.getName(), e); - } - }, "Kafka Source Close Task Thread").start(); - }); + running = true; runnerThread.setName("Kafka Source Thread"); runnerThread.start(); } diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 402727f4ec015..6911ec2a6bfa3 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.lang.reflect.Field; @@ -31,7 +32,6 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; -import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -158,26 +158,47 @@ public final void loadFromSaslYamlFileTest() throws IOException { assertEquals(config.getSslTruststorePassword(), "cert_pwd"); } - @Test - public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception { + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Subscribe exception") + public final void throwExceptionBySubscribe() throws Exception { KafkaAbstractSource source = new DummySource(); + + KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig(); + kafkaSourceConfig.setTopic("test-topic"); + Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig"); + kafkaSourceConfigField.setAccessible(true); + kafkaSourceConfigField.set(source, kafkaSourceConfig); + Consumer consumer = mock(Consumer.class); - Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer) + Mockito.doThrow(new RuntimeException("Subscribe exception")).when(consumer) .subscribe(Mockito.any(Collection.class)); Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer"); consumerField.setAccessible(true); consumerField.set(source, consumer); - + // will throw RuntimeException. source.start(); + } + + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Pool exception") + public final void throwExceptionByPoll() throws Exception { + KafkaAbstractSource source = new DummySource(); - Field runningField = KafkaAbstractSource.class.getDeclaredField("running"); - runningField.setAccessible(true); + KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig(); + kafkaSourceConfig.setTopic("test-topic"); + Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig"); + kafkaSourceConfigField.setAccessible(true); + kafkaSourceConfigField.set(source, kafkaSourceConfig); - Awaitility.await().untilAsserted(() -> { - Assert.assertFalse((boolean) runningField.get(source)); - Assert.assertNull(consumerField.get(source)); - }); + Consumer consumer = mock(Consumer.class); + Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer) + .poll(Mockito.any(Duration.class)); + + Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer"); + consumerField.setAccessible(true); + consumerField.set(source, consumer); + source.start(); + // will throw RuntimeException. + source.read(); } private File getFile(String name) {