Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.joda.time.Duration;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -278,11 +277,7 @@ public void reset(DataSourceMetadata dataSourceMetadata)
}

@Override
public void checkpoint(
@Nullable Integer taskGroupId,
String baseSequenceName,
DataSourceMetadata checkpointMetadata
)
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
// do nothing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2705,8 +2705,7 @@ private void makeToolboxFactory() throws IOException
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable Integer taskGroupId,
String baseSequenceName,
int taskGroupId,
@Nullable DataSourceMetadata previousDataSourceMetadata
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2378,7 +2378,6 @@ public void testCheckpointForInactiveTaskGroup()
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())
)
Expand Down Expand Up @@ -2463,7 +2462,6 @@ public void testCheckpointForUnknownTaskGroup()

supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())
)
Expand All @@ -2489,104 +2487,6 @@ public void testCheckpointForUnknownTaskGroup()
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
}

@Test(timeout = 60_000L)
public void testCheckpointWithNullTaskGroupId()
throws InterruptedException
{
supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
supervisor.getStateManager().markRunFinished();

//not adding any events
final KafkaIndexTask id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null,
tuningConfig
);

final Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null,
tuningConfig
);

final Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
null,
null,
tuningConfig
);

EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
EasyMock.expect(
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L));
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.anyString(), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(3);
EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.anyTimes();
EasyMock.expect(taskClient.pauseAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
.anyTimes();
EasyMock.expect(taskClient.setEndOffsetsAsync(
EasyMock.anyString(),
EasyMock.eq(ImmutableMap.of(0, 10L)),
EasyMock.anyBoolean()
))
.andReturn(Futures.immediateFuture(true))
.anyTimes();

replayAll();

supervisor.start();

supervisor.runInternal();

supervisor.checkpoint(
null,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}

verifyAll();
}

@Test
public void testSuspendedNoRunningTasks() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2912,8 +2912,7 @@ private void makeToolboxFactory() throws IOException
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable Integer taskGroupId,
String baseSequenceName,
int taskGroupId,
@Nullable DataSourceMetadata checkpointMetadata
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2971,7 +2971,6 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), checkpoints.get(0).keySet())
)
Expand Down Expand Up @@ -3098,7 +3097,6 @@ public void testCheckpointForUnknownTaskGroup()

supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.emptyMap(), ImmutableSet.of())
)
Expand All @@ -3123,111 +3121,6 @@ public void testCheckpointForUnknownTaskGroup()
Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
}

@Test(timeout = 60_000L)
public void testCheckpointWithNullTaskGroupId() throws InterruptedException
{
supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false);
supervisor.getStateManager().markRunFinished();

//not adding any events
final KinesisIndexTask id1 = createKinesisIndexTask(
"id1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
),
null,
null
);

final Task id2 = createKinesisIndexTask(
"id2",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of(SHARD_ID1)),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
),
null,
null
);

final Task id3 = createKinesisIndexTask(
"id3",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of(SHARD_ID1)),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
),
null,
null
);

EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
EasyMock.expect(
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata(
null)
).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "0"));
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.anyString(), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.times(3);
EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.anyTimes();
EasyMock.expect(taskClient.pauseAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(SHARD_ID1, "10")))
.anyTimes();
EasyMock.expect(taskClient.setEndOffsetsAsync(
EasyMock.anyString(),
EasyMock.eq(ImmutableMap.of(SHARD_ID1, "10")),
EasyMock.anyBoolean()
))
.andReturn(Futures.immediateFuture(true))
.anyTimes();

replayAll();

supervisor.start();

supervisor.runInternal();

supervisor.checkpoint(
null,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}

verifyAll();
}


@Test
public void testSuspendedNoRunningTasks() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,18 @@
public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
private final String supervisorId;
@Nullable
private final Integer taskGroupId;
@Deprecated
private final String baseSequenceName;
private final int taskGroupId;
private final SeekableStreamDataSourceMetadata checkpointMetadata;

public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId,
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility,
@JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this
@JsonProperty("taskGroupId") Integer taskGroupId,
@JsonProperty("previousCheckPoint") @Nullable @Deprecated SeekableStreamDataSourceMetadata previousCheckPoint,
@JsonProperty("checkpointMetadata") @Nullable SeekableStreamDataSourceMetadata checkpointMetadata
)
{
this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
this.taskGroupId = taskGroupId;
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
this.checkpointMetadata = checkpointMetadata == null ? previousCheckPoint : checkpointMetadata;

Preconditions.checkNotNull(this.checkpointMetadata, "checkpointMetadata");
Expand All @@ -65,13 +60,6 @@ public String getSupervisorId()
return supervisorId;
}

@Deprecated
@JsonProperty("sequenceName")
public String getBaseSequenceName()
{
return baseSequenceName;
}

@Nullable
@JsonProperty
public Integer getTaskGroupId()
Expand Down Expand Up @@ -107,7 +95,6 @@ public Boolean perform(Task task, TaskActionToolbox toolbox)
return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
supervisorId,
taskGroupId,
baseSequenceName,
checkpointMetadata
);
}
Expand All @@ -123,7 +110,6 @@ public String toString()
{
return "CheckPointDataSourceMetadataAction{" +
"supervisorId='" + supervisorId + '\'' +
", baseSequenceName='" + baseSequenceName + '\'' +
", taskGroupId='" + taskGroupId + '\'' +
", checkpointMetadata=" + checkpointMetadata +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc

public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable Integer taskGroupId,
String baseSequenceName,
int taskGroupId,
DataSourceMetadata previousDataSourceMetadata
)
{
Expand All @@ -205,7 +204,7 @@ public boolean checkPointDataSourceMetadata(

Preconditions.checkNotNull(supervisor, "supervisor could not be found");

supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata);
supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata);
return true;
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@ public void onFailure(Throwable t)
final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
task.getDataSource(),
ioConfig.getTaskGroupId(),
task.getIOConfig().getBaseSequenceName(),
null,
createDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(
Expand Down
Loading