diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 6424c290fc9d..ae092d5d6b62 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -28,6 +28,7 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -111,7 +112,7 @@ protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
   }
 
   @Override
-  protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
+  protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   )
@@ -225,6 +226,14 @@ protected boolean isEndOfShard(Long seqNum)
     return false;
   }
 
+  @Override
+  public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
+  {
+    return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+    {
+    };
+  }
+
   @Nullable
   @Override
   protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index c822088ebc88..1082473a37dd 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -50,6 +51,7 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -610,6 +612,14 @@ protected boolean isEndOfShard(Long seqNum)
     return false;
   }
 
+  @Override
+  public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
+  {
+    return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
+    {
+    };
+  }
+
   @Nonnull
   @Override
   protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
@@ -709,7 +719,7 @@ protected boolean isStartingSequenceOffsetsExclusive()
 
   @Override
-  protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
+  protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
ObjectMapper mapper, Object object ) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index ef771857d27d..67ab4550899f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -293,6 +293,26 @@ private static List> generateRecords(String topic ); } + private static List> generateSinglePartitionRecords(String topic) + { + return ImmutableList.of( + new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0")) + ); + } + private static String getTopicName() { return "topic" + topicPostfix++; @@ -694,7 +714,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception } final Map nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); - + Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets)); task.getRunner().setEndOffsets(nextOffsets, false); @@ -726,7 +746,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed()); Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable()); Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway()); - + // Check published metadata SegmentDescriptor desc1 = sd(task, "2008/P1D", 0); SegmentDescriptor desc2 = sd(task, "2009/P1D", 0); @@ -739,7 +759,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception Assert.assertEquals( new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())); - + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); Assert.assertEquals( new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))), @@ -867,23 +887,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception if (!isIncrementalHandoffSupported) { return; } - - List> records = ImmutableList.of( - new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 
0, null, jb("2009", "b", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0")) - ); + records = generateSinglePartitionRecords(topic); final String baseSequenceName = "sequence0"; // as soon as any segment has more than one record, incremental publishing should happen @@ -901,22 +905,14 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception Map consumerProps = kafkaServer.consumerProperties(); consumerProps.put("max.poll.records", "1"); - final SeekableStreamPartitions startPartitions = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of(0, 0L) - ); - final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of(0, 5L) - ); - final SeekableStreamPartitions checkpoint2 = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of(0, 12L) - ); - final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of(0, Long.MAX_VALUE) - ); + final SeekableStreamPartitions startPartitions = + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)); + final SeekableStreamPartitions checkpoint1 = + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)); + final SeekableStreamPartitions checkpoint2 = + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L)); + final SeekableStreamPartitions endPartitions = + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)); final KafkaIndexTask task = createTask( null, @@ -1859,6 +1855,119 @@ public void testRestore() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } + @Test(timeout = 60_000L) + public void testRestoreAfterPersistingSequences() throws Exception + { + if (!isIncrementalHandoffSupported) { + return; + } + + records = generateSinglePartitionRecords(topic); + maxRowsPerSegment = 2; + Map consumerProps = kafkaServer.consumerProperties(); + consumerProps.put("max.poll.records", "1"); + + final KafkaIndexTask task1 = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)), + consumerProps, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null + ) + ); + + final SeekableStreamPartitions checkpoint = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of(0, 5L) + ); + + final ListenableFuture future1 = runTask(task1); + + // Insert some data, but not enough for the task to finish + try (final KafkaProducer kafkaProducer = 
kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : Iterables.limit(records, 5)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.commitTransaction(); + } + + while (task1.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + final Map currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(), currentOffsets); + // Set endOffsets to persist sequences + task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false); + + // Stop without publishing segment + task1.stopGracefully(toolboxFactory.build(task1).getConfig()); + unlockAppenderatorBasePersistDirForTask(task1); + + Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode()); + + // Start a new task + final KafkaIndexTask task2 = createTask( + task1.getId(), + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)), + consumerProps, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null + ) + ); + + final ListenableFuture future2 = runTask(task2); + // Wait for the task to start reading + + // Insert remaining data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : Iterables.skip(records, 5)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.commitTransaction(); + } + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(4, task2.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published segments & metadata + SegmentDescriptor desc1 = sd(task1, "2008/P1D", 0); + SegmentDescriptor desc2 = sd(task1, "2008/P1D", 1); + SegmentDescriptor desc3 = sd(task1, "2009/P1D", 0); + SegmentDescriptor desc4 = sd(task1, "2009/P1D", 1); + SegmentDescriptor desc5 = sd(task1, "2010/P1D", 0); + SegmentDescriptor desc6 = sd(task1, "2011/P1D", 0); + SegmentDescriptor desc7 = sd(task1, "2012/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + } + @Test(timeout = 60_000L) public void testRunWithPauseAndResume() throws Exception { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java index 3e7e5e7aeddd..50326f751ce1 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java +++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions; +import org.apache.druid.indexing.seekablestream.SequenceMetadata; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; @@ -92,7 +93,7 @@ protected List> getRecords( } @Override - protected SeekableStreamPartitions deserializeSeekableStreamPartitionsFromMetadata( + protected SeekableStreamPartitions deserializePartitionsFromMetadata( ObjectMapper mapper, Object object ) @@ -176,6 +177,14 @@ protected boolean isEndOfShard(String seqNum) return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum); } + @Override + public TypeReference>> getSequenceMetadataTypeReference() + { + return new TypeReference>>() + { + }; + } + @Nullable @Override protected TreeMap> getCheckPointsFromContext( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 98a3bee494e7..d5d04248fc01 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -198,28 +198,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport private static String shardId1 = "1"; private static String shardId0 = "0"; private static KinesisRecordSupplier recordSupplier; - private static List> records = ImmutableList.of( - new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>( - stream, - "1", - "5", - jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0") - ), - new OrderedPartitionableRecord<>(stream, "1", "6", Collections.singletonList(StringUtils.toUtf8("unparseable"))), - new OrderedPartitionableRecord<>(stream, "1", "7", Collections.singletonList(StringUtils.toUtf8("unparseable2"))), - new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))), - new OrderedPartitionableRecord<>(stream, "1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")), - new OrderedPartitionableRecord<>(stream, "0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "0", "1", 
jb("2011", "h", "y", "10", "20.0", "1.0")) - ); + private static List> records; private static ServiceEmitter emitter; private static ListeningExecutorService taskExec; @@ -315,6 +294,7 @@ public void setupTest() throws IOException, InterruptedException maxSavedParseExceptions = null; skipAvailabilityCheck = false; doHandoff = true; + records = generateRecords(stream); reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + System.currentTimeMillis(), "json"); maxRecordsPerPoll = 1; @@ -347,6 +327,52 @@ public static void tearDownClass() throws Exception emitter.close(); } + private static List> generateRecords(String stream) + { + return ImmutableList.of( + new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>( + stream, + "1", + "5", + jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0") + ), + new OrderedPartitionableRecord<>(stream, "1", "6", Collections.singletonList(StringUtils.toUtf8("unparseable"))), + new OrderedPartitionableRecord<>(stream, "1", "7", Collections.singletonList(StringUtils.toUtf8("unparseable2"))), + new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))), + new OrderedPartitionableRecord<>(stream, "1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")), + new OrderedPartitionableRecord<>(stream, "0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "0", "1", jb("2011", "h", "y", "10", "20.0", "1.0")) + ); + } + + private static List> generateSinglePartitionRecords(String stream) + { + return ImmutableList.of( + new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")), + new 
OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0")) + ); + } @Test(timeout = 120_000L) public void testRunAfterDataInserted() throws Exception { @@ -2213,6 +2239,165 @@ public void testRestore() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } + @Test(timeout = 120_000L) + public void testRestoreAfterPersistingSequences() throws Exception + { + maxRowsPerSegment = 2; + maxRecordsPerPoll = 1; + records = generateSinglePartitionRecords(stream); + + recordSupplier.assign(anyObject()); + expectLastCall().anyTimes(); + + expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); + + recordSupplier.seek(anyObject(), anyString()); + expectLastCall().anyTimes(); + + // simulate 1 record at a time + expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(0))) + .once() + .andReturn(Collections.singletonList(records.get(1))) + .once() + .andReturn(Collections.singletonList(records.get(2))) + .once() + .andReturn(Collections.singletonList(records.get(3))) + .once() + .andReturn(Collections.singletonList(records.get(4))) + .once() + .andReturn(Collections.emptyList()) + .anyTimes(); + + replayAll(); + + final KinesisIndexTask task1 = createTask( + "task1", + new KinesisIndexTaskIOConfig( + null, + "sequence0", + new SeekableStreamPartitions<>(stream, ImmutableMap.of( + shardId1, + "0" + )), + new SeekableStreamPartitions<>(stream, ImmutableMap.of( + shardId1, + "6" + )), + true, + null, + null, + "awsEndpoint", + null, + null, + null, + null, + null, + false + ) + ); + + final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>( + stream, + ImmutableMap.of(shardId1, "4") + ); + + final ListenableFuture future1 = runTask(task1); + + while (task1.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) { + Thread.sleep(10); + } + final Map currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets); + task1.getRunner().setEndOffsets(currentOffsets, false); + + // Stop without publishing segment + task1.stopGracefully(toolboxFactory.build(task1).getConfig()); + unlockAppenderatorBasePersistDirForTask(task1); + + Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode()); + + verifyAll(); + reset(recordSupplier); + + recordSupplier.assign(anyObject()); + expectLastCall().anyTimes(); + + expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); + + + recordSupplier.seek(anyObject(), anyString()); + expectLastCall().anyTimes(); + + expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(5))) + .once() + .andReturn(Collections.singletonList(records.get(6))) + .once() + .andReturn(Collections.emptyList()) + .anyTimes(); + + recordSupplier.close(); + expectLastCall(); + + replayAll(); + + // Start a new task + final KinesisIndexTask task2 = createTask( + task1.getId(), + new KinesisIndexTaskIOConfig( + null, + "sequence0", + new SeekableStreamPartitions<>(stream, ImmutableMap.of( + shardId1, + "0" + )), + new SeekableStreamPartitions<>(stream, ImmutableMap.of( + shardId1, + "6" + )), + true, + null, + null, + "awsEndpoint", + null, + null, + 
ImmutableSet.of(shardId1), + null, + null, + false + ) + ); + + final ListenableFuture future2 = runTask(task2); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode()); + + verifyAll(); + + // Check metrics + Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published segments & metadata + SegmentDescriptor desc1 = sd(task1, "2008/P1D", 0); + SegmentDescriptor desc2 = sd(task1, "2009/P1D", 0); + SegmentDescriptor desc3 = sd(task1, "2010/P1D", 0); + SegmentDescriptor desc4 = sd(task1, "2011/P1D", 0); + SegmentDescriptor desc5 = sd(task1, "2013/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5), publishedDescriptors()); + Assert.assertEquals( + new KinesisDataSourceMetadata( + new SeekableStreamPartitions<>(stream, ImmutableMap.of( + shardId1, + "6" + ))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + } @Test(timeout = 120_000L) public void testRunWithPauseAndResume() throws Exception @@ -2426,23 +2611,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception @Test(timeout = 5000L) public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception { - final List> records = ImmutableList.of( - new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")), - new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0")) - ); + records = generateSinglePartitionRecords(stream); final String baseSequenceName = "sequence0"; // as soon as any segment has more than one record, incremental publishing should happen diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 7eee9dcf259e..41c0dfdf1348 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -20,8 +20,6 @@ package org.apache.druid.indexing.seekablestream; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -53,7 +51,6 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction; -import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.stats.RowIngestionMeters; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -75,7 +72,6 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata; import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver; -import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.security.Access; @@ -142,8 +138,8 @@ public enum Status } private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskRunner.class); - private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; - private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions"; + static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; + static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions"; private final Map endOffsets; private final ConcurrentMap currOffsets = new ConcurrentHashMap<>(); @@ -210,7 +206,7 @@ public enum Status protected volatile boolean pauseRequested = false; private volatile long nextCheckpointTime; - private volatile CopyOnWriteArrayList sequences; + private volatile CopyOnWriteArrayList> sequences; private volatile Throwable backgroundThreadException; public SeekableStreamIndexTaskRunner( @@ -276,7 +272,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception Map.Entry> previous = sequenceOffsets.next(); while (sequenceOffsets.hasNext()) { Map.Entry> current = sequenceOffsets.next(); - sequences.add(new SequenceMetadata( + sequences.add(new SequenceMetadata<>( previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), @@ -287,7 +283,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception previous = current; exclusive = true; } - sequences.add(new SequenceMetadata( + sequences.add(new SequenceMetadata<>( previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), @@ -296,7 +292,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception exclusive ? 
previous.getValue().keySet() : null )); } else { - sequences.add(new SequenceMetadata( + sequences.add(new SequenceMetadata<>( 0, StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), ioConfig.getStartPartitions().getPartitionSequenceNumberMap(), @@ -369,7 +365,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception } else { @SuppressWarnings("unchecked") final Map restoredMetadataMap = (Map) restoredMetadata; - final SeekableStreamPartitions restoredNextPartitions = deserializeSeekableStreamPartitionsFromMetadata( + final SeekableStreamPartitions restoredNextPartitions = deserializePartitionsFromMetadata( toolbox.getObjectMapper(), restoredMetadataMap.get(METADATA_NEXT_PARTITIONS) ); @@ -543,9 +539,9 @@ public void run() } boolean isPersistRequired = false; - final SequenceMetadata sequenceToUse = sequences + final SequenceMetadata sequenceToUse = sequences .stream() - .filter(sequenceMetadata -> sequenceMetadata.canHandle(record)) + .filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record)) .findFirst() .orElse(null); @@ -692,11 +688,11 @@ public void onFailure(Throwable t) status = Status.PUBLISHING; } - for (SequenceMetadata sequenceMetadata : sequences) { + for (SequenceMetadata sequenceMetadata : sequences) { if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) { // this is done to prevent checks in sequence specific commit supplier from failing sequenceMetadata.setEndOffsets(currOffsets); - sequenceMetadata.updateAssignments(currOffsets); + sequenceMetadata.updateAssignments(this, currOffsets); publishingSequences.add(sequenceMetadata.getSequenceName()); // persist already done in finally, so directly add to publishQueue publishAndRegisterHandoff(sequenceMetadata); @@ -812,11 +808,6 @@ public void onFailure(Throwable t) return TaskStatus.success(task.getId()); } - /** - * checks if the input seqNum marks end of shard. Used by Kinesis only - */ - protected abstract boolean isEndOfShard(SequenceOffsetType seqNum); - private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException { // Check if any publishFuture failed. 
@@ -846,14 +837,14 @@ private void checkPublishAndHandoffFailure() throws ExecutionException, Interrup handOffWaitList.removeAll(handoffFinished); } - private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) + private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) { log.info("Publishing segments for sequence [%s]", sequenceMetadata); final ListenableFuture publishFuture = Futures.transform( driver.publish( - sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()), - sequenceMetadata.getCommitterSupplier(stream, lastPersistedOffsets).get(), + sequenceMetadata.createPublisher(this, toolbox, ioConfig.isUseTransaction()), + sequenceMetadata.getCommitterSupplier(this, stream, lastPersistedOffsets).get(), Collections.singletonList(sequenceMetadata.getSequenceName()) ), (Function) publishedSegmentsAndMetadata -> { @@ -938,11 +929,9 @@ private boolean restoreSequences() throws IOException final File sequencesPersistFile = getSequencesPersistFile(toolbox); if (sequencesPersistFile.exists()) { sequences = new CopyOnWriteArrayList<>( - toolbox.getObjectMapper().>readValue( + toolbox.getObjectMapper().>>readValue( sequencesPersistFile, - new TypeReference>() - { - } + getSequenceMetadataTypeReference() ) ); return true; @@ -955,9 +944,7 @@ private synchronized void persistSequences() throws IOException { log.info("Persisting Sequences Metadata [%s]", sequences); toolbox.getObjectMapper().writerWithType( - new TypeReference>() - { - } + getSequenceMetadataTypeReference() ).writeValue(getSequencesPersistFile(toolbox), sequences); } @@ -1002,8 +989,8 @@ private Map getTaskCompletionRowStats() private void maybePersistAndPublishSequences(Supplier committerSupplier) throws InterruptedException { - for (SequenceMetadata sequenceMetadata : sequences) { - sequenceMetadata.updateAssignments(currOffsets); + for (SequenceMetadata sequenceMetadata : sequences) { + sequenceMetadata.updateAssignments(this, currOffsets); if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) { publishingSequences.add(sequenceMetadata.getSequenceName()); try { @@ -1378,7 +1365,7 @@ public Response setEndOffsets( // and after acquiring pauseLock to correctly guard against duplicate requests Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! 
No Sequences found to set end sequences"); - final SequenceMetadata latestSequence = sequences.get(sequences.size() - 1); + final SequenceMetadata latestSequence = sequences.get(sequences.size() - 1); // if a partition has not been read yet (contained in initialOffsetsSnapshot), then // do not mark the starting sequence number as exclusive Set exclusivePartitions = sequenceNumbers.keySet() @@ -1389,7 +1376,7 @@ public Response setEndOffsets( .collect(Collectors.toSet()); if ((latestSequence.getStartOffsets().equals(sequenceNumbers) - && latestSequence.exclusiveStartPartitions.equals(exclusivePartitions) + && latestSequence.getExclusiveStartPartitions().equals(exclusivePartitions) && !finish) || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) { log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers); @@ -1409,8 +1396,7 @@ public Response setEndOffsets( } for (Map.Entry entry : sequenceNumbers.entrySet()) { - if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey()))) - < 0) { + if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey()))) < 0) { return Response.status(Response.Status.BAD_REQUEST) .entity( StringUtils.format( @@ -1433,7 +1419,7 @@ public Response setEndOffsets( exclusiveStartingPartitions.addAll(exclusivePartitions); // create new sequence - final SequenceMetadata newSequence = new SequenceMetadata( + final SequenceMetadata newSequence = new SequenceMetadata<>( latestSequence.getSequenceId() + 1, StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1), sequenceNumbers, @@ -1596,291 +1582,6 @@ public DateTime getStartTime(@Context final HttpServletRequest req) return startTime; } - private class SequenceMetadata - { - private final int sequenceId; - private final String sequenceName; - private final Set exclusiveStartPartitions; - private final Set assignments; - private final boolean sentinel; - private boolean checkpointed; - /** - * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because - * {@link #setEndOffsets)} can be called by both the main thread and the HTTP thread. - */ - private final ReentrantLock lock = new ReentrantLock(); - - final Map startOffsets; - final Map endOffsets; - - @JsonCreator - public SequenceMetadata( - @JsonProperty("sequenceId") int sequenceId, - @JsonProperty("sequenceName") String sequenceName, - @JsonProperty("startOffsets") Map startOffsets, - @JsonProperty("endOffsets") Map endOffsets, - @JsonProperty("checkpointed") boolean checkpointed, - @JsonProperty("exclusiveStartPartitions") Set exclusiveStartPartitions - ) - { - Preconditions.checkNotNull(sequenceName); - Preconditions.checkNotNull(startOffsets); - Preconditions.checkNotNull(endOffsets); - this.sequenceId = sequenceId; - this.sequenceName = sequenceName; - this.startOffsets = ImmutableMap.copyOf(startOffsets); - this.endOffsets = new HashMap<>(endOffsets); - this.assignments = new HashSet<>(startOffsets.keySet()); - this.checkpointed = checkpointed; - this.sentinel = false; - this.exclusiveStartPartitions = exclusiveStartPartitions == null - ? 
Collections.emptySet() - : exclusiveStartPartitions; - } - - @JsonProperty - public Set getExclusiveStartPartitions() - { - return exclusiveStartPartitions; - } - - @JsonProperty - public int getSequenceId() - { - return sequenceId; - } - - @JsonProperty - public boolean isCheckpointed() - { - lock.lock(); - try { - return checkpointed; - } - finally { - lock.unlock(); - } - } - - @JsonProperty - public String getSequenceName() - { - return sequenceName; - } - - @JsonProperty - public Map getStartOffsets() - { - return startOffsets; - } - - @JsonProperty - public Map getEndOffsets() - { - lock.lock(); - try { - return endOffsets; - } - finally { - lock.unlock(); - } - } - - @JsonProperty - public boolean isSentinel() - { - return sentinel; - } - - void setEndOffsets(Map newEndOffsets) - { - lock.lock(); - try { - endOffsets.putAll(newEndOffsets); - checkpointed = true; - } - finally { - lock.unlock(); - } - } - - void updateAssignments(Map nextPartitionOffset) - { - lock.lock(); - try { - assignments.clear(); - nextPartitionOffset.forEach((key, value) -> { - if (endOffsets.get(key).equals(SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER) - || createSequenceNumber(endOffsets.get(key)).compareTo(createSequenceNumber(nextPartitionOffset.get(key))) - > 0) { - assignments.add(key); - } - }); - } - finally { - lock.unlock(); - } - } - - boolean isOpen() - { - return !assignments.isEmpty(); - } - - boolean canHandle(OrderedPartitionableRecord record) - { - lock.lock(); - try { - final OrderedSequenceNumber partitionEndOffset = createSequenceNumber(endOffsets.get(record.getPartitionId())); - final OrderedSequenceNumber partitionStartOffset = createSequenceNumber(startOffsets.get( - record.getPartitionId())); - final OrderedSequenceNumber recordOffset = createSequenceNumber(record.getSequenceNumber()); - if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) { - return false; - } - boolean ret; - if (isStartingSequenceOffsetsExclusive()) { - ret = recordOffset.compareTo(partitionStartOffset) - >= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0); - } else { - ret = recordOffset.compareTo(partitionStartOffset) >= 0; - } - - if (isEndSequenceOffsetsExclusive()) { - ret &= recordOffset.compareTo(partitionEndOffset) < 0; - } else { - ret &= recordOffset.compareTo(partitionEndOffset) <= 0; - } - - return ret; - } - finally { - lock.unlock(); - } - } - - @Override - public String toString() - { - lock.lock(); - try { - return "SequenceMetadata{" + - "sequenceName='" + sequenceName + '\'' + - ", sequenceId=" + sequenceId + - ", startOffsets=" + startOffsets + - ", endOffsets=" + endOffsets + - ", assignments=" + assignments + - ", sentinel=" + sentinel + - ", checkpointed=" + checkpointed + - '}'; - } - finally { - lock.unlock(); - } - } - - Supplier getCommitterSupplier( - String stream, - Map lastPersistedOffsets - ) - { - // Set up committer. - return () -> - new Committer() - { - @Override - public Object getMetadata() - { - lock.lock(); - - try { - Preconditions.checkState( - assignments.isEmpty(), - "This committer can be used only once all the records till sequences [%s] have been consumed, also make" - + " sure to call updateAssignments before using this committer", - endOffsets - ); - - - // merge endOffsets for this sequence with globally lastPersistedOffsets - // This is done because this committer would be persisting only sub set of segments - // corresponding to the current sequence. 
Generally, lastPersistedOffsets should already - // cover endOffsets but just to be sure take max of sequences and persist that - for (Map.Entry partitionOffset : endOffsets.entrySet()) { - SequenceOffsetType newOffsets = partitionOffset.getValue(); - if (lastPersistedOffsets.containsKey(partitionOffset.getKey()) && - createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey())).compareTo( - createSequenceNumber(newOffsets)) > 0) { - newOffsets = lastPersistedOffsets.get(partitionOffset.getKey()); - } - lastPersistedOffsets.put( - partitionOffset.getKey(), - newOffsets - ); - } - - // Publish metadata can be different from persist metadata as we are going to publish only - // subset of segments - return ImmutableMap.of( - METADATA_NEXT_PARTITIONS, new SeekableStreamPartitions<>(stream, lastPersistedOffsets), - METADATA_PUBLISH_PARTITIONS, new SeekableStreamPartitions<>(stream, endOffsets) - ); - } - finally { - lock.unlock(); - } - } - - @Override - public void run() - { - // Do nothing. - } - }; - - } - - TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction) - { - return (segments, commitMetadata) -> { - final SeekableStreamPartitions finalPartitions = deserializeSeekableStreamPartitionsFromMetadata( - toolbox.getObjectMapper(), - ((Map) Preconditions - .checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_PUBLISH_PARTITIONS) - ); - - // Sanity check, we should only be publishing things that match our desired end state. - if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) { - throw new ISE( - "WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].", - toString(), - commitMetadata - ); - } - - final SegmentTransactionalInsertAction action; - - if (useTransaction) { - action = new SegmentTransactionalInsertAction( - segments, - createDataSourceMetadata(new SeekableStreamPartitions<>( - finalPartitions.getStream(), - getStartOffsets() - )), - createDataSourceMetadata(finalPartitions) - ); - } else { - action = new SegmentTransactionalInsertAction(segments, null, null); - } - - log.info("Publishing with isTransaction[%s].", useTransaction); - - return toolbox.getTaskActionClient().submit(action); - }; - } - - } - private boolean verifyInitialRecordAndSkipExclusivePartition( final OrderedPartitionableRecord record ) @@ -1920,7 +1621,12 @@ private boolean verifyInitialRecordAndSkipExclusivePartition( } /** - * deserailizes the checkpoints into of Map> + * checks if the input seqNum marks end of shard. Used by Kinesis only + */ + protected abstract boolean isEndOfShard(SequenceOffsetType seqNum); + + /** + * deserializes the checkpoints into of Map> * * @param toolbox task toolbox * @param checkpointsString the json-serialized checkpoint string @@ -1936,7 +1642,7 @@ protected abstract TreeMap> ge ) throws IOException; /** - * Calculates the sequence number used to update `currentOffsets` after finishing reading a record. + * Calculates the sequence number used to update `currentOffsets` after finished reading a record. 
 * In Kafka this returns sequenceNumber + 1 since that's the next expected offset
 * In Kinesis this simply returns sequenceNumber, since the sequence numbers in Kinesis are not
 * contiguous and finding the next sequence number requires an expensive API call
@@ -1948,14 +1654,14 @@ protected abstract TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> ge
   protected abstract SequenceOffsetType getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber);
 
   /**
-   * deserialzies stored metadata into SeekableStreamPartitions
+   * deserializes stored metadata into SeekableStreamPartitions
    *
    * @param mapper json objectMapper
    * @param object metadata
    *
    * @return SeekableStreamPartitions
    */
-  protected abstract SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> deserializeSeekableStreamPartitionsFromMetadata(
+  protected abstract SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   );
@@ -2025,4 +1731,6 @@ protected abstract void possiblyResetDataSourceMetadata(
    * partition we read from stream
    */
   protected abstract boolean isStartingSequenceOffsetsExclusive();
+
+  protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
new file mode 100644
index 000000000000..7fbc800ac3d1
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
+{
+  private final int sequenceId;
+  private final String sequenceName;
+  private final Set<PartitionIdType> exclusiveStartPartitions;
+  private final Set<PartitionIdType> assignments;
+  private final boolean sentinel;
+  private boolean checkpointed;
+
+  /**
+   * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+   * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  final Map<PartitionIdType, SequenceOffsetType> startOffsets;
+  final Map<PartitionIdType, SequenceOffsetType> endOffsets;
+
+  @JsonCreator
+  public SequenceMetadata(
+      @JsonProperty("sequenceId") int sequenceId,
+      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
+      @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
+      @JsonProperty("checkpointed") boolean checkpointed,
+      @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions
+  )
+  {
+    Preconditions.checkNotNull(sequenceName);
+    Preconditions.checkNotNull(startOffsets);
+    Preconditions.checkNotNull(endOffsets);
+    this.sequenceId = sequenceId;
+    this.sequenceName = sequenceName;
+    this.startOffsets = ImmutableMap.copyOf(startOffsets);
+    this.endOffsets = new HashMap<>(endOffsets);
+    this.assignments = new HashSet<>(startOffsets.keySet());
+    this.checkpointed = checkpointed;
+    this.sentinel = false;
+    this.exclusiveStartPartitions = exclusiveStartPartitions == null
+                                    ? Collections.emptySet()
+                                    : exclusiveStartPartitions;
+  }
+
+  @JsonProperty
+  public Set<PartitionIdType> getExclusiveStartPartitions()
+  {
+    return exclusiveStartPartitions;
+  }
+
+  @JsonProperty
+  public int getSequenceId()
+  {
+    return sequenceId;
+  }
+
+  @JsonProperty
+  public boolean isCheckpointed()
+  {
+    lock.lock();
+    try {
+      return checkpointed;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @JsonProperty
+  public String getSequenceName()
+  {
+    return sequenceName;
+  }
+
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
+  {
+    return startOffsets;
+  }
+
+  @JsonProperty
+  public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
+  {
+    lock.lock();
+    try {
+      return endOffsets;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @JsonProperty
+  public boolean isSentinel()
+  {
+    return sentinel;
+  }
+
+  void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
+  {
+    lock.lock();
+    try {
+      endOffsets.putAll(newEndOffsets);
+      checkpointed = true;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  void updateAssignments(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset
+  )
+  {
+    lock.lock();
+    try {
+      assignments.clear();
+      nextPartitionOffset.forEach((key, value) -> {
+        SequenceOffsetType endOffset = endOffsets.get(key);
+        if (SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
+            || runner.createSequenceNumber(endOffset).compareTo(runner.createSequenceNumber(nextPartitionOffset.get(key))) > 0) {
+          assignments.add(key);
+        }
+      });
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  boolean isOpen()
+  {
+    return !assignments.isEmpty();
+  }
+
+  boolean canHandle(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
+  )
+  {
+    lock.lock();
+    try {
+      final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset = runner.createSequenceNumber(endOffsets.get(record.getPartitionId()));
+      final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset = runner.createSequenceNumber(startOffsets.get(
+          record.getPartitionId()));
+      final OrderedSequenceNumber<SequenceOffsetType> recordOffset = runner.createSequenceNumber(record.getSequenceNumber());
+      if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) {
+        return false;
+      }
+      boolean ret;
+      if (runner.isStartingSequenceOffsetsExclusive()) {
+        ret = recordOffset.compareTo(partitionStartOffset)
+              >= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
+      } else {
+        ret = recordOffset.compareTo(partitionStartOffset) >= 0;
+      }
+
+      if (runner.isEndSequenceOffsetsExclusive()) {
+        ret &= recordOffset.compareTo(partitionEndOffset) < 0;
+      } else {
+        ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
+      }
+
+      return ret;
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public String toString()
+  {
+    lock.lock();
+    try {
+      return "SequenceMetadata{" +
+             "sequenceName='" + sequenceName + '\'' +
+             ", sequenceId=" + sequenceId +
+             ", startOffsets=" + startOffsets +
+             ", endOffsets=" + endOffsets +
+             ", assignments=" + assignments +
+             ", sentinel=" + sentinel +
+             ", checkpointed=" + checkpointed +
+             '}';
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
+  Supplier<Committer> getCommitterSupplier(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      String stream,
+      Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
+  )
+  {
+    // Set up committer.
+    return () ->
+        new Committer()
+        {
+          @Override
+          public Object getMetadata()
+          {
+            lock.lock();
+
+            try {
+              Preconditions.checkState(
+                  assignments.isEmpty(),
+                  "This committer can be used only once all the records till sequences [%s] have been consumed, also make"
+                  + " sure to call updateAssignments before using this committer",
+                  endOffsets
+              );
+
+
+              // merge endOffsets for this sequence with globally lastPersistedOffsets
+              // This is done because this committer would be persisting only sub set of segments
+              // corresponding to the current sequence. Generally, lastPersistedOffsets should already
+              // cover endOffsets but just to be sure take max of sequences and persist that
+              for (Map.Entry<PartitionIdType, SequenceOffsetType> partitionOffset : endOffsets.entrySet()) {
+                SequenceOffsetType newOffsets = partitionOffset.getValue();
+                if (lastPersistedOffsets.containsKey(partitionOffset.getKey())
+                    && runner.createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey()))
+                             .compareTo(runner.createSequenceNumber(newOffsets)) > 0) {
+                  newOffsets = lastPersistedOffsets.get(partitionOffset.getKey());
+                }
+                lastPersistedOffsets.put(
+                    partitionOffset.getKey(),
+                    newOffsets
+                );
+              }
+
+              // Publish metadata can be different from persist metadata as we are going to publish only
+              // subset of segments
+              return ImmutableMap.of(
+                  SeekableStreamIndexTaskRunner.METADATA_NEXT_PARTITIONS,
+                  new SeekableStreamPartitions<>(stream, lastPersistedOffsets),
+                  SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS,
+                  new SeekableStreamPartitions<>(stream, endOffsets)
+              );
+            }
+            finally {
+              lock.unlock();
+            }
+          }
+
+          @Override
+          public void run()
+          {
+            // Do nothing.
+          }
+        };
+  }
+
+  TransactionalSegmentPublisher createPublisher(
+      SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+      TaskToolbox toolbox,
+      boolean useTransaction
+  )
+  {
+    return (segments, commitMetadata) -> {
+      final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
+      final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> finalPartitions =
+          runner.deserializePartitionsFromMetadata(
+              toolbox.getObjectMapper(),
+              commitMetaMap.get(SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS)
+          );
+
+      // Sanity check, we should only be publishing things that match our desired end state.
+      if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
+        throw new ISE(
+            "WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].",
+            toString(),
+            commitMetadata
+        );
+      }
+
+      final SegmentTransactionalInsertAction action;
+
+      if (useTransaction) {
+        action = new SegmentTransactionalInsertAction(
+            segments,
+            runner.createDataSourceMetadata(
+                new SeekableStreamPartitions<>(finalPartitions.getStream(), getStartOffsets())
+            ),
+            runner.createDataSourceMetadata(finalPartitions)
+        );
+      } else {
+        action = new SegmentTransactionalInsertAction(segments, null, null);
+      }
+
+      return toolbox.getTaskActionClient().submit(action);
+    };
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 25250ac0487c..b93d15f44418 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -119,8 +119,7 @@
 * @param <PartitionIdType>    the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
 * @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
 */
-public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
-    implements Supervisor
+public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
 {
   public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
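
Why the patch adds getSequenceMetadataTypeReference(): once SequenceMetadata is a top-level generic class, the base runner can no longer hand Jackson a usable TypeReference, because from the base class the type variables are erased. Each concrete runner therefore supplies a TypeReference carrying its real partition/offset types. Below is a minimal sketch of the persist/restore round trip, assuming Kafka's Integer/Long types and a class in the same package as SequenceMetadata; the SequencePersistExample class and its method names are illustrative only, not part of the patch.

// Illustrative only; not part of the patch. Assumes Kafka's concrete types
// (Integer partition ids, Long offsets) and the same package as SequenceMetadata.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.File;
import java.io.IOException;
import java.util.List;

class SequencePersistExample
{
  private static final TypeReference<List<SequenceMetadata<Integer, Long>>> TYPE_REF =
      new TypeReference<List<SequenceMetadata<Integer, Long>>>()
      {
      };

  // Mirrors persistSequences() in SeekableStreamIndexTaskRunner: write with the full
  // generic type so the JSON keeps everything the @JsonProperty accessors expose.
  static void persist(ObjectMapper mapper, File file, List<SequenceMetadata<Integer, Long>> sequences)
      throws IOException
  {
    mapper.writerWithType(TYPE_REF).writeValue(file, sequences);
  }

  // Mirrors restoreSequences(): without a TypeReference that pins the concrete type
  // arguments, Jackson has to bind the type variables to Object, and the restored
  // offsets come back with the wrong runtime types for the restored task to use.
  static List<SequenceMetadata<Integer, Long>> restore(ObjectMapper mapper, File file) throws IOException
  {
    return mapper.readValue(file, TYPE_REF);
  }
}

The bound checks that the extracted canHandle() applies are unchanged by the refactor: a record belongs to a sequence when its offset falls between the sequence's start and end offsets, with exclusivity at each bound decided by the stream type. A condensed sketch of that comparison, using plain long offsets for readability (inSequenceRange is a hypothetical helper, not a method in the patch):

// Start is exclusive only for streams with exclusive starting offsets (Kinesis) and
// only for partitions listed in exclusiveStartPartitions; the end bound follows
// isEndSequenceOffsetsExclusive() (exclusive for Kafka, inclusive for Kinesis).
static boolean inSequenceRange(long record, long start, long end, boolean exclusiveStart, boolean exclusiveEnd)
{
  final boolean afterStart = exclusiveStart ? record > start : record >= start;
  final boolean beforeEnd = exclusiveEnd ? record < end : record <= end;
  return afterStart && beforeEnd;
}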