Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -767,8 +767,8 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
// as well as the case where the metadata store do not have an entry for the reset partitions
boolean doReset = false;
for (Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.entrySet()) {
.getPartitionOffsetMap()
.entrySet()) {
final Long partitionOffsetInMetadataStore = currentMetadata == null
? null
: currentMetadata.getKafkaPartitions()
Expand Down Expand Up @@ -1034,13 +1034,13 @@ private KafkaConsumer<byte[], byte[]> getKafkaConsumer()

private void updatePartitionDataFromKafka()
{
Map<String, List<PartitionInfo>> topics;
List<PartitionInfo> partitions;
try {
synchronized (consumerLock) {
topics = consumer.listTopics(); // updates the consumer's list of partitions from the brokers
partitions = consumer.partitionsFor(ioConfig.getTopic());
}
}
catch (Exception e) { // calls to the consumer throw NPEs when the broker doesn't respond
catch (Exception e) {
log.warn(
e,
"Unable to get partition data from Kafka for brokers [%s], are the brokers up?",
Expand All @@ -1049,10 +1049,6 @@ private void updatePartitionDataFromKafka()
return;
}

List<PartitionInfo> partitions = topics.get(ioConfig.getTopic());
if (partitions == null) {
log.warn("No such topic [%s] found, list of discovered topics [%s]", ioConfig.getTopic(), topics.keySet());
}
int numPartitions = (partitions != null ? partitions.size() : 0);

log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions, ioConfig.getTopic());
Expand Down Expand Up @@ -1101,7 +1097,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti
taskCount++;
final KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
final String taskId = task.getId();

// Determine which task group this task belongs to based on one of the partitions handled by this task. If we
// later determine that this task is actively reading, we will make sure that it matches our current partition
// allocation (getTaskGroupIdForPartition(partition) should return the same value for every partition being read
Expand Down Expand Up @@ -2263,16 +2259,17 @@ private Runnable buildRunTask()
private void updateLatestOffsetsFromKafka()
{
synchronized (consumerLock) {
final Map<String, List<PartitionInfo>> topics = consumer.listTopics();
final List<PartitionInfo> partitionInfoList = consumer.partitionsFor(ioConfig.getTopic());

if (topics == null || !topics.containsKey(ioConfig.getTopic())) {
if (partitionInfoList == null || partitionInfoList.size() == 0) {
throw new ISE("Could not retrieve partitions for topic [%s]", ioConfig.getTopic());
}

final Set<TopicPartition> topicPartitions = topics.get(ioConfig.getTopic())
.stream()
.map(x -> new TopicPartition(x.topic(), x.partition()))
.collect(Collectors.toSet());
final Set<TopicPartition> topicPartitions = partitionInfoList
.stream()
.map(x -> new TopicPartition(x.topic(), x.partition()))
.collect(Collectors.toSet());

consumer.assign(topicPartitions);
consumer.seekToEnd(topicPartitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
Expand Down Expand Up @@ -75,6 +78,7 @@
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.JaasUtils;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
Expand All @@ -101,6 +105,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -133,6 +138,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static String kafkaHost;
private static DataSchema dataSchema;
private static int topicPostfix;
private static ZkUtils zkUtils;

private final int numThreads;

Expand Down Expand Up @@ -174,12 +180,19 @@ public static void setupClass() throws Exception
zkServer.getConnectString(),
null,
1,
ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS))
ImmutableMap.of(
"num.partitions",
String.valueOf(NUM_PARTITIONS),
"auto.create.topics.enable",
String.valueOf(false)
)
);
kafkaServer.start();
kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());

dataSchema = getDataSchema(DATASOURCE);

zkUtils = ZkUtils.apply(zkServer.getConnectString(), 30000, 30000, JaasUtils.isZkSecurityEnabled());
}

@Before
Expand Down Expand Up @@ -238,6 +251,9 @@ public static void tearDownClass() throws IOException

zkServer.stop();
zkServer = null;

zkUtils.close();
zkUtils = null;
}

@Test
Expand Down Expand Up @@ -2200,7 +2216,8 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException
Thread.sleep(100);
}

Assert.assertTrue(serviceEmitter.getStackTrace().startsWith("org.apache.druid.java.util.common.ISE: WTH?! cannot find"));
Assert.assertTrue(serviceEmitter.getStackTrace()
.startsWith("org.apache.druid.java.util.common.ISE: WTH?! cannot find"));
Assert.assertEquals(
"WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
serviceEmitter.getExceptionMessage()
Expand Down Expand Up @@ -2532,6 +2549,9 @@ public void testFailedInitializationAndRecovery() throws Exception

private void addSomeEvents(int numEventsPerPartition) throws Exception
{
//create topic manually
AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, would you tell me why this is needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the setting auto.create.topics.enable is true (the default), consumer.partitionsFor(SOME_TOPIC) will create SOME_TOPIC if the topic doesn't exist. This causes some unit tests like testXXXNoTasks() to fail. So I disabled it, but that made some unit tests which need to send events to Kafka fail. So I now create the topic in Kafka manually before sending data.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, it looks like something went wrong on GitHub. Got it. Thanks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about that — I will try to delete the duplicate comment.


try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (int i = 0; i < NUM_PARTITIONS; i++) {
for (int j = 0; j < numEventsPerPartition; j++) {
Expand Down