From 6c0b39054fca9a483e8229fd3675be69527247fd Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Fri, 5 Jan 2018 17:11:41 -0600 Subject: [PATCH 1/5] time based checkpointing --- .../extensions-core/kafka-ingestion.md | 3 ++- .../druid/indexing/kafka/KafkaIndexTask.java | 12 ++++++++++ .../indexing/kafka/KafkaTuningConfig.java | 24 +++++++++++++++---- .../kafka/supervisor/KafkaSupervisorSpec.java | 1 + .../KafkaSupervisorTuningConfig.java | 7 ++++-- .../indexing/kafka/KafkaIndexTaskTest.java | 2 ++ .../indexing/kafka/KafkaTuningConfigTest.java | 1 + .../kafka/supervisor/KafkaSupervisorTest.java | 1 + 8 files changed, 43 insertions(+), 8 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index b4c8de284cfa..9f341e7751c9 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -116,7 +116,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |-----|----|-----------|--------| |`type`|String|The indexing task type, this should always be `kafka`.|yes| |`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)| -|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows.|no (default == 5000000)| +|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. 
Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| |`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| @@ -130,6 +130,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)| |`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)| |`segmentWriteOutMediumFactory`|String|Segment write-out medium to use when creating segments. See [Indexing Service Configuration](../configuration/indexing-service.html) page, "SegmentWriteOutMediumFactory" section for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)| +|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. 
Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)| #### IndexSpec diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 081f0f819627..d0f3bf34fef1 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -231,6 +231,7 @@ public enum Status private volatile boolean pauseRequested = false; private volatile long pauseMillis = 0; + private volatile long nextCheckpointTime; // This value can be tuned in some tests private long pollRetryMs = 30000; @@ -288,6 +289,12 @@ public KafkaIndexTask( } else { useLegacy = true; } + resetNextCheckpointTime(); + } + + private void resetNextCheckpointTime() + { + nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis(); } @VisibleForTesting @@ -775,6 +782,10 @@ public void onFailure(Throwable t) } } + if (System.currentTimeMillis() > nextCheckpointTime) { + sequenceToCheckpoint = sequences.get(sequences.size() - 1); + } + if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) { Preconditions.checkArgument( sequences.get(sequences.size() - 1) @@ -1547,6 +1558,7 @@ public Response setEndOffsets( } } + resetNextCheckpointTime(); latestSequence.setEndOffsets(offsets); if (finish) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 5210028ded3b..62c4f048cc53 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -49,6 +49,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private final boolean resetOffsetAutomatically; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final Period intermediateHandoffPeriod; @JsonCreator public KafkaTuningConfig( @@ -63,7 +64,8 @@ public KafkaTuningConfig( @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, - @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod ) { // Cannot be a static because default basePersistDirectory is unique per-instance @@ -87,6 +89,7 @@ public KafkaTuningConfig( ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically; this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; + this.intermediateHandoffPeriod = intermediateHandoffPeriod == null ? 
new Period().withDays(Integer.MAX_VALUE) : intermediateHandoffPeriod; } public static KafkaTuningConfig copyOf(KafkaTuningConfig config) @@ -102,7 +105,8 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config) config.reportParseExceptions, config.handoffConditionTimeout, config.resetOffsetAutomatically, - config.segmentWriteOutMediumFactory + config.segmentWriteOutMediumFactory, + config.intermediateHandoffPeriod ); } @@ -185,6 +189,12 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() return segmentWriteOutMediumFactory; } + @JsonProperty + public Period getIntermediateHandoffPeriod() + { + return intermediateHandoffPeriod; + } + public KafkaTuningConfig withBasePersistDirectory(File dir) { return new KafkaTuningConfig( @@ -198,7 +208,8 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + intermediateHandoffPeriod ); } @@ -221,7 +232,8 @@ public boolean equals(Object o) Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && Objects.equals(indexSpec, that.indexSpec) && - Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory); + Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && + Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); } @Override @@ -237,7 +249,8 @@ public int hashCode() reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + intermediateHandoffPeriod ); } @@ -255,6 +268,7 @@ public String toString() ", handoffConditionTimeout=" + handoffConditionTimeout + ", resetOffsetAutomatically=" + resetOffsetAutomatically + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + ", intermediateHandoffPeriod=" + 
intermediateHandoffPeriod + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 2b540e537104..ae67994aed60 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -89,6 +89,7 @@ public KafkaSupervisorSpec( null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 8d043877c38c..c6e5a3fe8d65 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -56,7 +56,8 @@ public KafkaSupervisorTuningConfig( @JsonProperty("chatRetries") Long chatRetries, @JsonProperty("httpTimeout") Period httpTimeout, @JsonProperty("shutdownTimeout") Period shutdownTimeout, - @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod + @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, + @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod ) { super( @@ -70,7 +71,8 @@ public KafkaSupervisorTuningConfig( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + intermediateHandoffPeriod ); this.workerThreads = workerThreads; @@ -137,6 +139,7 @@ public String toString() ", httpTimeout=" + 
httpTimeout + ", shutdownTimeout=" + shutdownTimeout + ", offsetFetchPeriod=" + offsetFetchPeriod + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 9045ba97f99d..398cd8c6c8c5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1708,6 +1708,7 @@ private KafkaIndexTask createTask( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, + null, null ); final Map context = isIncrementalHandoffSupported @@ -1746,6 +1747,7 @@ private KafkaIndexTask createTask( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, + null, null ); if (isIncrementalHandoffSupported) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index 386664e4c0dc..5378e4fc2464 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -112,6 +112,7 @@ public void testCopyOf() throws Exception true, 5L, null, + null, null ); KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index a53d6758f8ab..8364153b1818 100644 --- 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -200,6 +200,7 @@ public void setupTest() throws Exception TEST_CHAT_RETRIES, TEST_HTTP_TIMEOUT, TEST_SHUTDOWN_TIMEOUT, + null, null ); From e07d6312cdc5d4b3de40aa84d7b25edef74d2a93 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Fri, 26 Jan 2018 12:39:11 -0600 Subject: [PATCH 2/5] add test and fix issue --- .../druid/indexing/kafka/KafkaIndexTask.java | 15 +--- .../indexing/kafka/KafkaIndexTaskTest.java | 81 ++++++++++++++++++- 2 files changed, 84 insertions(+), 12 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index d0f3bf34fef1..f4bfeb1079f0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -179,7 +179,6 @@ public enum Status private final Map endOffsets = new ConcurrentHashMap<>(); private final Map nextOffsets = new ConcurrentHashMap<>(); - private final Map maxEndOffsets = new HashMap<>(); private final Map lastPersistedOffsets = new ConcurrentHashMap<>(); private TaskToolbox toolbox; @@ -274,12 +273,6 @@ public KafkaIndexTask( this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); this.authorizerMapper = authorizerMapper; this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); - this.maxEndOffsets.putAll(endOffsets.entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - integerLongEntry -> Long.MAX_VALUE - ))); this.topic = ioConfig.getStartPartitions().getTopic(); this.sequences = new CopyOnWriteArrayList<>(); @@ -451,7 +444,7 @@ public 
TaskStatus run(final TaskToolbox toolbox) throws Exception previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), - maxEndOffsets, + endOffsets, false )); } else { @@ -459,7 +452,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception 0, StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), ioConfig.getStartPartitions().getPartitionOffsetMap(), - maxEndOffsets, + endOffsets, false )); } @@ -786,7 +779,7 @@ public void onFailure(Throwable t) sequenceToCheckpoint = sequences.get(sequences.size() - 1); } - if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) { + if (sequenceToCheckpoint != null && stillReading) { Preconditions.checkArgument( sequences.get(sequences.size() - 1) .getSequenceName() @@ -1571,7 +1564,7 @@ public Response setEndOffsets( latestSequence.getSequenceId() + 1, StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1), offsets, - maxEndOffsets, + endOffsets, false ); sequences.add(newSequence); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 398cd8c6c8c5..6f12b4b17312 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -186,6 +186,7 @@ public class KafkaIndexTaskTest private boolean resetOffsetAutomatically = false; private boolean doHandoff = true; private Integer maxRowsPerSegment = null; + private Period intermediateHandoffPeriod = null; private TaskToolboxFactory toolboxFactory; private IndexerMetadataStorageCoordinator metadataStorageCoordinator; @@ -524,6 +525,84 @@ public void testIncrementalHandOff() throws Exception Assert.assertEquals(ImmutableList.of("f"), 
readSegmentColumn("dim1", desc7)); } + @Test(timeout = 60_000L) + public void testTimeBasedIncrementalHandOff() throws Exception + { + if (!isIncrementalHandoffSupported) { + return; + } + final String baseSequenceName = "sequence0"; + // as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen + maxRowsPerSegment = Integer.MAX_VALUE; + intermediateHandoffPeriod = new Period().withSeconds(0); + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : records.subList(0, 2)) { + kafkaProducer.send(record).get(); + } + } + Map consumerProps = kafkaServer.consumerProperties(); + consumerProps.put("max.poll.records", "1"); + + final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L)); + // Checkpointing will happen at either checkpoint1 or checkpoint2 depending on ordering + // of events fetched across two partitions from Kafka + final KafkaPartitions checkpoint = new KafkaPartitions(topic, ImmutableMap.of(0, 1L, 1, 0L)); + final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)); + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + true, + false, + null, + null, + false + ) + ); + final ListenableFuture future = runTask(task); + + while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(10); + } + final Map currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets()); + Assert.assertTrue(checkpoint.getPartitionOffsetMap().equals(currentOffsets)); + task.setEndOffsets(currentOffsets, true, false); + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + Assert.assertEquals(1, checkpointRequestsHash.size()); + Assert.assertTrue(checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + baseSequenceName, + new 
KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + ) + )); + + // Check metrics + Assert.assertEquals(2, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2009/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); + } + @Test(timeout = 60_000L) public void testRunWithMinimumMessageTime() throws Exception { @@ -1709,7 +1788,7 @@ private KafkaIndexTask createTask( handoffConditionTimeout, resetOffsetAutomatically, null, - null + intermediateHandoffPeriod ); final Map context = isIncrementalHandoffSupported ? 
ImmutableMap.of(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true) From 049f91bfe729ae464dc0c79fe7dc2157ce79c4be Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Tue, 30 Jan 2018 10:31:17 -0600 Subject: [PATCH 3/5] fix comments --- .../java/io/druid/indexing/kafka/KafkaIndexTaskTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 6f12b4b17312..dfc20525513b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -546,8 +546,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception consumerProps.put("max.poll.records", "1"); final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L)); - // Checkpointing will happen at either checkpoint1 or checkpoint2 depending on ordering - // of events fetched across two partitions from Kafka + // Checkpointing will happen at checkpoint final KafkaPartitions checkpoint = new KafkaPartitions(topic, ImmutableMap.of(0, 1L, 1, 0L)); final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)); final KafkaIndexTask task = createTask( @@ -566,6 +565,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception ); final ListenableFuture future = runTask(task); + // task will pause for checkpointing while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { Thread.sleep(10); } @@ -580,7 +580,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception DATA_SCHEMA.getDataSource(), baseSequenceName, new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + new 
KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap())) ) )); From a4798b982aa88aea5afab9f373b9b8128da42d88 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Tue, 30 Jan 2018 16:22:37 -0600 Subject: [PATCH 4/5] fix formatting --- .../main/java/io/druid/indexing/kafka/KafkaTuningConfig.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 62c4f048cc53..21ec1ed9f1ca 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -89,7 +89,9 @@ public KafkaTuningConfig( ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically; this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; - this.intermediateHandoffPeriod = intermediateHandoffPeriod == null ? new Period().withDays(Integer.MAX_VALUE) : intermediateHandoffPeriod; + this.intermediateHandoffPeriod = intermediateHandoffPeriod == null + ? 
new Period().withDays(Integer.MAX_VALUE) + : intermediateHandoffPeriod; } public static KafkaTuningConfig copyOf(KafkaTuningConfig config) From 5bffc894d602010c52d9afdd67d469e01c365b1f Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Thu, 15 Feb 2018 19:52:34 -0600 Subject: [PATCH 5/5] update docs --- .../development/extensions-core/kafka-ingestion.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 9f341e7751c9..e0d7f73c689d 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -313,12 +313,12 @@ In this way, configuration changes can be applied without requiring any pause in ### On the Subject of Segments Each Kafka Indexing Task puts events consumed from Kafka partitions assigned to it in a single segment for each segment -granular interval until maxRowsPerSegment limit is reached, at this point a new partition for this segment granularity is -created for further events. Kafka Indexing Task also does incremental hand-offs which means that all the segments created by a -task will not be held up till the task duration is over. As soon as maxRowsPerSegment limit is hit, all the segments held -by the task at that point in time will be handed-off and new set of segments will be created for further events. -This means that the task can run for longer durations of time without accumulating old segments locally on Middle Manager -nodes and it is encouraged to do so. +granular interval until maxRowsPerSegment or intermediateHandoffPeriod limit is reached, at this point a new partition +for this segment granularity is created for further events. Kafka Indexing Task also does incremental hand-offs which +means that all the segments created by a task will not be held up till the task duration is over. 
As soon as maxRowsPerSegment
+or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off
+and a new set of segments will be created for further events. This means that the task can run for longer durations of time
+without accumulating old segments locally on Middle Manager nodes and it is encouraged to do so.

Kafka Indexing Service may still produce some small segments. Lets say the task duration is 4 hours, segment granularity
is set to an HOUR and Supervisor was started at 9:10 then after 4 hours at 13:10, new set of tasks will be started and