Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/development/extensions-core/kinesis-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
| `logParseExceptions` | Boolean | If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred. | no, default == false |
| `maxParseExceptions` | Integer | The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set. | no, unlimited default |
| `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 |
| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 |
| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) |
| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 |
| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) |
| `enableTimeLagMetrics` | Boolean | If true, shard time lag metrics showing how far Druid ingestion is behind the latest data in the Kinesis stream (based on the `millisBehindLatest` value returned in the Kinesis `GetRecords` result) will be emitted, and time lag information will also be included in the supervisor status reports. | no, (default == false) |

#### IndexSpec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public Duration getOffsetFetchPeriod()
return offsetFetchPeriod;
}

@Override
public Boolean isEnableTimeLagMetrics()
{
  // Time lag metrics are derived from Kinesis' millisBehindLatest value, for which
  // Kafka has no equivalent, so this config always reports the feature as disabled.
  return Boolean.FALSE;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ private Runnable getRecordRunnable()

final List<byte[]> data;


if (deaggregate) {
if (deaggregateHandle == null || getDataHandle == null) {
throw new ISE("deaggregateHandle or getDataHandle is null!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,11 @@ protected SeekableStreamSupervisorReportPayload<String, String> createReportPayl
)
{
KinesisSupervisorIOConfig ioConfig = spec.getIoConfig();
Map<String, Long> partitionLag = getTimeLagPerPartition(getHighestCurrentOffsets());
boolean effectiveIncludeOffsets = includeOffsets && spec.getTuningConfig().isEnableTimeLagMetrics();
Map<String, Long> partitionLag = null;
if (effectiveIncludeOffsets) {
partitionLag = getTimeLagPerPartition(getHighestCurrentOffsets());
}
return new KinesisSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getStream(),
Expand All @@ -282,8 +286,8 @@ protected SeekableStreamSupervisorReportPayload<String, String> createReportPayl
stateManager.getSupervisorState().getBasicState(),
stateManager.getSupervisorState(),
stateManager.getExceptionEvents(),
includeOffsets ? partitionLag : null,
includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null
effectiveIncludeOffsets ? partitionLag : null,
effectiveIncludeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null
);
}

Expand Down Expand Up @@ -323,7 +327,9 @@ protected void updateLatestSequenceFromStream(
)
{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets());
if (spec.getTuningConfig().isEnableTimeLagMetrics()) {
currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets());
}
}

@Override
Expand All @@ -335,7 +341,11 @@ protected Map<String, Long> getPartitionRecordLag()
@Override
protected Map<String, Long> getPartitionTimeLag()
{
  // Expose per-partition time lag only when the feature is switched on in the
  // tuning config; callers treat a null map as "metrics unavailable".
  final boolean timeLagEnabled = spec.getTuningConfig().isEnableTimeLagMetrics();
  return timeLagEnabled ? currentPartitionTimeLag : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration shutdownTimeout;
private final Duration repartitionTransitionDuration;
private final Duration offsetFetchPeriod;
private final Boolean enableTimeLagMetrics;

public static KinesisSupervisorTuningConfig defaultConfig()
{
Expand Down Expand Up @@ -75,6 +76,7 @@ public static KinesisSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -111,7 +113,8 @@ public KinesisSupervisorTuningConfig(
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("enableTimeLagMetrics") Boolean enableTimeLagMetrics
)
{
super(
Expand Down Expand Up @@ -158,6 +161,7 @@ public KinesisSupervisorTuningConfig(
offsetFetchPeriod,
DEFAULT_OFFSET_FETCH_PERIOD
);
this.enableTimeLagMetrics = enableTimeLagMetrics == null ? false : enableTimeLagMetrics;
}

@Override
Expand Down Expand Up @@ -196,6 +200,7 @@ public Duration getShutdownTimeout()
}

@Override
@JsonProperty
public Duration getRepartitionTransitionDuration()
{
return repartitionTransitionDuration;
Expand All @@ -208,6 +213,13 @@ public Duration getOffsetFetchPeriod()
return offsetFetchPeriod;
}

@Override
@JsonProperty
public Boolean isEnableTimeLagMetrics()
{
  // Never null: the constructor coerces an absent "enableTimeLagMetrics"
  // spec property to false before assigning the field.
  return enableTimeLagMetrics;
}

@Override
public String toString()
{
Expand Down Expand Up @@ -241,6 +253,7 @@ public String toString()
", maxRecordsPerPoll=" + getMaxRecordsPerPoll() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
", enableTimeLagMetrics=" + isEnableTimeLagMetrics() +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2034,13 +2034,8 @@ public void testRestoreAfterPersistingSequences() throws Exception
.andReturn(Collections.emptyList())
.anyTimes();

EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(null)
.anyTimes();

replayAll();


final KinesisIndexTask task1 = createTask(
"task1",
new KinesisIndexTaskIOConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ public void testConvert()
null,
null,
null,
null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public void setupTest()
null,
null,
null,
null
null,
true
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new ExceptionCapturingServiceEmitter();
Expand Down Expand Up @@ -281,6 +282,23 @@ public void testNoInitialState() throws Exception
KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER,
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0)
);

SupervisorReport<KinesisSupervisorReportPayload> report = supervisor.getStatus();
Assert.assertEquals(DATASOURCE, report.getId());

KinesisSupervisorReportPayload payload = report.getPayload();
Assert.assertEquals(DATASOURCE, payload.getDataSource());
Assert.assertEquals(3600L, payload.getDurationSeconds());
Assert.assertEquals(2, payload.getPartitions());
Assert.assertEquals(1, payload.getReplicas());
Assert.assertEquals(STREAM, payload.getStream());
Assert.assertEquals(0, payload.getActiveTasks().size());
Assert.assertEquals(0, payload.getPublishingTasks().size());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());

Assert.assertEquals(TIME_LAG, payload.getMinimumLagMillis());
Assert.assertEquals(10234L, (long) payload.getAggregateLagMillis());
}

@Test
Expand Down Expand Up @@ -1657,13 +1675,40 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation()

@Test
public void testDiscoverExistingPublishingAndReadingTask() throws Exception
{
  // Runs the shared publishing-and-reading discovery scenario with
  // time lag metrics enabled.
  testDiscoverExistingPublishingAndReadingTaskBase(true);
}

@Test
public void testDiscoverExistingPublishingAndReadingTaskWithoutTimeLagMetrics() throws Exception
{
  // Runs the same discovery scenario with time lag metrics disabled, so the
  // supervisor report is expected to omit time-lag information.
  testDiscoverExistingPublishingAndReadingTaskBase(false);
}

/**
* This test helper covers the three areas where time lag metrics are handled:
* - Emission of the metric
* - Inclusion of time lag in supervisor aggregate report
* - Inclusion of time lag in active task report
*/
private void testDiscoverExistingPublishingAndReadingTaskBase(boolean enableTimeLagMetrics) throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L);
final Map<String, Long> timeLag = enableTimeLagMetrics ? ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L) : null;

supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisor = getTestableSupervisor(
1,
1,
true,
false,
"PT1H",
null,
null,
false,
enableTimeLagMetrics
);

supervisorRecordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
Expand All @@ -1680,9 +1725,13 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();

if (enableTimeLagMetrics) {
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
}

Task id1 = createKinesisIndexTask(
"id1",
DATASOURCE,
Expand Down Expand Up @@ -1808,6 +1857,9 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
TaskReportData activeReport = payload.getActiveTasks().get(0);
TaskReportData publishingReport = payload.getPublishingTasks().get(0);

Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
Assert.assertEquals(enableTimeLagMetrics ? 300L : null, payload.getAggregateLagMillis());

Assert.assertEquals("id2", activeReport.getId());
Assert.assertEquals(startTime, activeReport.getStartTime());
Assert.assertEquals(ImmutableMap.of(
Expand Down Expand Up @@ -2511,7 +2563,7 @@ public void testResetNoDataSourceMetadata()
@Test
public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() throws Exception
{
supervisor = getTestableSupervisor(1, 1, true, true, "PT1H", null, null, false);
supervisor = getTestableSupervisor(1, 1, true, true, "PT1H", null, null, false, true);

supervisorRecordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
Expand Down Expand Up @@ -3730,7 +3782,8 @@ public void testIsTaskCurrent()
42, // This property is different from tuningConfig
null,
null,
null
null,
true
);

KinesisIndexTask taskFromStorage = createKinesisIndexTask(
Expand Down Expand Up @@ -4710,7 +4763,8 @@ private TestableKinesisSupervisor getTestableSupervisor(
duration,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
suspended
suspended,
true
);
}

Expand All @@ -4722,7 +4776,8 @@ private TestableKinesisSupervisor getTestableSupervisor(
String duration,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended
boolean suspended,
boolean isEnableTimeLagMetrics
)
{
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
Expand Down Expand Up @@ -4800,7 +4855,8 @@ public KinesisIndexTaskClient build(
null,
null,
null,
null
null,
isEnableTimeLagMetrics
);

return new TestableKinesisSupervisor(
Expand Down
Loading