Merged
15 changes: 8 additions & 7 deletions docs/content/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|-----|----|-----------|--------|
|`type`|String|The indexing task type, this should always be `kafka`.|yes|
|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either when `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
@@ -130,6 +130,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
|`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)|
|`segmentWriteOutMediumFactory`|String|Segment write-out medium to use when creating segments. See [Indexing Service Configuration](../configuration/indexing-service.html) page, "SegmentWriteOutMediumFactory" section for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)|
|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either when `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)|
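
As an illustration (the values below are hypothetical, not recommendations), a `tuningConfig` that enables hourly time-based handoff alongside the row limit might look like:

```json
{
  "type": "kafka",
  "maxRowsPerSegment": 5000000,
  "intermediateHandoffPeriod": "PT1H"
}
```

With these settings, a task hands off its segments once it has aggregated 5,000,000 rows or one hour has passed since the last handoff, whichever happens first. Leaving `intermediateHandoffPeriod` unset keeps the default of `P2147483647D`, which effectively disables time-based handoff.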

#### IndexSpec

@@ -312,12 +313,12 @@ In this way, configuration changes can be applied without requiring any pause in
### On the Subject of Segments

Each Kafka Indexing Task puts events consumed from its assigned Kafka partitions into a single segment for each
segment-granularity interval until either the maxRowsPerSegment or the intermediateHandoffPeriod limit is reached;
at that point, a new partition for that segment granularity is created for further events. The Kafka Indexing Task
also performs incremental hand-offs, which means that the segments created by a task are not all held until the task
duration is over. As soon as either the maxRowsPerSegment or the intermediateHandoffPeriod limit is hit, all the
segments held by the task at that point in time are handed off and a new set of segments is created for further
events. This means that the task can run for long durations without accumulating old segments locally on Middle
Manager nodes, and it is encouraged to do so.
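
The handoff decision described above can be sketched as follows. This is a standalone illustration with hypothetical names (`HandoffTrigger`, `onRowsAdded`, `onHandoff`) and `java.time` in place of Druid's Joda-Time types; it is not Druid code:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the incremental-handoff trigger described above: a task hands off
// its current segments when either the row limit or the time limit is reached,
// then starts a fresh set of segments and a fresh time-based deadline.
public class HandoffTrigger
{
    private final long maxRowsPerSegment;
    private final Duration intermediateHandoffPeriod;
    private long rowsInLargestSegment = 0;
    private Instant nextHandoffTime;

    public HandoffTrigger(long maxRowsPerSegment, Duration intermediateHandoffPeriod)
    {
        this.maxRowsPerSegment = maxRowsPerSegment;
        this.intermediateHandoffPeriod = intermediateHandoffPeriod;
        this.nextHandoffTime = Instant.now().plus(intermediateHandoffPeriod);
    }

    // Called as rows are aggregated; returns true when the task should hand off.
    public boolean onRowsAdded(long rows)
    {
        rowsInLargestSegment += rows;
        return rowsInLargestSegment >= maxRowsPerSegment
               || Instant.now().isAfter(nextHandoffTime);
    }

    // After a handoff, the row counter and the time-based deadline are reset.
    public void onHandoff()
    {
        rowsInLargestSegment = 0;
        nextHandoffTime = Instant.now().plus(intermediateHandoffPeriod);
    }
}
```

A very large period (the `P2147483647D` default) makes the time condition unreachable in practice, so only the row limit triggers handoff, matching the pre-patch behavior.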

Kafka Indexing Service may still produce some small segments. Let's say the task duration is 4 hours, segment granularity
is set to HOUR, and the Supervisor was started at 9:10; then after 4 hours, at 13:10, a new set of tasks will be started and
@@ -179,7 +179,6 @@ public enum Status

private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> lastPersistedOffsets = new ConcurrentHashMap<>();

private TaskToolbox toolbox;
@@ -231,6 +230,7 @@ public enum Status

private volatile boolean pauseRequested = false;
private volatile long pauseMillis = 0;
private volatile long nextCheckpointTime;

// This value can be tuned in some tests
private long pollRetryMs = 30000;
@@ -273,12 +273,6 @@ public KafkaIndexTask(
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.authorizerMapper = authorizerMapper;
this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
this.topic = ioConfig.getStartPartitions().getTopic();
this.sequences = new CopyOnWriteArrayList<>();

@@ -288,6 +282,12 @@ public KafkaIndexTask(
} else {
useLegacy = true;
}
resetNextCheckpointTime();
}

private void resetNextCheckpointTime()
{
nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis();
}

@VisibleForTesting
@@ -444,15 +444,15 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
endOffsets,
false
));
} else {
sequences.add(new SequenceMetadata(
0,
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
ioConfig.getStartPartitions().getPartitionOffsetMap(),
endOffsets,
false
));
}
@@ -775,7 +775,11 @@ public void onFailure(Throwable t)
}
}

if (System.currentTimeMillis() > nextCheckpointTime) {
sequenceToCheckpoint = sequences.get(sequences.size() - 1);
}

if (sequenceToCheckpoint != null && stillReading) {
Preconditions.checkArgument(
sequences.get(sequences.size() - 1)
.getSequenceName()
@@ -1547,6 +1551,7 @@ public Response setEndOffsets(
}
}

resetNextCheckpointTime();
latestSequence.setEndOffsets(offsets);

if (finish) {
@@ -1559,7 +1564,7 @@
latestSequence.getSequenceId() + 1,
StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
offsets,
endOffsets,
false
);
sequences.add(newSequence);
@@ -49,6 +49,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final boolean resetOffsetAutomatically;
@Nullable
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
private final Period intermediateHandoffPeriod;

@JsonCreator
public KafkaTuningConfig(
@@ -63,7 +64,8 @@ public KafkaTuningConfig(
@JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod
)
{
// Cannot be a static because default basePersistDirectory is unique per-instance
@@ -87,6 +89,9 @@ public KafkaTuningConfig(
? DEFAULT_RESET_OFFSET_AUTOMATICALLY
: resetOffsetAutomatically;
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
this.intermediateHandoffPeriod = intermediateHandoffPeriod == null
? new Period().withDays(Integer.MAX_VALUE)
: intermediateHandoffPeriod;
}

public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
@@ -102,7 +107,8 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
config.reportParseExceptions,
config.handoffConditionTimeout,
config.resetOffsetAutomatically,
config.segmentWriteOutMediumFactory,
config.intermediateHandoffPeriod
);
}

@@ -185,6 +191,12 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
return segmentWriteOutMediumFactory;
}

@JsonProperty
public Period getIntermediateHandoffPeriod()
{
return intermediateHandoffPeriod;
}

public KafkaTuningConfig withBasePersistDirectory(File dir)
{
return new KafkaTuningConfig(
@@ -198,7 +210,8 @@ public KafkaTuningConfig withBasePersistDirectory(File dir)
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
segmentWriteOutMediumFactory,
intermediateHandoffPeriod
);
}

@@ -221,7 +234,8 @@ public boolean equals(Object o)
Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) &&
Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod);
}

@Override
@@ -237,7 +251,8 @@ public int hashCode()
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
segmentWriteOutMediumFactory,
intermediateHandoffPeriod
);
}

@@ -255,6 +270,7 @@ public String toString()
", handoffConditionTimeout=" + handoffConditionTimeout +
", resetOffsetAutomatically=" + resetOffsetAutomatically +
", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory +
", intermediateHandoffPeriod=" + intermediateHandoffPeriod +
'}';
}
}
@@ -89,6 +89,7 @@ public KafkaSupervisorSpec(
null,
null,
null,
null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
@@ -56,7 +56,8 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("chatRetries") Long chatRetries,
@JsonProperty("httpTimeout") Period httpTimeout,
@JsonProperty("shutdownTimeout") Period shutdownTimeout,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod
)
{
super(
@@ -70,7 +71,8 @@
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
segmentWriteOutMediumFactory,
intermediateHandoffPeriod
);

this.workerThreads = workerThreads;
@@ -137,6 +139,7 @@ public String toString()
", httpTimeout=" + httpTimeout +
", shutdownTimeout=" + shutdownTimeout +
", offsetFetchPeriod=" + offsetFetchPeriod +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
'}';
}

@@ -186,6 +186,7 @@ public class KafkaIndexTaskTest
private boolean resetOffsetAutomatically = false;
private boolean doHandoff = true;
private Integer maxRowsPerSegment = null;
private Period intermediateHandoffPeriod = null;

private TaskToolboxFactory toolboxFactory;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
@@ -524,6 +525,84 @@ public void testIncrementalHandOff() throws Exception
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
}

@Test(timeout = 60_000L)
public void testTimeBasedIncrementalHandOff() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
final String baseSequenceName = "sequence0";
// as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen
maxRowsPerSegment = Integer.MAX_VALUE;
intermediateHandoffPeriod = new Period().withSeconds(0);

// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records.subList(0, 2)) {
kafkaProducer.send(record).get();
}
}
Map<String, String> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");

final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
// Checkpointing will happen at checkpoint
final KafkaPartitions checkpoint = new KafkaPartitions(topic, ImmutableMap.of(0, 1L, 1, 0L));
final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L));
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
baseSequenceName,
startPartitions,
endPartitions,
consumerProps,
true,
false,
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);

// task will pause for checkpointing
while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(10);
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets());
    Assert.assertEquals(checkpoint.getPartitionOffsetMap(), currentOffsets);
task.setEndOffsets(currentOffsets, true, false);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());

Assert.assertEquals(1, checkpointRequestsHash.size());
Assert.assertTrue(checkpointRequestsHash.contains(
Objects.hash(
DATA_SCHEMA.getDataSource(),
baseSequenceName,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap()))
)
));

// Check metrics
Assert.assertEquals(2, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());

// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);

// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
}

@Test(timeout = 60_000L)
public void testRunWithMinimumMessageTime() throws Exception
{
@@ -1708,7 +1787,8 @@ private KafkaIndexTask createTask(
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
null,
intermediateHandoffPeriod
);
final Map<String, Object> context = isIncrementalHandoffSupported
? ImmutableMap.of(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
@@ -1746,6 +1826,7 @@ private KafkaIndexTask createTask(
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
null,
null
);
if (isIncrementalHandoffSupported) {
@@ -112,6 +112,7 @@ public void testCopyOf() throws Exception
true,
5L,
null,
null,
null
);
KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original);
@@ -200,6 +200,7 @@ public void setupTest() throws Exception
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null
);
