Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public IncrementalPublishingKafkaIndexTaskRunner(
}

@Override
protected Long getSequenceNumberToStoreAfterRead(@NotNull Long sequenceNumber)
protected Long getNextStartOffset(@NotNull Long sequenceNumber)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:+1: on switching to 'offset'; I think it's more intuitive terminology, though maybe change the parameter variable name too?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to revert this for now, but plan to try again later.

{
return sequenceNumber + 1;
}
Expand Down Expand Up @@ -209,17 +209,11 @@ protected void possiblyResetDataSourceMetadata(
}

@Override
protected boolean isEndSequenceOffsetsExclusive()
protected boolean isEndOffsetExclusive()
{
return true;
}

@Override
protected boolean isStartingSequenceOffsetsExclusive()
{
return false;
}

@Override
protected boolean isEndOfShard(Long seqNum)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,18 +706,11 @@ protected void possiblyResetDataSourceMetadata(
}

@Override
protected boolean isEndSequenceOffsetsExclusive()
protected boolean isEndOffsetExclusive()
{
return false;
return true;
}

@Override
protected boolean isStartingSequenceOffsetsExclusive()
{
return false;
}


@Override
protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Expand Down Expand Up @@ -805,7 +798,7 @@ private void requestPause()
}

@Override
protected Long getSequenceNumberToStoreAfterRead(Long sequenceNumber)
protected Long getNextStartOffset(Long sequenceNumber)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String


@Override
protected String getSequenceNumberToStoreAfterRead(String sequenceNumber)
protected String getNextStartOffset(String sequenceNumber)
{
return sequenceNumber;
}
Expand Down Expand Up @@ -160,17 +160,11 @@ protected void possiblyResetDataSourceMetadata(
}

@Override
protected boolean isEndSequenceOffsetsExclusive()
protected boolean isEndOffsetExclusive()
{
return false;
}

@Override
protected boolean isStartingSequenceOffsetsExclusive()
{
return true;
}

@Override
protected boolean isEndOfShard(String seqNum)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
Expand Down Expand Up @@ -1079,7 +1080,7 @@ public void testRunWithTransformSpec() throws Exception


@Test(timeout = 120_000L)
public void testRunOnNothing() throws Exception
public void testRunOnSingletonRange() throws Exception
{
recordSupplier.assign(anyObject());
expectLastCall().anyTimes();
Expand All @@ -1089,11 +1090,15 @@ public void testRunOnNothing() throws Exception
recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();

expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 3)).once();

recordSupplier.close();
expectLastCall().once();

replayAll();

// When start and end offsets are the same, it means we need to read one message (since in Kinesis, end offsets
// are inclusive).
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
Expand Down Expand Up @@ -1128,12 +1133,12 @@ public void testRunOnNothing() throws Exception
verifyAll();

// Check metrics
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());

// Check published metadata
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
Assert.assertEquals(ImmutableSet.of(sd(task, "2010/P1D", 0)), publishedDescriptors());
}


Expand Down Expand Up @@ -2099,14 +2104,11 @@ public void testRunTwoTasksTwoPartitions() throws Exception
@Test(timeout = 120_000L)
public void testRestore() throws Exception
{
recordSupplier.assign(anyObject());
expectLastCall().anyTimes();

expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();

recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();

final StreamPartition<String> streamPartition = StreamPartition.of(stream, shardId1);
recordSupplier.assign(ImmutableSet.of(streamPartition));
expectLastCall();
recordSupplier.seek(streamPartition, "2");
expectLastCall();
expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 4))
.once()
.andReturn(Collections.emptyList())
Expand Down Expand Up @@ -2157,16 +2159,13 @@ public void testRestore() throws Exception
verifyAll();
reset(recordSupplier);

recordSupplier.assign(anyObject());
expectLastCall().anyTimes();

expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();

recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();

recordSupplier.assign(ImmutableSet.of(streamPartition));
expectLastCall();
recordSupplier.seek(streamPartition, "3");
expectLastCall();
expect(recordSupplier.poll(anyLong())).andReturn(records.subList(3, 6)).once();

recordSupplier.assign(ImmutableSet.of());
expectLastCall();
recordSupplier.close();
expectLastCall();

Expand Down Expand Up @@ -2248,8 +2247,6 @@ public void testRestoreAfterPersistingSequences() throws Exception
recordSupplier.assign(anyObject());
expectLastCall().anyTimes();

expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();

recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();

Expand Down Expand Up @@ -2321,9 +2318,6 @@ public void testRestoreAfterPersistingSequences() throws Exception
recordSupplier.assign(anyObject());
expectLastCall().anyTimes();

expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();


recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();

Expand Down Expand Up @@ -2377,7 +2371,7 @@ public void testRestoreAfterPersistingSequences() throws Exception
Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(2, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());

Expand All @@ -2386,8 +2380,9 @@ public void testRestoreAfterPersistingSequences() throws Exception
SegmentDescriptor desc2 = sd(task1, "2009/P1D", 0);
SegmentDescriptor desc3 = sd(task1, "2010/P1D", 0);
SegmentDescriptor desc4 = sd(task1, "2011/P1D", 0);
SegmentDescriptor desc5 = sd(task1, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5), publishedDescriptors());
SegmentDescriptor desc5 = sd(task1, "2012/P1D", 0);
SegmentDescriptor desc6 = sd(task1, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
new SeekableStreamPartitions<>(stream, ImmutableMap.of(
Expand All @@ -2401,14 +2396,11 @@ public void testRestoreAfterPersistingSequences() throws Exception
@Test(timeout = 120_000L)
public void testRunWithPauseAndResume() throws Exception
{
recordSupplier.assign(anyObject());
expectLastCall().anyTimes();

expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();

recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();

final StreamPartition<String> streamPartition = StreamPartition.of(stream, shardId1);
recordSupplier.assign(ImmutableSet.of(streamPartition));
expectLastCall();
recordSupplier.seek(streamPartition, "2");
expectLastCall();
expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 5))
.once()
.andReturn(Collections.emptyList())
Expand Down Expand Up @@ -2475,14 +2467,8 @@ public void testRunWithPauseAndResume() throws Exception

reset(recordSupplier);

recordSupplier.assign(anyObject());
expectLastCall().anyTimes();

expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();

recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();

recordSupplier.assign(ImmutableSet.of());
expectLastCall();
recordSupplier.close();
expectLastCall().once();

Expand Down Expand Up @@ -2546,8 +2532,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception

final TreeMap<Integer, Map<String, String>> sequences = new TreeMap<>();
// Here the sequence number is 1, meaning that one incremental handoff was done by the failed task
// and this task should start reading from stream 2 for partition 0
sequences.put(1, ImmutableMap.of(shardId1, "2"));
// and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
sequences.put(1, ImmutableMap.of(shardId1, "1"));
final Map<String, Object> context = new HashMap<>();
context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<String, String>>>()
{
Expand Down Expand Up @@ -2784,7 +2770,7 @@ private ListenableFuture<TaskStatus> runTask(final Task task)
throw new ISE("Task is not ready");
}
}
catch (Exception e) {
catch (Throwable e) {
log.warn(e, "Task failed");
return TaskStatus.failure(task.getId(), Throwables.getStackTraceAsString(e));
}
Expand Down
Loading