@@ -959,7 +959,7 @@ public Map<Integer, Long> getCurrentOffsets(@Context final HttpServletRequest re
   }
 
   @Override
-  public Map<Integer, Long> getCurrentOffsets()
+  public ConcurrentMap<Integer, Long> getCurrentOffsets()
   {
     return nextOffsets;
   }
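
The change above narrows the return type from Map to ConcurrentMap, which signals to callers that the offsets map may be read while the ingestion thread is still updating it. A minimal sketch of the pattern; the field and method names follow the diff above, but the surrounding class is a hypothetical stand-in, not the actual task runner:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OffsetTrackerSketch
{
  // Written by the ingestion thread, read concurrently by HTTP handlers.
  private final ConcurrentMap<Integer, Long> nextOffsets = new ConcurrentHashMap<>();

  // Returning ConcurrentMap (rather than Map) documents that concurrent reads
  // are safe and that atomic operations such as putIfAbsent()/replace() are
  // available to callers.
  public ConcurrentMap<Integer, Long> getCurrentOffsets()
  {
    return nextOffsets;
  }

  void advance(int partition, long offset)
  {
    nextOffsets.put(partition, offset);
  }
}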
@@ -226,8 +226,6 @@ public class KafkaIndexTaskTest
   private File reportsFile;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
 
-  private int handoffCount = 0;
-
   // This should be removed in versions greater than 0.12.x
   // isIncrementalHandoffSupported should always be set to true in those later versions
   @Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
@@ -877,7 +875,14 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
         new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
         new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
         new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0"))
+        new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0"))
     );
 
     final String baseSequenceName = "sequence0";
@@ -904,9 +909,13 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
         topic,
         ImmutableMap.of(0, 5L)
     );
+    final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(0, 12L)
+    );
     final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
         topic,
-        ImmutableMap.of(0, 7L)
+        ImmutableMap.of(0, Long.MAX_VALUE)
     );
 
     final KafkaIndexTask task = createTask(
@@ -927,17 +936,28 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
     while (task.getRunner().getStatus() != Status.PAUSED) {
       Thread.sleep(10);
     }
-    final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
-    Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets));
+    Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
 
+    // Simulating the case when another replica has consumed up to the offset of 8
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false);
 
-    // actual checkpoint offset is 5, but simulating behavior of publishing set end offset call, to ensure this task
-    // will continue reading through the end offset of the checkpointed sequence
-    task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);
+    // The task is supposed to consume remaining rows up to the offset of 13
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
+
+    task.getRunner().setEndOffsets(
+        ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L),
+        true
+    );
+
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
-    // processed count would be 5 if it stopped at its current offsets
-    Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());
+    // processed count would be 8 if it stopped at its current offsets
+    Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
   }
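
The reworked Kafka test above, and the matching Kinesis test added below, both drive the task through setEndOffsets(offsets, finish), which the diff shows being called with finish = false to checkpoint and finish = true to terminate. A rough sketch of that contract as the tests appear to exercise it; the real SeekableStreamIndexTaskRunner coordinates this with the supervisor and persists sequence metadata, and the helper names here are simplified stand-ins:

import java.util.HashMap;
import java.util.Map;

class EndOffsetsContractSketch
{
  private final Map<Integer, Long> endOffsets = new HashMap<>();

  void setEndOffsets(Map<Integer, Long> offsets, boolean finish)
  {
    if (finish) {
      // Final end offsets: the task finishes once reading reaches them.
      endOffsets.putAll(offsets);
    } else {
      // Checkpoint: close the current sequence at these offsets, then keep
      // reading into a new sequence rather than stopping.
      checkpointCurrentSequence(offsets);
    }
    resume(); // the runner pauses itself while waiting for this call
  }

  private void checkpointCurrentSequence(Map<Integer, Long> offsets)
  {
    // stand-in for sequence bookkeeping
  }

  private void resume()
  {
    // stand-in for unpausing the ingestion loop
  }
}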
@@ -176,6 +176,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -639,7 +640,6 @@ public void testIncrementalHandOff() throws Exception
     Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
   }
 
-
   @Test(timeout = 120_000L)
   public void testIncrementalHandOffMaxTotalRows() throws Exception
   {
@@ -2277,7 +2277,7 @@ public void testRunWithPauseAndResume() throws Exception
 
     verifyAll();
 
-    Map<String, String> currentOffsets = task.getRunner().getCurrentOffsets();
+    ConcurrentMap<String, String> currentOffsets = task.getRunner().getCurrentOffsets();
 
     try {
       future.get(10, TimeUnit.SECONDS);
@@ -2423,6 +2423,154 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }
 
+  @Test(timeout = 5000L)
+  public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
+  {
+    final List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0"))
+    );
+
+    final String baseSequenceName = "sequence0";
+    // as soon as any segment has more than one record, incremental publishing should happen
+    maxRowsPerSegment = 2;
+    maxRecordsPerPoll = 1;
+
+    recordSupplier.assign(anyObject());
+    expectLastCall().anyTimes();
+
+    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+    recordSupplier.seek(anyObject(), anyString());
+    expectLastCall().anyTimes();
+
+    expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5))
+        .once()
+        .andReturn(records.subList(4, 10))
+        .once()
+        .andReturn(records.subList(9, 15))
+        .once();
+
+    recordSupplier.close();
+    expectLastCall().once();
+
+    replayAll();
+
+    final SeekableStreamPartitions<String, String> startPartitions = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(
+            shardId1,
+            "0"
+        )
+    );
+
+    final SeekableStreamPartitions<String, String> checkpoint1 = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(
+            shardId1,
+            "4"
+        )
+    );
+
+    final SeekableStreamPartitions<String, String> checkpoint2 = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(
+            shardId1,
+            "9"
+        )
+    );
+
+    final SeekableStreamPartitions<String, String> endPartitions = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(
+            shardId1,
+            "14"
+        )
+    );
+    final KinesisIndexTask task = createTask(
+        null,
+        new KinesisIndexTaskIOConfig(
+            null,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            true,
+            null,
+            null,
+            "awsEndpoint",
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    Map<String, String> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
+    task.getRunner().setEndOffsets(currentOffsets, false);
+
+    // The task is supposed to consume remaining rows up to the offset of 13
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
+
+    task.getRunner().setEndOffsets(
+        ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)),
+        true
+    );
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    verifyAll();
+
+    Assert.assertEquals(2, checkpointRequestsHash.size());
+
+    // Check metrics
+    Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published metadata
+    final Set<SegmentDescriptor> descriptors = new HashSet<>();
+    descriptors.add(sd(task, "2008/P1D", 0));
+    descriptors.add(sd(task, "2008/P1D", 1));
+    descriptors.add(sd(task, "2009/P1D", 0));
+    descriptors.add(sd(task, "2010/P1D", 0));
+    descriptors.add(sd(task, "2010/P1D", 1));
+    descriptors.add(sd(task, "2011/P1D", 0));
+    descriptors.add(sd(task, "2011/P1D", 1));
+    descriptors.add(sd(task, "2012/P1D", 0));
+    descriptors.add(sd(task, "2013/P1D", 0));
+    Assert.assertEquals(descriptors, publishedDescriptors());
+    Assert.assertEquals(
+        new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+            shardId1,
+            "10"
+        ))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
@@ -61,7 +61,7 @@
 import java.util.Map;
 
 
-public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType extends Comparable>
+public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
    extends AbstractTask implements ChatHandler
 {
   public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
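
Dropping the Comparable bound on SequenceOffsetType means offsets no longer have to rely on their natural ordering. One plausible motivation, illustrated with plain Java rather than Druid code: Kinesis sequence numbers (the String offsets "0" through "14" in the test above) are numeric strings, and String's natural ordering compares them lexicographically, which disagrees with their numeric order:

public class SequenceOrderDemo
{
  public static void main(String[] args)
  {
    // "9".compareTo("10") > 0 lexicographically, although 9 < 10 numerically,
    // so natural String ordering would compare such sequence numbers incorrectly.
    System.out.println("9".compareTo("10") > 0);   // true  (lexicographic)
    System.out.println(Long.compare(9L, 10L) > 0); // false (numeric)
  }
}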