diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index ff3b75170eaf..64e3bad04869 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -863,21 +863,36 @@ private Long getPartitionTimeLag(StreamPartition partition, String offse iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); offsetToUse = offset; } - String shardIterator = kinesis.getShardIterator( - partition.getStream(), - partition.getPartitionId(), - iteratorType, - offsetToUse - ).getShardIterator(); - GetRecordsResult recordsResult = kinesis.getRecords( - new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1) - ); + GetRecordsResult recordsResult = getRecordsForLag(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), offsetToUse, partition); + + // If no more new data after offsetToUse, it means there is no lag for now. + // So report lag points as 0L. + if (recordsResult.getRecords().size() == 0) { + return 0L; + } else { + recordsResult = getRecordsForLag(iteratorType, offsetToUse, partition); + } return recordsResult.getMillisBehindLatest(); }); } + private GetRecordsResult getRecordsForLag(String iteratorType, String offsetToUse, StreamPartition partition) + { + String shardIterator = kinesis.getShardIterator( + partition.getStream(), + partition.getPartitionId(), + iteratorType, + offsetToUse + ).getShardIterator(); + + GetRecordsResult recordsResult = kinesis.getRecords( + new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1) + ); + return recordsResult; + } + /** * Explode if {@link #close()} has been called on the supplier. */ diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 66fc52e0fbe2..ec8ab623ac7c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -73,12 +73,16 @@ public class KinesisRecordSupplierTest extends EasyMockSupport private static final Long SHARD0_LAG_MILLIS = 100L; private static final Long SHARD1_LAG_MILLIS = 200L; + private static final Long SHARD1_LAG_MILLIS_EMPTY = 0L; private static Map SHARDS_LAG_MILLIS = ImmutableMap.of(SHARD_ID0, SHARD0_LAG_MILLIS, SHARD_ID1, SHARD1_LAG_MILLIS); + private static Map SHARDS_LAG_MILLIS_EMPTY = + ImmutableMap.of(SHARD_ID0, SHARD0_LAG_MILLIS, SHARD_ID1, SHARD1_LAG_MILLIS_EMPTY); private static final List SHARD0_RECORDS = ImmutableList.of( new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1") ); + private static final List SHARD1_RECORDS_EMPTY = ImmutableList.of(); private static final List SHARD1_RECORDS = ImmutableList.of( new Record().withData(jb("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), new Record().withData(jb("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"), @@ -917,15 +921,43 @@ public void getPartitionTimeLag() throws InterruptedException EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID0), - EasyMock.anyString(), - EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) + EasyMock.eq(ShardIteratorType.TRIM_HORIZON.toString()), + EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) )).andReturn(getShardIteratorResult0).anyTimes(); EasyMock.expect(kinesis.getShardIterator( - EasyMock.anyObject(), - EasyMock.eq(SHARD_ID1), - EasyMock.anyString(), - EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.eq(ShardIteratorType.AT_SEQUENCE_NUMBER.toString()), + EasyMock.matches("\\d+") + )).andReturn(getShardIteratorResult0).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.eq(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString()), + EasyMock.matches("\\d+") + )).andReturn(getShardIteratorResult0).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.eq(ShardIteratorType.TRIM_HORIZON.toString()), + EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) + )).andReturn(getShardIteratorResult1).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.eq(ShardIteratorType.AT_SEQUENCE_NUMBER.toString()), + EasyMock.matches("\\d+") + )).andReturn(getShardIteratorResult1).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.eq(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString()), + EasyMock.matches("\\d+") )).andReturn(getShardIteratorResult1).anyTimes(); EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); @@ -936,12 +968,12 @@ public void getPartitionTimeLag() throws InterruptedException EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) .andReturn(getRecordsResult1) .anyTimes(); - EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); - EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).times(2); + EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS_EMPTY).times(2); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).times(2); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).times(2); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS_EMPTY).once(); replayAll(); @@ -976,14 +1008,14 @@ public void getPartitionTimeLag() throws InterruptedException Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag); + Assert.assertEquals(SHARDS_LAG_MILLIS_EMPTY, timeLag); Map offsets = ImmutableMap.of( SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(), SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber() ); Map independentTimeLag = recordSupplier.getPartitionsTimeLag(STREAM, offsets); - Assert.assertEquals(SHARDS_LAG_MILLIS, independentTimeLag); + Assert.assertEquals(SHARDS_LAG_MILLIS_EMPTY, independentTimeLag); // Verify that kinesis apis are not called for custom sequence numbers for (String sequenceNum : Arrays.asList(NO_END_SEQUENCE_NUMBER, END_OF_SHARD_MARKER, EXPIRED_MARKER)) {