From fdc27126b761e11048e4fbb6170099a57c4c02ff Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 16 Mar 2021 14:53:29 +0800 Subject: [PATCH 1/4] fix kinesis lag metrics bug and modify current UT --- .../kinesis/KinesisRecordSupplier.java | 33 ++++++++++++++----- .../kinesis/KinesisRecordSupplierTest.java | 31 +++++++++++++++-- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index ff3b75170eaf..a38c1bb89ae9 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -863,21 +863,36 @@ private Long getPartitionTimeLag(StreamPartition partition, String offse iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); offsetToUse = offset; } - String shardIterator = kinesis.getShardIterator( - partition.getStream(), - partition.getPartitionId(), - iteratorType, - offsetToUse - ).getShardIterator(); - GetRecordsResult recordsResult = kinesis.getRecords( - new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1) - ); + GetRecordsResult recordsResult = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), offsetToUse, partition); + + // If no more new data after offsetToUse, it means there is no lag for now. + // So report lag points as 0L. + if (recordsResult.getRecords().size() == 0) { + return 0L; + } else { + recordsResult = getRecords(iteratorType, offsetToUse, partition); + } return recordsResult.getMillisBehindLatest(); }); } + private GetRecordsResult getRecords(String iteratorType, String offsetToUse, StreamPartition partition) + { + String shardIterator = kinesis.getShardIterator( + partition.getStream(), + partition.getPartitionId(), + iteratorType, + offsetToUse + ).getShardIterator(); + + GetRecordsResult recordsResult = kinesis.getRecords( + new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1) + ); + return recordsResult; + } + /** * Explode if {@link #close()} has been called on the supplier. */ diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 66fc52e0fbe2..9c2891fa13df 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -70,6 +70,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport private static final String SHARD_ID0 = "0"; private static final String SHARD1_ITERATOR = "1"; private static final String SHARD0_ITERATOR = "0"; + private static final String SHARD2_ITERATOR = "2"; private static final Long SHARD0_LAG_MILLIS = 100L; private static final Long SHARD1_LAG_MILLIS = 200L; @@ -79,6 +80,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1") ); + private static final List SHARD2_RECORDS = ImmutableList.of(new Record()); private static final List SHARD1_RECORDS = ImmutableList.of( new Record().withData(jb("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), new Record().withData(jb("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"), @@ -146,7 +148,9 @@ private static ByteBuffer jb(String timestamp, String dim1, String dim2, String private static DescribeStreamResult describeStreamResult1; private static GetShardIteratorResult getShardIteratorResult0; private static GetShardIteratorResult getShardIteratorResult1; + private static GetShardIteratorResult getShardIteratorResult2; private static GetRecordsResult getRecordsResult0; + private static GetRecordsResult getRecordsResult2; private static GetRecordsResult getRecordsResult1; private static StreamDescription streamDescription0; private static StreamDescription streamDescription1; @@ -162,7 +166,9 @@ public void setupTest() describeStreamResult1 = createMock(DescribeStreamResult.class); getShardIteratorResult0 = createMock(GetShardIteratorResult.class); getShardIteratorResult1 = createMock(GetShardIteratorResult.class); + getShardIteratorResult2 = createMock(GetShardIteratorResult.class); getRecordsResult0 = createMock(GetRecordsResult.class); + getRecordsResult2 = createMock(GetRecordsResult.class); getRecordsResult1 = createMock(GetRecordsResult.class); streamDescription0 = createMock(StreamDescription.class); streamDescription1 = createMock(StreamDescription.class); @@ -917,10 +923,24 @@ public void getPartitionTimeLag() throws InterruptedException EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID0), - EasyMock.anyString(), - EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) + EasyMock.eq(ShardIteratorType.TRIM_HORIZON.toString()), + EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) + )).andReturn(getShardIteratorResult0).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.eq(ShardIteratorType.AT_SEQUENCE_NUMBER.toString()), + EasyMock.matches("\\d+") )).andReturn(getShardIteratorResult0).anyTimes(); + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.eq(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString()), + EasyMock.matches("\\d+") + )).andReturn(getShardIteratorResult2).anyTimes(); + EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID1), @@ -929,15 +949,20 @@ public void getPartitionTimeLag() throws InterruptedException )).andReturn(getShardIteratorResult1).anyTimes(); EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); + EasyMock.expect(getShardIteratorResult2.getShardIterator()).andReturn(SHARD2_ITERATOR).anyTimes(); EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) .andReturn(getRecordsResult0) .anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD2_ITERATOR, recordsPerFetch))) + .andReturn(getRecordsResult2) + .anyTimes(); EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) .andReturn(getRecordsResult1) .anyTimes(); EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); - EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); + EasyMock.expect(getRecordsResult2.getRecords()).andReturn(SHARD2_RECORDS).once(); + EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).times(2); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).times(2); From 3148c6ecfd6a92135d977a1b7f1274e882348814 Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 16 Mar 2021 15:35:49 +0800 Subject: [PATCH 2/4] done --- .idea/misc.xml | 4 +- .../kinesis/KinesisRecordSupplierTest.java | 49 +++++++++++-------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index bf2061d7392d..fe39ed623c9c 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -84,7 +84,7 @@ - + - + \ No newline at end of file diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 9c2891fa13df..ec8ab623ac7c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -70,17 +70,19 @@ public class KinesisRecordSupplierTest extends EasyMockSupport private static final String SHARD_ID0 = "0"; private static final String SHARD1_ITERATOR = "1"; private static final String SHARD0_ITERATOR = "0"; - private static final String SHARD2_ITERATOR = "2"; private static final Long SHARD0_LAG_MILLIS = 100L; private static final Long SHARD1_LAG_MILLIS = 200L; + private static final Long SHARD1_LAG_MILLIS_EMPTY = 0L; private static Map SHARDS_LAG_MILLIS = ImmutableMap.of(SHARD_ID0, SHARD0_LAG_MILLIS, SHARD_ID1, SHARD1_LAG_MILLIS); + private static Map SHARDS_LAG_MILLIS_EMPTY = + ImmutableMap.of(SHARD_ID0, SHARD0_LAG_MILLIS, SHARD_ID1, SHARD1_LAG_MILLIS_EMPTY); private static final List SHARD0_RECORDS = ImmutableList.of( new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1") ); - private static final List SHARD2_RECORDS = ImmutableList.of(new Record()); + private static final List SHARD1_RECORDS_EMPTY = ImmutableList.of(); private static final List SHARD1_RECORDS = ImmutableList.of( new Record().withData(jb("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), new Record().withData(jb("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"), @@ -148,9 +150,7 @@ private static ByteBuffer jb(String timestamp, String dim1, String dim2, String private static DescribeStreamResult describeStreamResult1; private static GetShardIteratorResult getShardIteratorResult0; private static GetShardIteratorResult getShardIteratorResult1; - private static GetShardIteratorResult getShardIteratorResult2; private static GetRecordsResult getRecordsResult0; - private static GetRecordsResult getRecordsResult2; private static GetRecordsResult getRecordsResult1; private static StreamDescription streamDescription0; private static StreamDescription streamDescription1; @@ -166,9 +166,7 @@ public void setupTest() describeStreamResult1 = createMock(DescribeStreamResult.class); getShardIteratorResult0 = createMock(GetShardIteratorResult.class); getShardIteratorResult1 = createMock(GetShardIteratorResult.class); - getShardIteratorResult2 = createMock(GetShardIteratorResult.class); getRecordsResult0 = createMock(GetRecordsResult.class); - getRecordsResult2 = createMock(GetRecordsResult.class); getRecordsResult1 = createMock(GetRecordsResult.class); streamDescription0 = createMock(StreamDescription.class); streamDescription1 = createMock(StreamDescription.class); @@ -939,34 +937,43 @@ public void getPartitionTimeLag() throws InterruptedException EasyMock.eq(SHARD_ID0), EasyMock.eq(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString()), EasyMock.matches("\\d+") - )).andReturn(getShardIteratorResult2).anyTimes(); + )).andReturn(getShardIteratorResult0).anyTimes(); EasyMock.expect(kinesis.getShardIterator( - EasyMock.anyObject(), - EasyMock.eq(SHARD_ID1), - EasyMock.anyString(), - EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.eq(ShardIteratorType.TRIM_HORIZON.toString()), + EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull()) + )).andReturn(getShardIteratorResult1).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.eq(ShardIteratorType.AT_SEQUENCE_NUMBER.toString()), + EasyMock.matches("\\d+") + )).andReturn(getShardIteratorResult1).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.eq(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString()), + EasyMock.matches("\\d+") )).andReturn(getShardIteratorResult1).anyTimes(); EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); - EasyMock.expect(getShardIteratorResult2.getShardIterator()).andReturn(SHARD2_ITERATOR).anyTimes(); EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) .andReturn(getRecordsResult0) .anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD2_ITERATOR, recordsPerFetch))) - .andReturn(getRecordsResult2) - .anyTimes(); EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) .andReturn(getRecordsResult1) .anyTimes(); - EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); - EasyMock.expect(getRecordsResult2.getRecords()).andReturn(SHARD2_RECORDS).once(); - EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).times(2); + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).times(2); + EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS_EMPTY).times(2); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).times(2); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).times(2); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS_EMPTY).once(); replayAll(); @@ -1001,14 +1008,14 @@ public void getPartitionTimeLag() throws InterruptedException Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag); + Assert.assertEquals(SHARDS_LAG_MILLIS_EMPTY, timeLag); Map offsets = ImmutableMap.of( SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(), SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber() ); Map independentTimeLag = recordSupplier.getPartitionsTimeLag(STREAM, offsets); - Assert.assertEquals(SHARDS_LAG_MILLIS, independentTimeLag); + Assert.assertEquals(SHARDS_LAG_MILLIS_EMPTY, independentTimeLag); // Verify that kinesis apis are not called for custom sequence numbers for (String sequenceNum : Arrays.asList(NO_END_SEQUENCE_NUMBER, END_OF_SHARD_MARKER, EXPIRED_MARKER)) { From eb4f993fe7c2ebe0ce0d4ede22fba1500c9c5aec Mon Sep 17 00:00:00 2001 From: yuezhang Date: Tue, 16 Mar 2021 15:36:19 +0800 Subject: [PATCH 3/4] revert misc.xml --- .idea/misc.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index fe39ed623c9c..bf2061d7392d 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -84,7 +84,7 @@ - + - \ No newline at end of file + From 6c76db674d4ff7c3d9ec514543421683a34b918a Mon Sep 17 00:00:00 2001 From: yuezhang Date: Wed, 17 Mar 2021 15:51:31 +0800 Subject: [PATCH 4/4] code review --- .../druid/indexing/kinesis/KinesisRecordSupplier.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index a38c1bb89ae9..64e3bad04869 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -864,21 +864,21 @@ private Long getPartitionTimeLag(StreamPartition partition, String offse offsetToUse = offset; } - GetRecordsResult recordsResult = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), offsetToUse, partition); + GetRecordsResult recordsResult = getRecordsForLag(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), offsetToUse, partition); // If no more new data after offsetToUse, it means there is no lag for now. // So report lag points as 0L. if (recordsResult.getRecords().size() == 0) { return 0L; } else { - recordsResult = getRecords(iteratorType, offsetToUse, partition); + recordsResult = getRecordsForLag(iteratorType, offsetToUse, partition); } return recordsResult.getMillisBehindLatest(); }); } - private GetRecordsResult getRecords(String iteratorType, String offsetToUse, StreamPartition partition) + private GetRecordsResult getRecordsForLag(String iteratorType, String offsetToUse, StreamPartition partition) { String shardIterator = kinesis.getShardIterator( partition.getStream(),