From 294551dd09826a6048d3b37d03341795588c907c Mon Sep 17 00:00:00 2001 From: Maksym Rymar Date: Wed, 2 Feb 2022 16:03:47 +0200 Subject: [PATCH] DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697 --- .../exec/store/kafka/KafkaGroupScan.java | 36 +++++++++++++++++-- .../exec/store/kafka/MessageIterator.java | 3 +- .../exec/store/kafka/KafkaQueriesTest.java | 28 ++++++++++++--- 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java index 58c2f9f5484..e025e65836e 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.kafka; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -26,6 +27,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -44,6 +46,7 @@ import org.apache.drill.exec.store.schedule.CompleteWork; import org.apache.drill.exec.store.schedule.EndpointByteMap; import org.apache.drill.exec.store.schedule.EndpointByteMapImpl; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -176,14 +179,13 @@ private void init() { .message("Table '%s' does not exist", topicName) .build(logger); } - kafkaConsumer.subscribe(Collections.singletonList(topicName)); // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions // 
evaluates lazily, seeking to the first/last offset in all partitions only // when poll(long) or // position(TopicPartition) are called - kafkaConsumer.poll(0); - Set assignments = kafkaConsumer.assignment(); + kafkaConsumer.poll(Duration.ofSeconds(5)); + Set assignments = waitForConsumerAssignment(kafkaConsumer); topicPartitions = kafkaConsumer.partitionsFor(topicName); // fetch start offsets for each topicPartition @@ -227,6 +229,34 @@ private void init() { } } + + /** Workaround for Kafka versions > 2.0, needed due to KIP-505. + * It can be replaced with the Kafka implementation once it is introduced. + * @param consumer Kafka consumer whose partition assignments need to be obtained + * @return the set of topic partitions assigned to the consumer + * @throws InterruptedException if interrupted while waiting for the assignment + */ + private Set waitForConsumerAssignment(Consumer consumer) throws InterruptedException { + Set assignments = consumer.assignment(); + + long waitingForAssigmentTimeout = kafkaStoragePlugin.getContext().getOptionManager().getLong(ExecConstants.KAFKA_POLL_TIMEOUT); + long timeout = 0; + + while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) { + Thread.sleep(500); + timeout += 500; + assignments = consumer.assignment(); + } + + if (timeout >= waitingForAssigmentTimeout) { + throw UserException.dataReadError() + .message("Consumer assignment wasn't completed within the timeout %s", waitingForAssigmentTimeout) + .build(logger); + } + + return assignments; + } + @Override public void applyAssignments(List incomingEndpoints) { assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values())); diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java index 68855ce8056..98272982577 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java +++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.kafka; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -79,7 +80,7 @@ public boolean hasNext() { ConsumerRecords consumerRecords; Stopwatch stopwatch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; try { - consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut); + consumerRecords = kafkaConsumer.poll(Duration.ofMillis(kafkaPollTimeOut)); } catch (KafkaException ke) { throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger); } finally { diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java index e04012c5e26..e86423e4f02 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java @@ -35,6 +35,7 @@ import org.junit.experimental.categories.Category; import org.junit.runners.MethodSorters; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -120,7 +121,7 @@ public void testInformationSchema() throws Exception { queryBuilder().sql(query).run(); } - private Map fetchOffsets(int flag) { + private Map fetchOffsets(int flag) throws InterruptedException { Consumer kafkaConsumer = null; try { kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(), @@ -132,8 +133,8 @@ private Map fetchOffsets(int flag) { // evaluates lazily, seeking to the // first/last offset in all partitions only when poll(long) or // position(TopicPartition) are called - kafkaConsumer.poll(0); - Set assignments = kafkaConsumer.assignment(); + kafkaConsumer.poll(Duration.ofSeconds(5)); + Set assignments = 
waitForConsumerAssignment(kafkaConsumer); if (flag == -2) { // fetch start offsets for each topicPartition @@ -156,6 +157,25 @@ private Map fetchOffsets(int flag) { } } + private Set waitForConsumerAssignment(Consumer consumer) throws InterruptedException { + Set assignments = consumer.assignment(); + + long waitingForAssigmentTimeout = 5000; + long timeout = 0; + + while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) { + Thread.sleep(500); + timeout += 500; + assignments = consumer.assignment(); + } + + if (timeout >= waitingForAssigmentTimeout) { + fail("Consumer assignment wasn't completed within the timeout " + waitingForAssigmentTimeout); + } + + return assignments; + } + @Test public void testPhysicalPlanSubmission() throws Exception { String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC); @@ -281,4 +301,4 @@ public void testEscapeAnyChar() throws Exception { client.resetSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR); } } -} \ No newline at end of file +}