@@ -28,6 +28,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -111,7 +112,7 @@ protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
}

@Override
-protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
+protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
@@ -225,6 +226,14 @@ protected boolean isEndOfShard(Long seqNum)
return false;
}

+@Override
+public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
+{
+  return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+  {
+  };
+}

@Nullable
@Override
protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -50,6 +51,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -627,6 +629,14 @@ protected boolean isEndOfShard(Long seqNum)
return false;
}

+@Override
+public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
+{
+  return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+  {
+  };
+}

@Nonnull
@Override
protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
@@ -726,7 +736,7 @@ protected boolean isStartingSequenceOffsetsExclusive()


@Override
-protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
+protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
@@ -282,6 +282,26 @@ private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic
);
}

+private static List<ProducerRecord<byte[], byte[]>> generateSinglePartitionRecords(String topic)
+{
+  return ImmutableList.of(
+      new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2008", "A", "x", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2009", "B", "x", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2010", "C", "x", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2011", "D", "x", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2011", "d", "x", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2012", "E", "x", "10", "20.0", "1.0")),
+      new ProducerRecord<>(topic, 0, null, JB("2009", "b", "x", "10", "20.0", "1.0"))
+  );
+}

private static String getTopicName()
{
return "topic" + topicPostfix++;
@@ -863,23 +883,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
if (!isIncrementalHandoffSupported) {
return;
}

-List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
-    new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2008", "A", "x", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2009", "B", "x", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2010", "C", "x", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2011", "D", "x", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2011", "d", "x", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2012", "E", "x", "10", "20.0", "1.0")),
-    new ProducerRecord<>(topic, 0, null, JB("2009", "b", "x", "10", "20.0", "1.0"))
-);
+records = generateSinglePartitionRecords(topic);

final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
@@ -894,22 +898,14 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");

-final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
-    topic,
-    ImmutableMap.of(0, 0L)
-);
-final SeekableStreamPartitions<Integer, Long> checkpoint1 = new SeekableStreamPartitions<>(
-    topic,
-    ImmutableMap.of(0, 5L)
-);
-final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
-    topic,
-    ImmutableMap.of(0, 12L)
-);
-final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
-    topic,
-    ImmutableMap.of(0, Long.MAX_VALUE)
-);
+final SeekableStreamPartitions<Integer, Long> startPartitions =
+    new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L));
+final SeekableStreamPartitions<Integer, Long> checkpoint1 =
+    new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L));
+final SeekableStreamPartitions<Integer, Long> checkpoint2 =
+    new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L));
+final SeekableStreamPartitions<Integer, Long> endPartitions =
+    new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, Long.MAX_VALUE));

final KafkaIndexTask task = createTask(
null,
@@ -1923,6 +1919,115 @@ public void testRestore() throws Exception
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}

+@Test(timeout = 60_000L)
+public void testRestoreAfterPersistingSequences() throws Exception
+{
+  if (!isIncrementalHandoffSupported) {
+    return;
+  }
+
+  records = generateSinglePartitionRecords(topic);
+  maxRowsPerSegment = 2;
+  Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+  consumerProps.put("max.poll.records", "1");
+
+  final KafkaIndexTask task1 = createTask(
+      null,
+      new KafkaIndexTaskIOConfig(
+          0,
+          "sequence0",
+          new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+          new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L)),
+          consumerProps,
+          KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+          true,
+          null,
+          null,
+          false
+      )
+  );
+
+  final SeekableStreamPartitions<Integer, Long> checkpoint = new SeekableStreamPartitions<>(
+      topic,
+      ImmutableMap.of(0, 5L)
+  );
+
+  final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+  // Insert some data, but not enough for the task to finish
+  try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+    for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 5)) {
+      kafkaProducer.send(record).get();
+    }
+  }
+
+  while (task1.getRunner().getStatus() != Status.PAUSED) {
+    Thread.sleep(10);
+  }
+  final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+  Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(), currentOffsets);
+  // Set endOffsets to persist sequences
+  task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false);
+
+  // Stop without publishing segment
+  task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+  unlockAppenderatorBasePersistDirForTask(task1);
+
+  Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+  // Start a new task
+  final KafkaIndexTask task2 = createTask(
+      task1.getId(),
+      new KafkaIndexTaskIOConfig(
+          0,
+          "sequence0",
+          new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+          new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L)),
+          consumerProps,
+          KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+          true,
+          null,
+          null,
+          false
+      )
+  );
+
+  final ListenableFuture<TaskStatus> future2 = runTask(task2);
+  // Wait for the task to start reading
+
+  // Insert remaining data
+  try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+    for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5)) {
+      kafkaProducer.send(record).get();
+    }
+  }
+
+  // Wait for task to exit
+  Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
+
+  // Check metrics
+  Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
+  Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
+  Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
+  Assert.assertEquals(4, task2.getRunner().getRowIngestionMeters().getProcessed());
+  Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
+  Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
+
+  // Check published segments & metadata
+  SegmentDescriptor desc1 = SD(task1, "2008/P1D", 0);
+  SegmentDescriptor desc2 = SD(task1, "2008/P1D", 1);
+  SegmentDescriptor desc3 = SD(task1, "2009/P1D", 0);
+  SegmentDescriptor desc4 = SD(task1, "2009/P1D", 1);
+  SegmentDescriptor desc5 = SD(task1, "2010/P1D", 0);
+  SegmentDescriptor desc6 = SD(task1, "2011/P1D", 0);
+  SegmentDescriptor desc7 = SD(task1, "2012/P1D", 0);
+  Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
+  Assert.assertEquals(
+      new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L))),
+      metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+  );
+}

@Test(timeout = 60_000L)
public void testRunWithPauseAndResume() throws Exception
{
@@ -28,6 +28,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -92,7 +93,7 @@ protected List<OrderedPartitionableRecord<String, String>> getRecords(
}

@Override
-protected SeekableStreamPartitions<String, String> deserializeSeekableStreamPartitionsFromMetadata(
+protected SeekableStreamPartitions<String, String> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
@@ -176,6 +177,14 @@ protected boolean isEndOfShard(String seqNum)
return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum);
}

+@Override
+public TypeReference<List<SequenceMetadata<String, String>>> getSequenceMetadataTypeReference()
+{
+  return new TypeReference<List<SequenceMetadata<String, String>>>()
+  {
+  };
+}

@Nullable
@Override
protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
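Each runner in this diff overrides getSequenceMetadataTypeReference() with its concrete partition and offset types (Integer/Long for Kafka, String/String for Kinesis) because Java's type erasure leaves Jackson unable to recover List<SequenceMetadata<P, S>> at runtime; an anonymous TypeReference subclass captures the full generic signature. Below is a minimal standalone sketch of that idiom, not Druid code: Metadata is a hypothetical stand-in for SequenceMetadata, and the ObjectMapper call site is illustrative rather than the task runner's actual restore path.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class TypeReferenceSketch
{
  // Hypothetical stand-in for Druid's SequenceMetadata<PartitionIdType, SequenceOffsetType>.
  public static class Metadata<P, S>
  {
    public P partition;
    public S offset;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final String json = "[{\"partition\":0,\"offset\":5},{\"partition\":0,\"offset\":12}]";

    // readValue(json, List.class) would erase the element type and yield maps;
    // the anonymous TypeReference subclass preserves List<Metadata<Integer, Long>>.
    final List<Metadata<Integer, Long>> restored = mapper.readValue(
        json,
        new TypeReference<List<Metadata<Integer, Long>>>() {}
    );

    // Prints 12: the offset survives the round trip as a Long, not a raw number in a map.
    System.out.println(restored.get(1).offset);
  }
}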