@@ -705,6 +705,7 @@ public void onFailure(Throwable t)
final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
getDataSource(),
ioConfig.getTaskGroupId(),
getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
);
@@ -92,6 +92,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
@@ -143,7 +144,7 @@ public class KafkaSupervisor implements Supervisor
* time, there should be at most [taskCount] actively-reading task groups (tracked in the [taskGroups]
* map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
*/
private static class TaskGroup
private class TaskGroup
{
// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -157,6 +158,7 @@ private static class TaskGroup
final Optional<DateTime> maximumMessageTime;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
final String baseSequenceName;

TaskGroup(
ImmutableMap<Integer, Long> partitionOffsets,
@@ -168,6 +170,7 @@ private static class TaskGroup
this.minimumMessageTime = minimumMessageTime;
this.maximumMessageTime = maximumMessageTime;
this.sequenceOffsets.put(0, partitionOffsets);
this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime);
}

int addNewCheckpoint(Map<Integer, Long> checkpoint)
@@ -490,25 +493,29 @@ public void reset(DataSourceMetadata dataSourceMetadata)
}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
public void checkpoint(
@Nullable Integer taskGroupId,
@Deprecated String baseSequenceName,
DataSourceMetadata previousCheckPoint,
DataSourceMetadata currentCheckPoint
)
{
Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint");
Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null");
Preconditions.checkArgument(
ioConfig.getTopic()
.equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
.getTopic()),
ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()),
"Supervisor topic [%s] and topic in checkpoint [%s] does not match",
ioConfig.getTopic(),
((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()
);

log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId);
notices.add(
new CheckpointNotice(
taskGroupId,
(KafkaDataSourceMetadata) previousCheckpoint,
(KafkaDataSourceMetadata) currentCheckpoint
baseSequenceName,
(KafkaDataSourceMetadata) previousCheckPoint,
(KafkaDataSourceMetadata) currentCheckPoint
)
);
}
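
For context on why both parameters appear in the widened signature above: during a rolling update, tasks created before this change have no taskGroupId to report, so the supervisor accepts null and falls back to the (deprecated) baseSequenceName, while upgraded tasks send both. The following is a hypothetical caller-side sketch only; the CheckpointingSupervisor interface, the helper methods, and the sequence-name value are stand-ins, and the metadata arguments are reduced to Object instead of DataSourceMetadata. None of these names come from this diff.

// Hypothetical sketch of the two caller generations hitting the new signature.
// Only the parameter shape mirrors the diff; every name here is a stand-in.
interface CheckpointingSupervisor
{
  void checkpoint(
      Integer taskGroupId,      // may be null: a pre-upgrade task cannot supply it
      String baseSequenceName,  // always sent, so the supervisor can find the group
      Object previousCheckPoint,
      Object currentCheckPoint
  );
}

class RollingUpdateCallers
{
  // Task built before the upgrade: only the sequence name identifies its group.
  static void oldTaskReport(CheckpointingSupervisor supervisor, Object prev, Object cur)
  {
    supervisor.checkpoint(null, "index_kafka_ds_abc123", prev, cur);
  }

  // Task built after the upgrade: the group id travels with the request, so the
  // supervisor can skip the by-name lookup.
  static void newTaskReport(CheckpointingSupervisor supervisor, Object prev, Object cur)
  {
    supervisor.checkpoint(0, "index_kafka_ds_abc123", prev, cur);
  }

  public static void main(String[] args)
  {
    CheckpointingSupervisor supervisor = (groupId, sequenceName, prev, cur) ->
        System.out.println("checkpoint: taskGroupId=" + groupId + ", baseSequenceName=" + sequenceName);
    oldTaskReport(supervisor, "prevMeta", "curMeta");
    newTaskReport(supervisor, "prevMeta", "curMeta");
  }
}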
@@ -612,30 +619,65 @@ public void handle()

private class CheckpointNotice implements Notice
{
final int taskGroupId;
final KafkaDataSourceMetadata previousCheckpoint;
final KafkaDataSourceMetadata currentCheckpoint;
@Nullable private final Integer nullableTaskGroupId;
@Deprecated private final String baseSequenceName;
private final KafkaDataSourceMetadata previousCheckpoint;
private final KafkaDataSourceMetadata currentCheckpoint;

CheckpointNotice(
int taskGroupId,
@Nullable Integer nullableTaskGroupId,
@Deprecated String baseSequenceName,
KafkaDataSourceMetadata previousCheckpoint,
KafkaDataSourceMetadata currentCheckpoint
)
{
this.taskGroupId = taskGroupId;
this.baseSequenceName = baseSequenceName;
this.nullableTaskGroupId = nullableTaskGroupId;
this.previousCheckpoint = previousCheckpoint;
this.currentCheckpoint = currentCheckpoint;
}

@Override
public void handle() throws ExecutionException, InterruptedException, TimeoutException
{
// Find the taskGroupId using the baseSequenceName if taskGroupId is null. It can be null during a rolling update.
final int taskGroupId;
if (nullableTaskGroupId == null) {
// We search for the baseSequenceName in taskGroups and pendingCompletionTaskGroups sequentially. This should be
// fine because
// 1) a taskGroup can only be moved from taskGroups to pendingCompletionTaskGroups in RunNotice
// (see checkTaskDuration()).
// 2) Notices are processed by a single thread, so CheckpointNotice and RunNotice cannot be processed at the
// same time.
final java.util.Optional<Integer> maybeGroupId = taskGroups
.entrySet()
.stream()
.filter(entry -> {
final TaskGroup taskGroup = entry.getValue();
return taskGroup.baseSequenceName.equals(baseSequenceName);
})
.findAny()
.map(Entry::getKey);
taskGroupId = maybeGroupId.orElse(
pendingCompletionTaskGroups
.entrySet()
.stream()
.filter(entry -> {
final List<TaskGroup> taskGroups = entry.getValue();
return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
})
.findAny()
.orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
.getKey()
);
} else {
taskGroupId = nullableTaskGroupId;
}

// check for consistency
// if a request was already received for this sequenceName and dataSourceMetadata combination, return

final TaskGroup taskGroup = taskGroups.get(taskGroupId);

if (isValidTaskGroup(taskGroup)) {
if (isValidTaskGroup(taskGroupId, taskGroup)) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;

// check validity of previousCheckpoint
@@ -655,20 +697,13 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutException
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
return;
}
final int taskGroupId = getTaskGroupIdForPartition(
currentCheckpoint.getKafkaPartitions()
.getPartitionOffsetMap()
.keySet()
.iterator()
.next()
);
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
}
}

private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup)
{
if (taskGroup == null) {
// taskGroup might be in pendingCompletionTaskGroups or partitionGroups
@@ -867,17 +902,6 @@ String generateSequenceName(
return Joiner.on("_").join("index_kafka", dataSource, hashCode);
}

@VisibleForTesting
String generateSequenceName(TaskGroup taskGroup)
{
Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
return generateSequenceName(
taskGroup.partitionOffsets,
taskGroup.minimumMessageTime,
taskGroup.maximumMessageTime
);
}

private static String getRandomId()
{
final StringBuilder suffix = new StringBuilder(8);
@@ -1748,15 +1772,14 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProcessingException
endPartitions.put(partition, Long.MAX_VALUE);
}
TaskGroup group = taskGroups.get(groupId);
String sequenceName = generateSequenceName(group);

Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();

KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
groupId,
sequenceName,
group.baseSequenceName,
new KafkaPartitions(ioConfig.getTopic(), startPartitions),
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
consumerProperties,
@@ -1777,10 +1800,10 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProcessingException
.putAll(spec.getContext())
.build();
for (int i = 0; i < replicas; i++) {
String taskId = Joiner.on("_").join(sequenceName, getRandomId());
String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId());
KafkaIndexTask indexTask = new KafkaIndexTask(
taskId,
new TaskResource(sequenceName, 1),
new TaskResource(group.baseSequenceName, 1),
spec.getDataSchema(),
taskTuningConfig,
kafkaIOConfig,
@@ -1909,7 +1932,10 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)

String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
if (taskGroups.get(taskGroupId) != null) {
return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
return Preconditions
.checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
.baseSequenceName
.equals(taskSequenceName);
} else {
return generateSequenceName(
((KafkaIndexTask) taskOptional.get()).getIOConfig()
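To read the new fallback in isolation: when a checkpoint request arrives without a taskGroupId, the supervisor scans the actively-reading groups first and then the pending-completion groups for a matching baseSequenceName, which is safe because notices run on a single thread. Below is a minimal self-contained sketch of that lookup, not the production class: TaskGroup is reduced to the one field the search needs, the map fields only mirror the shapes of taskGroups and pendingCompletionTaskGroups, and Druid's ISE is replaced with IllegalStateException to keep it dependency-free.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class CheckpointGroupLookup
{
  static class TaskGroup
  {
    final String baseSequenceName;

    TaskGroup(String baseSequenceName)
    {
      this.baseSequenceName = baseSequenceName;
    }
  }

  // groupId -> actively-reading group (mirrors the supervisor's taskGroups)
  final Map<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();
  // groupId -> groups publishing but not yet finished (mirrors pendingCompletionTaskGroups)
  final Map<Integer, List<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();

  // Checking the two maps one after the other is safe because a group only moves
  // between them inside RunNotice, and notices are processed by a single thread.
  int findTaskGroupId(String baseSequenceName)
  {
    final Optional<Integer> active = taskGroups
        .entrySet()
        .stream()
        .filter(entry -> entry.getValue().baseSequenceName.equals(baseSequenceName))
        .findAny()
        .map(Map.Entry::getKey);

    return active.orElseGet(
        () -> pendingCompletionTaskGroups
            .entrySet()
            .stream()
            .filter(entry -> entry.getValue()
                                  .stream()
                                  .anyMatch(group -> group.baseSequenceName.equals(baseSequenceName)))
            .findAny()
            .map(Map.Entry::getKey)
            .orElseThrow(() -> new IllegalStateException(
                "Cannot find taskGroup for baseSequenceName[" + baseSequenceName + "]"
            ))
    );
  }

  public static void main(String[] args)
  {
    final CheckpointGroupLookup lookup = new CheckpointGroupLookup();
    lookup.taskGroups.put(0, new TaskGroup("index_kafka_ds_active"));
    lookup.pendingCompletionTaskGroups.put(1, Arrays.asList(new TaskGroup("index_kafka_ds_publishing")));

    System.out.println(lookup.findTaskGroupId("index_kafka_ds_active"));     // prints 0
    System.out.println(lookup.findTaskGroupId("index_kafka_ds_publishing")); // prints 1
  }
}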
@@ -1776,7 +1776,8 @@ private void makeToolboxFactory() throws IOException
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
int taskGroupId,
@Nullable Integer taskGroupId,
String baseSequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
)
@@ -2103,6 +2103,7 @@ public void testCheckpointForInactiveTaskGroup()
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
);
@@ -2172,6 +2173,7 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException

supervisor.checkpoint(
0,
((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
);
@@ -2190,13 +2192,100 @@
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
}

@Test(timeout = 60_000L)
public void testCheckpointWithNullTaskGroupId()
throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
{
supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
// not adding any events
final Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null
);

final Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null
);

final Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
0,
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null
);

expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
.andReturn(new KafkaDataSourceMetadata(null))
.anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
expect(taskClient.getStatusAsync(anyString()))
.andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
.anyTimes();
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L));
expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(3);
expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
expect(taskClient.pauseAsync(anyString()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
.anyTimes();
expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean()))
.andReturn(Futures.immediateFuture(true))
.anyTimes();

replayAll();

supervisor.start();

supervisor.runInternal();

final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>();
newCheckpoints.put(0, ImmutableMap.of(0, 10L));
supervisor.checkpoint(
null,
((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0)))
);

while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}

verifyAll();
}

private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (int i = 0; i < NUM_PARTITIONS; i++) {
for (int j = 0; j < numEventsPerPartition; j++) {
kafkaProducer.send(
new ProducerRecord<byte[], byte[]>(
new ProducerRecord<>(
topic,
i,
null,