[BUG] Pulsar consumer may skip messages #463
Description
Describe the bug
Recently, some tests have become flaky and hard to pass since the Pulsar dependency was upgraded from daily release version 202102252222 to 202104202206; see #448 and https://github.com/streamnative/kop/runs/2408190437
To Reproduce
It can even be reproduced locally. For example, when I run

```
mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test' -pl tests
```

there are 5 failures:
```
[ERROR] Failures:
[ERROR] io.streamnative.pulsar.handlers.kop.KafkaMessageOrderPulsarTest.testKafkaProduceMessageOrder(io.streamnative.pulsar.handlers.kop.KafkaMessageOrderPulsarTest)
[ERROR] Run 1: KafkaMessageOrderPulsarTest>KafkaMessageOrderTestBase.testKafkaProduceMessageOrder:161 expected [18] but found [17]
[ERROR] Run 2: KafkaMessageOrderPulsarTest>KafkaMessageOrderTestBase.testKafkaProduceMessageOrder:161 expected [9] but found [8]
[ERROR] Run 3: KafkaMessageOrderPulsarTest>KafkaMessageOrderTestBase.testKafkaProduceMessageOrder:161 expected [71] but found [70]
[ERROR] Run 4: KafkaMessageOrderPulsarTest>KafkaMessageOrderTestBase.testKafkaProduceMessageOrder:161 expected [14] but found [13]
[INFO]
[ERROR] io.streamnative.pulsar.handlers.kop.MultiLedgerTest.testProduceConsumeMultiLedger(io.streamnative.pulsar.handlers.kop.MultiLedgerTest)
[ERROR] Run 1: MultiLedgerTest.testProduceConsumeMultiLedger:172 expected [22] but found [21]
[INFO] Run 2: PASS
```
Both tests fail when the Pulsar consumer receives messages sent by the Kafka producer. For example, see
kop/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java
Lines 155 to 161 in 86249c2
```java
if (log.isDebugEnabled()) {
    log.debug("Pulsar consumer get i: {} message: {}, key: {}",
            i,
            new String(msg.getData()),
            kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString());
}
assertEquals(i, key.intValue());
```
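The loop above expects the i-th received message to carry key i, so it stops at the first gap. Assuming TestNG's `assertEquals(actual, expected)` argument order, "expected [18] but found [17]" means key 18 arrived at position i = 17, i.e. key 17 was skipped. The helper below is a hypothetical sketch (not part of KoP) of the same check that collects every skipped key instead of failing on the first one, assuming the producer sends consecutive integer keys.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of KoP: mirrors the test loop's "i-th message
// has key i" expectation, but records every skipped key instead of failing fast.
final class SkippedKeyFinder {

    private SkippedKeyFinder() {
    }

    /**
     * Returns the keys skipped between the received keys, assuming the producer
     * sent consecutive integer keys starting at {@code firstKey}. Keys missing
     * after the last received key are not reported.
     */
    static List<Integer> findSkippedKeys(List<Integer> receivedKeys, int firstKey) {
        List<Integer> skipped = new ArrayList<>();
        int expected = firstKey;
        for (int key : receivedKeys) {
            // Every key between the expected one and the received one was skipped.
            while (expected < key) {
                skipped.add(expected++);
            }
            if (key == expected) {
                expected++;
            }
            // key < expected would mean a duplicate or reordered message; ignored here.
        }
        return skipped;
    }
}
```

For instance, `findSkippedKeys(List.of(16, 18), 16)` returns `[17]`, matching the first log excerpt from the surefire reports.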
Here is an example output from the surefire reports:
```
17:25:22.678 [pulsar-3-2:io.streamnative.pulsar.handlers.kop.format.PulsarEntryFormatter@95] DEBUG io.streamnative.pulsar.handlers.kop.format.PulsarEntryFormatter - recordsToByteBuf , sequenceId: 0, numMessagesInBatch: 2, currentBatchSizeBytes: 90
17:25:22.678 [TestNG-method=testKafkaProduceMessageOrder-1:io.streamnative.pulsar.handlers.kop.KafkaMessageOrderTestBase@156] DEBUG io.streamnative.pulsar.handlers.kop.KafkaMessageOrderTestBase - Pulsar consumer get i: 16 message: Message_Kop_KafkaProducePulsarConsumeOrder_16, key: 16
17:25:22.679 [TestNG-method=testKafkaProduceMessageOrder-1:io.streamnative.pulsar.handlers.kop.KafkaMessageOrderTestBase@156] DEBUG io.streamnative.pulsar.handlers.kop.KafkaMessageOrderTestBase - Pulsar consumer get i: 17 message: Message_Kop_KafkaProducePulsarConsumeOrder_18, key: 18
```
and
```
17:25:22.868 [pulsar-3-12:io.streamnative.pulsar.handlers.kop.format.PulsarEntryFormatter@95] DEBUG io.streamnative.pulsar.handlers.kop.format.PulsarEntryFormatter - recordsToByteBuf , sequenceId: 0, numMessagesInBatch: 3, currentBatchSizeBytes: 135
17:25:22.868 [TestNG-method=testKafkaProduceMessageOrder-1:io.streamnative.pulsar.handlers.kop.KafkaMessageOrderTestBase@156] DEBUG io.streamnative.pulsar.handlers.kop.KafkaMessageOrderTestBase - Pulsar consumer get i: 7 message: Message_Kop_KafkaProducePulsarConsumeOrder_7, key: 7
17:25:22.868 [TestNG-method=testKafkaProduceMessageOrder-1:io.streamnative.pulsar.handlers.kop.KafkaMessageOrderTestBase@156] DEBUG io.streamnative.pulsar.handlers.kop.KafkaMessageOrderTestBase - Pulsar consumer get i: 8 message: Message_Kop_KafkaProducePulsarConsumeOrder_9, key: 9
```
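It looks like something goes wrong with batched messages: in both excerpts, a key is skipped (17 and 8, respectively) just as PulsarEntryFormatter writes a batched entry (numMessagesInBatch: 2 and 3).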
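To make the batching suspicion concrete, the sketch below enumerates the positions a consumer should receive and compares them with what it actually got. It assumes each entry is a Pulsar batch whose messages are addressed by (entryId, batchIndex), with batchIndex running from 0 to numMessagesInBatch - 1; `BatchExpansion` and `BatchPosition` are hypothetical names for this illustration, not KoP or Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not KoP or Pulsar code.
// Assumption: every entry is a batch whose messages are addressed by
// (entryId, batchIndex), with batchIndex running from 0 to numMessagesInBatch - 1.
final class BatchExpansion {

    /** One message position inside a batched entry. */
    record BatchPosition(long entryId, int batchIndex) { }

    private BatchExpansion() {
    }

    /** Enumerates every position a consumer should receive, entry by entry. */
    static List<BatchPosition> expectedPositions(long firstEntryId, int[] numMessagesInBatch) {
        List<BatchPosition> positions = new ArrayList<>();
        for (int i = 0; i < numMessagesInBatch.length; i++) {
            for (int batchIndex = 0; batchIndex < numMessagesInBatch[i]; batchIndex++) {
                positions.add(new BatchPosition(firstEntryId + i, batchIndex));
            }
        }
        return positions;
    }

    /** Returns the expected positions that never showed up in what was received. */
    static List<BatchPosition> missingPositions(List<BatchPosition> expected,
                                                List<BatchPosition> received) {
        List<BatchPosition> missing = new ArrayList<>(expected);
        missing.removeAll(received); // relies on record equality
        return missing;
    }
}
```

For example, with batch sizes `{1, 3, 1}` the expected positions are (0,0), (1,0), (1,1), (1,2), (2,0); if (1,1) is absent from the received list, `missingPositions` returns exactly that position.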
The MultiLedgerTest output gives more detail, since it also prints each message ID:
kop/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MultiLedgerTest.java
Lines 165 to 172 in 86249c2
```java
if (log.isDebugEnabled()) {
    log.info("Pulsar consumer get i: {} , messageId: {}, message: {}, key: {}",
            i,
            ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId(),
            new String(msg.getData()),
            kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString());
}
assertEquals(i, key.intValue());
```
```
17:20:07.928 [TestNG-method=testProduceConsumeMultiLedger-1:io.streamnative.pulsar.handlers.kop.MultiLedgerTest@166] INFO io.streamnative.pulsar.handlers.kop.MultiLedgerTest - Pulsar consumer get i: 20 , messageId: 8:0:0:0, message: Message_Kop_ProduceConsumeMultiLedger_20, key: 20
17:20:07.928 [TestNG-method=testProduceConsumeMultiLedger-1:io.streamnative.pulsar.handlers.kop.MultiLedgerTest@166] INFO io.streamnative.pulsar.handlers.kop.MultiLedgerTest - Pulsar consumer get i: 21 , messageId: 8:2:0:0, message: Message_Kop_ProduceConsumeMultiLedger_22, key: 22
```
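We can see that key 21 was skipped: the consumer went from message ID 8:0:0:0 (ledger 8, entry 0) directly to 8:2:0:0 (ledger 8, entry 2), so nothing from entry 1 was received.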
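Such a gap can also be found mechanically from the logged IDs. This sketch assumes the inner message ID prints as `ledgerId:entryId:partitionIndex:batchIndex` (the four-field form seen in the log lines above) and flags entry IDs that never appear between two consecutive IDs in the same ledger. `EntryGapFinder` is a hypothetical name, not part of KoP, and it only detects whole missing entries, not missing batch indices inside an entry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of KoP.
// Assumption: IDs are formatted as ledgerId:entryId:partitionIndex:batchIndex.
final class EntryGapFinder {

    private EntryGapFinder() {
    }

    /** Parses "ledgerId:entryId:partitionIndex:batchIndex" into four longs. */
    static long[] parse(String messageId) {
        String[] parts = messageId.split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Unexpected message ID: " + messageId);
        }
        long[] fields = new long[4];
        for (int i = 0; i < 4; i++) {
            fields[i] = Long.parseLong(parts[i]);
        }
        return fields;
    }

    /** Returns entry IDs never delivered between consecutive IDs in the same ledger. */
    static List<Long> skippedEntries(List<String> receivedIds) {
        List<Long> skipped = new ArrayList<>();
        for (int i = 1; i < receivedIds.size(); i++) {
            long[] prev = parse(receivedIds.get(i - 1));
            long[] curr = parse(receivedIds.get(i));
            if (prev[0] != curr[0]) {
                continue; // ledger rolled over; entry IDs restart in the new ledger
            }
            for (long entry = prev[1] + 1; entry < curr[1]; entry++) {
                skipped.add(entry);
            }
        }
        return skipped;
    }
}
```

Ledger rollovers are skipped on purpose: entry IDs restart in a new ledger, so a jump across ledgers says nothing about lost entries.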
Expected behavior
Pulsar consumer should not skip messages.