diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index e0fcc2d00730..c748c123ee03 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -154,8 +154,9 @@ The tuningConfig is optional and default parameters will be used if no tuningCon | `logParseExceptions` | Boolean | If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred. | no, default == false | | `maxParseExceptions` | Integer | The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set. | no, unlimited default | | `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 | -| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 | -| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) | +| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 | +| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) | +| `enableTimeLagMetrics` | Boolean | If true, shard time lag metrics showing how far Druid ingestion is behind the latest data in the Kinesis stream (based on the `millisBehindLatest` value returned in the Kinesis `GetRecords` result) will be emitted, and time lag information will also be included in the supervisor status reports. | no, (default == false) | #### IndexSpec diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 2dca7f6edfc5..1ea63ebfee89 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -186,6 +186,13 @@ public Duration getOffsetFetchPeriod() return offsetFetchPeriod; } + @Override + public Boolean isEnableTimeLagMetrics() + { + // Kafka ingestion doesn't support time lag metrics + return false; + } + @Override public String toString() { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index f4b78304d17c..3d6b7da7e2a4 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -247,7 +247,6 @@ private Runnable getRecordRunnable() final List data; - if (deaggregate) { if (deaggregateHandle == null || getDataHandle == null) { throw new ISE("deaggregateHandle or getDataHandle is null!"); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index c789fc7f3ac7..d2b79120ef82 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -270,7 +270,11 @@ protected SeekableStreamSupervisorReportPayload createReportPayl ) { KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); - Map partitionLag = getTimeLagPerPartition(getHighestCurrentOffsets()); + boolean effectiveIncludeOffsets = includeOffsets && spec.getTuningConfig().isEnableTimeLagMetrics(); + Map partitionLag = null; + if (effectiveIncludeOffsets) { + partitionLag = getTimeLagPerPartition(getHighestCurrentOffsets()); + } return new KinesisSupervisorReportPayload( spec.getDataSchema().getDataSource(), ioConfig.getStream(), @@ -282,8 +286,8 @@ protected SeekableStreamSupervisorReportPayload createReportPayl stateManager.getSupervisorState().getBasicState(), stateManager.getSupervisorState(), stateManager.getExceptionEvents(), - includeOffsets ? partitionLag : null, - includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null + effectiveIncludeOffsets ? partitionLag : null, + effectiveIncludeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null ); } @@ -323,7 +327,9 @@ protected void updateLatestSequenceFromStream( ) { KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; - currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets()); + if (spec.getTuningConfig().isEnableTimeLagMetrics()) { + currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets()); + } } @Override @@ -335,7 +341,11 @@ protected Map getPartitionRecordLag() @Override protected Map getPartitionTimeLag() { - return currentPartitionTimeLag; + if (spec.getTuningConfig().isEnableTimeLagMetrics()) { + return currentPartitionTimeLag; + } else { + return null; + } } @Override diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 8c1a5fa16701..7c7b99eaccb3 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -40,6 +40,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig private final Duration shutdownTimeout; private final Duration repartitionTransitionDuration; private final Duration offsetFetchPeriod; + private final Boolean enableTimeLagMetrics; public static KinesisSupervisorTuningConfig defaultConfig() { @@ -75,6 +76,7 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -111,7 +113,8 @@ public KinesisSupervisorTuningConfig( @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, - @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod + @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, + @JsonProperty("enableTimeLagMetrics") Boolean enableTimeLagMetrics ) { super( @@ -158,6 +161,7 @@ public KinesisSupervisorTuningConfig( offsetFetchPeriod, DEFAULT_OFFSET_FETCH_PERIOD ); + this.enableTimeLagMetrics = enableTimeLagMetrics == null ? false : enableTimeLagMetrics; } @Override @@ -196,6 +200,7 @@ public Duration getShutdownTimeout() } @Override + @JsonProperty public Duration getRepartitionTransitionDuration() { return repartitionTransitionDuration; @@ -208,6 +213,13 @@ public Duration getOffsetFetchPeriod() return offsetFetchPeriod; } + @Override + @JsonProperty + public Boolean isEnableTimeLagMetrics() + { + return enableTimeLagMetrics; + } + @Override public String toString() { @@ -241,6 +253,7 @@ public String toString() ", maxRecordsPerPoll=" + getMaxRecordsPerPoll() + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + + ", enableTimeLagMetrics=" + isEnableTimeLagMetrics() + '}'; } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index c886065955d1..ce497fa56a99 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2034,13 +2034,8 @@ public void testRestoreAfterPersistingSequences() throws Exception .andReturn(Collections.emptyList()) .anyTimes(); - EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(null) - .anyTimes(); - replayAll(); - final KinesisIndexTask task1 = createTask( "task1", new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 15e56c032dff..0b4f2f6e1d87 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -313,6 +313,7 @@ public void testConvert() null, null, null, + null, null ); KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 94ca6bf693b0..688b75aa98ad 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -199,7 +199,8 @@ public void setupTest() null, null, null, - null + null, + true ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); serviceEmitter = new ExceptionCapturingServiceEmitter(); @@ -281,6 +282,23 @@ public void testNoInitialState() throws Exception KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER, taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0) ); + + SupervisorReport report = supervisor.getStatus(); + Assert.assertEquals(DATASOURCE, report.getId()); + + KinesisSupervisorReportPayload payload = report.getPayload(); + Assert.assertEquals(DATASOURCE, payload.getDataSource()); + Assert.assertEquals(3600L, payload.getDurationSeconds()); + Assert.assertEquals(2, payload.getPartitions()); + Assert.assertEquals(1, payload.getReplicas()); + Assert.assertEquals(STREAM, payload.getStream()); + Assert.assertEquals(0, payload.getActiveTasks().size()); + Assert.assertEquals(0, payload.getPublishingTasks().size()); + Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); + Assert.assertEquals(0, payload.getRecentErrors().size()); + + Assert.assertEquals(TIME_LAG, payload.getMinimumLagMillis()); + Assert.assertEquals(10234L, (long) payload.getAggregateLagMillis()); } @Test @@ -1657,13 +1675,40 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() @Test public void testDiscoverExistingPublishingAndReadingTask() throws Exception + { + testDiscoverExistingPublishingAndReadingTaskBase(true); + } + + @Test + public void testDiscoverExistingPublishingAndReadingTaskWithoutTimeLagMetrics() throws Exception + { + testDiscoverExistingPublishingAndReadingTaskBase(false); + } + + /** + * This test helper covers the three areas where time lag metrics are handled: + * - Emission of the metric + * - Inclusion of time lag in supervisor aggregate report + * - Inclusion of time lag in active task report + */ + private void testDiscoverExistingPublishingAndReadingTaskBase(boolean enableTimeLagMetrics) throws Exception { final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - final Map timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L); + final Map timeLag = enableTimeLagMetrics ? ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L) : null; - supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); + supervisor = getTestableSupervisor( + 1, + 1, + true, + false, + "PT1H", + null, + null, + false, + enableTimeLagMetrics + ); supervisorRecordSupplier.assign(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); @@ -1680,9 +1725,13 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(timeLag) - .atLeastOnce(); + + if (enableTimeLagMetrics) { + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(timeLag) + .atLeastOnce(); + } + Task id1 = createKinesisIndexTask( "id1", DATASOURCE, @@ -1808,6 +1857,9 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception TaskReportData activeReport = payload.getActiveTasks().get(0); TaskReportData publishingReport = payload.getPublishingTasks().get(0); + Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); + Assert.assertEquals(enableTimeLagMetrics ? 300L : null, payload.getAggregateLagMillis()); + Assert.assertEquals("id2", activeReport.getId()); Assert.assertEquals(startTime, activeReport.getStartTime()); Assert.assertEquals(ImmutableMap.of( @@ -2511,7 +2563,7 @@ public void testResetNoDataSourceMetadata() @Test public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() throws Exception { - supervisor = getTestableSupervisor(1, 1, true, true, "PT1H", null, null, false); + supervisor = getTestableSupervisor(1, 1, true, true, "PT1H", null, null, false, true); supervisorRecordSupplier.assign(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); @@ -3730,7 +3782,8 @@ public void testIsTaskCurrent() 42, // This property is different from tuningConfig null, null, - null + null, + true ); KinesisIndexTask taskFromStorage = createKinesisIndexTask( @@ -4710,7 +4763,8 @@ private TestableKinesisSupervisor getTestableSupervisor( duration, lateMessageRejectionPeriod, earlyMessageRejectionPeriod, - suspended + suspended, + true ); } @@ -4722,7 +4776,8 @@ private TestableKinesisSupervisor getTestableSupervisor( String duration, Period lateMessageRejectionPeriod, Period earlyMessageRejectionPeriod, - boolean suspended + boolean suspended, + boolean isEnableTimeLagMetrics ) { KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( @@ -4800,7 +4855,8 @@ public KinesisIndexTaskClient build( null, null, null, - null + null, + isEnableTimeLagMetrics ); return new TestableKinesisSupervisor( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 247ff0e00c10..994b787f40d2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -849,7 +849,7 @@ private SupervisorReport partitionRecordLags = getPartitionRecordLag(); - Map partitionTimeLags = getPartitionTimeLag(); - - if (partitionRecordLags == null && partitionTimeLags == null) { - throw new ISE("Latest offsets have not been fetched"); - } final String type = spec.getType(); - - BiConsumer, String> emitFn = (partitionLags, suffix) -> { + final BiConsumer, String> emitFn = (partitionLags, suffix) -> { if (partitionLags == null) { return; } @@ -3491,9 +3484,27 @@ protected void emitLag() ); }; + final Map partitionRecordLags = getPartitionRecordLag(); + final Map partitionTimeLags; + + if (spec.getTuningConfig().isEnableTimeLagMetrics()) { + partitionTimeLags = getPartitionTimeLag(); + if (partitionRecordLags == null && partitionTimeLags == null) { + throw new ISE("Latest record and time lags have not been fetched"); + } + } else { + partitionTimeLags = null; + if (partitionRecordLags == null) { + throw new ISE("Latest record lags have not been fetched"); + } + } + // this should probably really be /count or /records or something.. but keeping like this for backwards compat emitFn.accept(partitionRecordLags, ""); - emitFn.accept(partitionTimeLags, "/time"); + + if (spec.getTuningConfig().isEnableTimeLagMetrics()) { + emitFn.accept(partitionTimeLags, "/time"); + } } catch (Exception e) { log.warn(e, "Unable to compute lag"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java index 733f1258ccbf..6f8071ca59fd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java @@ -58,5 +58,8 @@ static Duration defaultDuration(final Period period, final String theDefault) @JsonProperty Duration getOffsetFetchPeriod(); + @JsonProperty + Boolean isEnableTimeLagMetrics(); + SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d34bbbbec824..847f8306665d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -872,6 +872,12 @@ public Duration getOffsetFetchPeriod() return new Period("PT5M").toStandardDuration(); } + @Override + public Boolean isEnableTimeLagMetrics() + { + return true; + } + @Override public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() {