Describe the bug
The reported Kinesis lag keeps increasing even when no new data is being written to the stream.
Expected Behavior
The lag should be zero when there is no new data.
Current Behavior
The lag keeps increasing.
Steps to Reproduce
- Produce data to a Kinesis stream.
- Submit the supervisor spec kinesis.json to the Overlord.
- Stop the Kinesis producer.
- The lag keeps increasing even though no new data is arriving in the stream.
I think this may be caused by a misunderstanding of the API call recordsResult.getMillisBehindLatest().
Here is the documentation for this field:
MillisBehindLatest
The number of milliseconds the GetRecords response is from the tip of the stream, indicating how far behind current time the consumer is. A value of zero indicates that record processing is caught up, and there are no new records to process at this moment.
Type: Long
Valid Range: Minimum value of 0.
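I think this means MillisBehindLatest = consumeTime - dataTime, where consumeTime is the time of the GetRecords call and dataTime is the timestamp of the data being read.
Here is my test code: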
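To make that reading concrete, here is a minimal sketch of the hypothesized formula. It is only a model of my interpretation, not the Kinesis implementation; the class name, method name, and timestamps are made up for illustration. Under this model the value keeps growing with wall-clock time for a fixed record, even if nothing new is written.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the interpretation above; NOT how Kinesis computes the value.
final class MillisBehindModel {

    // consumeTime - dataTime in milliseconds, clamped at 0 (the documented minimum).
    static long millisBehind(Instant consumeTime, Instant dataTime) {
        return Math.max(0L, Duration.between(dataTime, consumeTime).toMillis());
    }

    public static void main(String[] args) {
        // Made-up record timestamp, used only for illustration.
        Instant dataTime = Instant.parse("2021-03-12T02:33:54Z");

        // The same record read at two different times.
        Instant firstRead = dataTime.plusSeconds(60);
        Instant secondRead = dataTime.plusSeconds(120);

        System.out.println(millisBehind(firstRead, dataTime));
        System.out.println(millisBehind(secondRead, dataTime));
    }
}
```

The second read reports a larger value than the first purely because more time has passed, which is the behavior I would expect if the formula above is right.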
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

public class KinesisGetTest {
    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
            .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
            .withRegion(Regions.US_EAST_1)
            .build();
        String streamName = "dummyStream";
        getStockTrades(kinesis, streamName);
        getLatest(kinesis, streamName);
    }

    public static void getStockTrades(AmazonKinesis kinesis, String streamName) {
        String shardIterator = kinesis.getShardIterator(
            streamName,
            "shardId-000000000001",
            ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
            "49609775258711157847444017307218867996992776863817400338"
        ).getShardIterator();
        GetRecordsResult recordsResult = kinesis.getRecords(
            new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
        );
        System.out.println("Current : " + recordsResult);
        System.out.println("Current : " + recordsResult.getRecords().size());
        System.out.println(recordsResult.getMillisBehindLatest());
    }

    public static void getLatest(AmazonKinesis kinesis, String streamName) {
        String shardIterator = kinesis.getShardIterator(
            streamName,
            "shardId-000000000001",
            ShardIteratorType.LATEST.toString()
        ).getShardIterator();
        GetRecordsResult recordsResult = kinesis.getRecords(
            new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
        );
        System.out.println("Latest: " + recordsResult);
        System.out.println("Latest: " + recordsResult.getRecords().size());
        System.out.println(recordsResult.getMillisBehindLatest());
    }
}
Here is the output from two consecutive runs. First run:
Current : {Records: [{SequenceNumber: 49609775258711157847444017307218867996992776863817400338,ApproximateArrivalTimestamp: Fri Mar 12 10:33:54 CST 2021,Data: xxx,PartitionKey: xxxx#7579871697745#WZqoyxnp,}],NextShardIterator: AAAAAAAAAAEfEfaE4kbzGrymjJ6+0tfNi0xS791lzLKKtdMuRjZeTELzflQ+aIvS4bBscPf4hH4FiyHOBu1O61YfBlu/BkFM07us9WfHqro7dwvJU5wkMIQ/gV4msye/dBia2Rp//fRqgLpIuYhkKxCoR26t212sMJOgoObxSWfw+bHXpaBnxu5mV6O6buYrEyEGjeyI4cPZuSUOTZZ/8/kazJcVhzVF5J+q153ScagBJNK34PalhNQfsVQou+H5esxieJqz61fil7Q7+cXUlTDCM7x77XkI,MillisBehindLatest: 12185000}
Current : 1
12185000
Latest: {Records: [],NextShardIterator: AAAAAAAAAAHmHyT2SkiJsKfdoi4FTop/Iuap5WVN3ihGGTivPOQUzS84Q6HOHZvGt0Yov136H+7o3UVkG2uzAY9sLdApsV5Q26Ahi+fNEN1DKihCZ90rSoOAbjXLykx1bAkrNjmJCzFU5XYVFKh9SHmn//UBWSl55Q7hbcdvPSLbfQRaQBumsDgXZY5F2xF5QZ6YCDoqUt01liOxYTRCFC33V6XXjmZWU69NTaUJ7Ku92BQGbeWVvOIP6Ae6xHGYNA/01c34dXJchY8EUxozdYUPbDw+Hvfb,MillisBehindLatest: 0}
Latest: 0
0
Second run:
Current : {Records: [{SequenceNumber: 49609775258711157847444017307218867996992776863817400338,ApproximateArrivalTimestamp: Fri Mar 12 10:33:54 CST 2021,Data: xxx,PartitionKey: xxx#7579871697745#WZqoyxnp,}],NextShardIterator: AAAAAAAAAAG8yLLTlrY/zb2pHM7XMppR4M55aXnaJbppstPqGwgxJIeT1hDJAhCm5y1Zq9cMe+1ZMjPu5Cygixs+Ek9RcAkW4oRoE3ja2AGXSRi4pNNbLrDYQoOWOahNvZmKm3B1NzMYu6vwJgiosp6BUQKeby6bzZxGYD1qrLbXLVPD+SvzVsRTxWFBXDXhTjdYwu8iflRNJigZUYkXi1g4YlPJcCly8yRWSMiY9irpK6mDXPc4MzJusdWDJshueRG7lLZjHcRQb66H+lNcWH62onvPd6h+,MillisBehindLatest: 12252000}
Current : 1
12252000
Latest: {Records: [],NextShardIterator: AAAAAAAAAAGFg2h3aQIOXX3rOHct2oQMILcvyqDchVTR3P2JH9kUi5keWhjwavVLzz80Biz13k/b56spezqmx9ZzYZ323FqMZUvCO/LV6P8FAi1eQC8KzYJZS0CPdEn0mZNyZ2jxevtvLRwXaMpUgbBvIG+G1U0w8FctuYogze/O6mN9cEg9eAV4lsGy40jE7i8k4T1S7k2Qzw5YA/gtAwgJmmMMUypJdyJ7BoYQ+fhG8KfVvJYrvAaf4bXBzbtOmrYy0HSKlndwTxJM5aisN/ZQh7UvcDpf,MillisBehindLatest: 0}
Latest: 0
0
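As you can see, the LATEST iterator returns no records and reports MillisBehindLatest: 0, yet the read at the same sequence number reports a MillisBehindLatest that grew from 12185000 to 12252000 between the two runs.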
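For reference, the difference between the two reported values can be worked out with a few lines of Java. This is only arithmetic on the numbers printed above; the class and method names are made up for illustration, and the wall-clock gap between my two runs was not measured separately.

```java
import java.time.Duration;

// Arithmetic on the two MillisBehindLatest values from the output above.
final class ObservedLagDelta {

    // Growth of the reported value between two reads of the same record.
    static long deltaMillis(long firstMillisBehind, long secondMillisBehind) {
        return secondMillisBehind - firstMillisBehind;
    }

    public static void main(String[] args) {
        long firstRun = 12185000L;   // MillisBehindLatest from the first run
        long secondRun = 12252000L;  // MillisBehindLatest from the second run

        long delta = deltaMillis(firstRun, secondRun);
        System.out.println("delta ms: " + delta);

        // Print the values as ISO-8601 durations for readability.
        System.out.println("first run:  " + Duration.ofMillis(firstRun));
        System.out.println("second run: " + Duration.ofMillis(secondRun));
        System.out.println("delta:      " + Duration.ofMillis(delta));
    }
}
```

The value grew by 67000 ms, about 67 seconds, for the same record at the same sequence number, while no new data was written to the stream.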