diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index b4c8de284cfa..e0d7f73c689d 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -116,7 +116,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |-----|----|-----------|--------| |`type`|String|The indexing task type, this should always be `kafka`.|yes| |`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)| -|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows.|no (default == 5000000)| +|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| |`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| @@ -130,6 +130,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)| |`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)| |`segmentWriteOutMediumFactory`|String|Segment write-out medium to use when creating segments. See [Indexing Service Configuration](../configuration/indexing-service.html) page, "SegmentWriteOutMediumFactory" section for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)| +|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)| #### IndexSpec @@ -312,12 +313,12 @@ In this way, configuration changes can be applied without requiring any pause in ### On the Subject of Segments Each Kafka Indexing Task puts events consumed from Kafka partitions assigned to it in a single segment for each segment -granular interval until maxRowsPerSegment limit is reached, at this point a new partition for this segment granularity is -created for further events. Kafka Indexing Task also does incremental hand-offs which means that all the segments created by a -task will not be held up till the task duration is over. As soon as maxRowsPerSegment limit is hit, all the segments held -by the task at that point in time will be handed-off and new set of segments will be created for further events. -This means that the task can run for longer durations of time without accumulating old segments locally on Middle Manager -nodes and it is encouraged to do so. +granular interval until maxRowsPerSegment or intermediateHandoffPeriod limit is reached, at this point a new partition +for this segment granularity is created for further events. Kafka Indexing Task also does incremental hand-offs which +means that all the segments created by a task will not be held up till the task duration is over. As soon as maxRowsPerSegment +or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off +and new set of segments will be created for further events. This means that the task can run for longer durations of time +without accumulating old segments locally on Middle Manager nodes and it is encouraged to do so. Kafka Indexing Service may still produce some small segments. Lets say the task duration is 4 hours, segment granularity is set to an HOUR and Supervisor was started at 9:10 then after 4 hours at 13:10, new set of tasks will be started and diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 081f0f819627..f4bfeb1079f0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -179,7 +179,6 @@ public enum Status private final Map endOffsets = new ConcurrentHashMap<>(); private final Map nextOffsets = new ConcurrentHashMap<>(); - private final Map maxEndOffsets = new HashMap<>(); private final Map lastPersistedOffsets = new ConcurrentHashMap<>(); private TaskToolbox toolbox; @@ -231,6 +230,7 @@ public enum Status private volatile boolean pauseRequested = false; private volatile long pauseMillis = 0; + private volatile long nextCheckpointTime; // This value can be tuned in some tests private long pollRetryMs = 30000; @@ -273,12 +273,6 @@ public KafkaIndexTask( this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); this.authorizerMapper = authorizerMapper; this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); - this.maxEndOffsets.putAll(endOffsets.entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - integerLongEntry -> Long.MAX_VALUE - ))); this.topic = ioConfig.getStartPartitions().getTopic(); this.sequences = new CopyOnWriteArrayList<>(); @@ -288,6 +282,12 @@ public KafkaIndexTask( } else { useLegacy = true; } + resetNextCheckpointTime(); + } + + private void resetNextCheckpointTime() + { + nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis(); } @VisibleForTesting @@ -444,7 +444,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), - maxEndOffsets, + endOffsets, false )); } else { @@ -452,7 +452,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception 0, StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), ioConfig.getStartPartitions().getPartitionOffsetMap(), - maxEndOffsets, + endOffsets, false )); } @@ -775,7 +775,11 @@ public void onFailure(Throwable t) } } - if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) { + if (System.currentTimeMillis() > nextCheckpointTime) { + sequenceToCheckpoint = sequences.get(sequences.size() - 1); + } + + if (sequenceToCheckpoint != null && stillReading) { Preconditions.checkArgument( sequences.get(sequences.size() - 1) .getSequenceName() @@ -1547,6 +1551,7 @@ public Response setEndOffsets( } } + resetNextCheckpointTime(); latestSequence.setEndOffsets(offsets); if (finish) { @@ -1559,7 +1564,7 @@ public Response setEndOffsets( latestSequence.getSequenceId() + 1, StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1), offsets, - maxEndOffsets, + endOffsets, false ); sequences.add(newSequence); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 5210028ded3b..21ec1ed9f1ca 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -49,6 +49,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private final boolean resetOffsetAutomatically; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final Period intermediateHandoffPeriod; @JsonCreator public KafkaTuningConfig( @@ -63,7 +64,8 @@ public KafkaTuningConfig( @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, - @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod ) { // Cannot be a static because default basePersistDirectory is unique per-instance @@ -87,6 +89,9 @@ public KafkaTuningConfig( ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically; this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; + this.intermediateHandoffPeriod = intermediateHandoffPeriod == null + ? new Period().withDays(Integer.MAX_VALUE) + : intermediateHandoffPeriod; } public static KafkaTuningConfig copyOf(KafkaTuningConfig config) @@ -102,7 +107,8 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config) config.reportParseExceptions, config.handoffConditionTimeout, config.resetOffsetAutomatically, - config.segmentWriteOutMediumFactory + config.segmentWriteOutMediumFactory, + config.intermediateHandoffPeriod ); } @@ -185,6 +191,12 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() return segmentWriteOutMediumFactory; } + @JsonProperty + public Period getIntermediateHandoffPeriod() + { + return intermediateHandoffPeriod; + } + public KafkaTuningConfig withBasePersistDirectory(File dir) { return new KafkaTuningConfig( @@ -198,7 +210,8 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + intermediateHandoffPeriod ); } @@ -221,7 +234,8 @@ public boolean equals(Object o) Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && Objects.equals(indexSpec, that.indexSpec) && - Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory); + Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && + Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); } @Override @@ -237,7 +251,8 @@ public int hashCode() reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + intermediateHandoffPeriod ); } @@ -255,6 +270,7 @@ public String toString() ", handoffConditionTimeout=" + handoffConditionTimeout + ", resetOffsetAutomatically=" + resetOffsetAutomatically + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + ", intermediateHandoffPeriod=" + intermediateHandoffPeriod + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 2b540e537104..ae67994aed60 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -89,6 +89,7 @@ public KafkaSupervisorSpec( null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 8d043877c38c..c6e5a3fe8d65 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -56,7 +56,8 @@ public KafkaSupervisorTuningConfig( @JsonProperty("chatRetries") Long chatRetries, @JsonProperty("httpTimeout") Period httpTimeout, @JsonProperty("shutdownTimeout") Period shutdownTimeout, - @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod + @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, + @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod ) { super( @@ -70,7 +71,8 @@ public KafkaSupervisorTuningConfig( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + intermediateHandoffPeriod ); this.workerThreads = workerThreads; @@ -137,6 +139,7 @@ public String toString() ", httpTimeout=" + httpTimeout + ", shutdownTimeout=" + shutdownTimeout + ", offsetFetchPeriod=" + offsetFetchPeriod + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 9045ba97f99d..dfc20525513b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -186,6 +186,7 @@ public class KafkaIndexTaskTest private boolean resetOffsetAutomatically = false; private boolean doHandoff = true; private Integer maxRowsPerSegment = null; + private Period intermediateHandoffPeriod = null; private TaskToolboxFactory toolboxFactory; private IndexerMetadataStorageCoordinator metadataStorageCoordinator; @@ -524,6 +525,84 @@ public void testIncrementalHandOff() throws Exception Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7)); } + @Test(timeout = 60_000L) + public void testTimeBasedIncrementalHandOff() throws Exception + { + if (!isIncrementalHandoffSupported) { + return; + } + final String baseSequenceName = "sequence0"; + // as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen + maxRowsPerSegment = Integer.MAX_VALUE; + intermediateHandoffPeriod = new Period().withSeconds(0); + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : records.subList(0, 2)) { + kafkaProducer.send(record).get(); + } + } + Map consumerProps = kafkaServer.consumerProperties(); + consumerProps.put("max.poll.records", "1"); + + final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L)); + // Checkpointing will happen at checkpoint + final KafkaPartitions checkpoint = new KafkaPartitions(topic, ImmutableMap.of(0, 1L, 1, 0L)); + final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)); + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + true, + false, + null, + null, + false + ) + ); + final ListenableFuture future = runTask(task); + + // task will pause for checkpointing + while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(10); + } + final Map currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets()); + Assert.assertTrue(checkpoint.getPartitionOffsetMap().equals(currentOffsets)); + task.setEndOffsets(currentOffsets, true, false); + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + Assert.assertEquals(1, checkpointRequestsHash.size()); + Assert.assertTrue(checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + baseSequenceName, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap())) + ) + )); + + // Check metrics + Assert.assertEquals(2, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2009/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); + } + @Test(timeout = 60_000L) public void testRunWithMinimumMessageTime() throws Exception { @@ -1708,7 +1787,8 @@ private KafkaIndexTask createTask( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - null + null, + intermediateHandoffPeriod ); final Map context = isIncrementalHandoffSupported ? ImmutableMap.of(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true) @@ -1746,6 +1826,7 @@ private KafkaIndexTask createTask( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, + null, null ); if (isIncrementalHandoffSupported) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index 386664e4c0dc..5378e4fc2464 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -112,6 +112,7 @@ public void testCopyOf() throws Exception true, 5L, null, + null, null ); KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index a53d6758f8ab..8364153b1818 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -200,6 +200,7 @@ public void setupTest() throws Exception TEST_CHAT_RETRIES, TEST_HTTP_TIMEOUT, TEST_SHUTDOWN_TIMEOUT, + null, null );