From e437f04daf3b172548ff310930b7f7b0cc8bc9cd Mon Sep 17 00:00:00 2001 From: HeesungSohn Date: Tue, 26 Apr 2022 09:22:46 -0700 Subject: [PATCH] [PIP-156][fix][ci] fixed KinesisSinkTest record format error by deaggregation --- .../io/sinks/KinesisSinkTester.java | 71 ++++++++++++------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java index fbed1fa5b42d6..3907458aeca22 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectReader; import java.util.LinkedHashMap; +import java.util.ArrayList; +import java.util.List; import lombok.AllArgsConstructor; import lombok.Cleanup; import lombok.Data; @@ -46,7 +48,10 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.kinesis.retrieval.AggregatorUtil; +import software.amazon.kinesis.retrieval.KinesisClientRecord; +import java.io.UncheckedIOException; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -189,38 +194,56 @@ private void internalValidateSinkResult(Map kvs) { Map actualKvs = new LinkedHashMap<>(); - // millisBehindLatest equals zero when record processing is caught up, - // and there are no new records to process at this moment. 
- // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest - Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0); + addMoreRecords(actualKvs, iterator); assertEquals(actualKvs, kvs); } @SneakyThrows - private Long addMoreRecordsAndGetMillisBehindLatest(Map kvs, String iterator) { - final GetRecordsResponse response = client.getRecords( - GetRecordsRequest - .builder() - .shardIterator(iterator) - .build()) - .get(); - if(response.hasRecords()) { - for (Record record : response.records()) { - String data = record.data().asString(StandardCharsets.UTF_8); - if (withSchema) { - JsonNode payload = READER.readTree(data).at("/payload"); - String i = payload.at("/value/field1").asText(); - assertEquals(payload.at("/value/field2").asText(), "v2_" + i); - assertEquals(payload.at("/key/field1").asText(), "f1_" + i); - assertEquals(payload.at("/key/field2").asText(), "f2_" + i); - kvs.put(i, i); - } else { - kvs.put(record.partitionKey(), data); + private void parseRecordData(Map actualKvs, String data, String partitionKey) { + if (withSchema) { + JsonNode payload = READER.readTree(data).at("/payload"); + String i = payload.at("/value/field1").asText(); + assertEquals(payload.at("/value/field2").asText(), "v2_" + i); + assertEquals(payload.at("/key/field1").asText(), "f1_" + i); + assertEquals(payload.at("/key/field2").asText(), "f2_" + i); + actualKvs.put(i, i); + } else { + actualKvs.put(partitionKey, data); + } + } + + @SneakyThrows + private void addMoreRecords(Map actualKvs, String iterator) { + GetRecordsResponse response; + List aggRecords = new ArrayList<>(); + do { + GetRecordsRequest request = GetRecordsRequest.builder().shardIterator(iterator).build(); + response = client.getRecords(request).get(); + if (response.hasRecords()) { + for (Record record : response.records()) { + // KinesisSink uses KPL with aggregation enabled (by default). 
+ // However, due to the async state initialization of the KPL internal ShardMap, + // the first records written by the sink might not be aggregated in Kinesis. + // ref: https://github.com/awslabs/amazon-kinesis-producer/issues/131 + try { + String data = record.data().asString(StandardCharsets.UTF_8); + parseRecordData(actualKvs, data, record.partitionKey()); + } catch (UncheckedIOException e) { + aggRecords.add(KinesisClientRecord.fromRecord(record)); + } } } + iterator = response.nextShardIterator(); + // millisBehindLatest equals zero when record processing is caught up, + // and there are no new records to process at this moment. + // See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest + } while (response.millisBehindLatest() != 0); + + for (KinesisClientRecord record : new AggregatorUtil().deaggregate(aggRecords)) { + String data = new String(record.data().array(), StandardCharsets.UTF_8); + parseRecordData(actualKvs, data, record.partitionKey()); } - return response.millisBehindLatest(); } @Data