Skip to content

[Bug] Consumer acked the wrong message when pending chunked messages exceed maxPendingChunkMessages #104

@RobertIndie

Description

@RobertIndie

Search before asking

  • I searched in the issues and found nothing similar.

Version

master

Minimal reproduce step

TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) {
    if (toString(GetParam()) != "None") {
        return;
    }
    const std::string topic = "MessageChunkingTest-testExpireIncompleteChunkMessage-" + toString(GetParam()) +
                              std::to_string(time(nullptr));
    Consumer consumer;
    ConsumerConfiguration consumerConf;
    consumerConf.setMaxPendingChunkedMessage(1);
    consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true);
    createConsumer(topic, consumer, consumerConf);
    Producer producer;
    createProducer(topic, producer);

    auto msg = MessageBuilder().setContent("chunk-0").build();
    auto& metadata = PulsarFriend::getMessageMetadata(msg);
    metadata.set_num_chunks_from_msg(2);
    metadata.set_chunk_id(0);
    metadata.set_uuid("0");
    metadata.set_total_chunk_msg_size(100);

    producer.send(msg);

    auto msg2 = MessageBuilder().setContent("chunk-1").build();
    auto& metadata2 = PulsarFriend::getMessageMetadata(msg2);
    metadata2.set_num_chunks_from_msg(2);
    metadata2.set_uuid("1");
    metadata2.set_chunk_id(0);
    metadata2.set_total_chunk_msg_size(100);

    producer.send(msg2);

    auto msg3 = MessageBuilder().setContent("chunk-1").build();
    auto& metadata3 = PulsarFriend::getMessageMetadata(msg3);
    metadata3.set_num_chunks_from_msg(2);
    metadata3.set_uuid("1");
    metadata3.set_chunk_id(1);
    metadata3.set_total_chunk_msg_size(100);

    producer.send(msg3);

    Message receivedMsg;
    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 3000));
    ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-1chunk-1");

    consumer.redeliverUnacknowledgedMessages();

    Message receivedMsg2;
    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg2, 3000));
    ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-1chunk-1"); // Should pass, but failed

    producer.close();
    consumer.close();
}

The test should work.

What did you expect to see?

The test should be passed

What did you see instead?

The consumer ack the last message when the chunked messages exceed maxPendingChunkMessages.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions