From 43bda68c102a3b6726568e76e36ac711b48d04b9 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 9 Nov 2022 16:17:50 +0800 Subject: [PATCH 1/2] [fix] Fix wrong behavior when the number of chunked messages exceed maxPendingChunkedMessages --- lib/ConsumerImpl.cc | 40 ++++++++++---------- lib/ConsumerImpl.h | 1 + tests/MessageChunkingTest.cc | 73 ++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 20 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 09667294..8658e089 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -322,6 +322,19 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) { } } +void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck) { + if (autoAck) { + doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) { + if (result != ResultOk) { + LOG_WARN("Failed to acknowledge discarded chunk, uuid: " << uuid + << ", messageId: " << messageId); + } + }); + } else { + trackMessage(messageId); + } +} + void ConsumerImpl::triggerCheckExpiredChunkedTimer() { checkExpiredChunkedTimer_->expires_from_now( boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); @@ -347,12 +360,7 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { } for (const MessageId& msgId : ctx.getChunkedMessageIds()) { LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId); - doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) { - if (result != ResultOk) { - LOG_WARN("Failed to acknowledge discarded chunk, uuid: " - << uuid << ", messageId: " << msgId); - } - }); + discardChunkMessages(uuid, msgId, true); } return true; }); @@ -388,23 +396,15 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay it = chunkedMessageCache_.putIfAbsent( uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); } - if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) { + if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() > maxPendingChunkedMessage_) { chunkedMessageCache_.removeOldestValues( - chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1, - [this, messageId](const std::string& uuid, const ChunkedMessageCtx& ctx) { - if (autoAckOldestChunkedMessageOnQueueFull_) { - doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) { - if (result != ResultOk) { - LOG_WARN("Failed to acknowledge discarded chunk, uuid: " - << uuid << ", messageId: " << messageId); - } - }); - } else { - trackMessage(messageId); + chunkedMessageCache_.size() - maxPendingChunkedMessage_, + [this](const std::string& uuid, const ChunkedMessageCtx& ctx) { + for (const MessageId& msgId : ctx.getChunkedMessageIds()) { + discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_); } }); - it = chunkedMessageCache_.putIfAbsent( - uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); + it = chunkedMessageCache_.find(uuid); // Need to reset the iterator after changing the cache. } } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 9ba65773..b0a24d4c 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -307,6 +307,7 @@ class ConsumerImpl : public ConsumerImplBase { std::atomic_bool expireChunkMessageTaskScheduled_{false}; void triggerCheckExpiredChunkedTimer(); + void discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck); /** * Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 8675886f..5818bde7 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -180,6 +180,79 @@ TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) { consumer.close(); } +TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) { + if (toString(GetParam()) != "None") { + return; + } + const std::string topic = "MessageChunkingTest-testMaxPendingChunkMessages-" + toString(GetParam()) + + std::to_string(time(nullptr)); + Consumer consumer; + ConsumerConfiguration consumerConf; + consumerConf.setMaxPendingChunkedMessage(1); + consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); + createConsumer(topic, consumer, consumerConf); + Producer producer; + createProducer(topic, producer); + + auto msg = MessageBuilder().setContent("chunk-0-0|").build(); + auto& metadata = PulsarFriend::getMessageMetadata(msg); + metadata.set_num_chunks_from_msg(2); + metadata.set_chunk_id(0); + metadata.set_uuid("0"); + metadata.set_total_chunk_msg_size(100); + + producer.send(msg); + + auto msg2 = MessageBuilder().setContent("chunk-1-0|").build(); + auto& metadata2 = PulsarFriend::getMessageMetadata(msg2); + metadata2.set_num_chunks_from_msg(2); + metadata2.set_uuid("1"); + metadata2.set_chunk_id(0); + metadata2.set_total_chunk_msg_size(100); + + producer.send(msg2); + + auto msg3 = MessageBuilder().setContent("chunk-1-1|").build(); + auto& metadata3 = PulsarFriend::getMessageMetadata(msg3); + metadata3.set_num_chunks_from_msg(2); + metadata3.set_uuid("1"); + metadata3.set_chunk_id(1); + metadata3.set_total_chunk_msg_size(100); + + producer.send(msg3); + + Message receivedMsg; + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 3000)); + ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-1-0|chunk-1-1|"); + + consumer.redeliverUnacknowledgedMessages(); + + // The consumer may acknowledge the wrong message(the latest message) in the old version of codes. This + // test case ensure that it should not happen again. + Message receivedMsg2; + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg2, 3000)); + ASSERT_EQ(receivedMsg2.getDataAsString(), "chunk-1-0|chunk-1-1|"); + + consumer.acknowledge(receivedMsg2); + + consumer.redeliverUnacknowledgedMessages(); + auto msg4 = MessageBuilder().setContent("chunk-0-1|").build(); + auto& metadata4 = PulsarFriend::getMessageMetadata(msg4); + metadata4.set_num_chunks_from_msg(2); + metadata4.set_uuid("0"); + metadata4.set_chunk_id(1); + metadata4.set_total_chunk_msg_size(100); + + producer.send(msg4); + + // This ensures that the message chunk-0-0 was acknowledged successfully. So we cannot receive it anymore. + Message receivedMsg3; + consumer.receive(receivedMsg3, 3000); + + producer.close(); + consumer.close(); +} + // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest, ::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD, From 469cb9f0fa0d31c1eb19a91c204d4e5f9cb848a4 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 10 Nov 2022 15:31:29 +0800 Subject: [PATCH 2/2] Refactor the discarding ctx logic. --- lib/ConsumerImpl.cc | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 8658e089..17d95a71 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -391,21 +391,18 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay auto it = chunkedMessageCache_.find(uuid); - if (chunkId == 0) { - if (it == chunkedMessageCache_.end()) { - it = chunkedMessageCache_.putIfAbsent( - uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); - } - if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() > maxPendingChunkedMessage_) { + if (chunkId == 0 && it == chunkedMessageCache_.end()) { + if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) { chunkedMessageCache_.removeOldestValues( - chunkedMessageCache_.size() - maxPendingChunkedMessage_, + chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1, [this](const std::string& uuid, const ChunkedMessageCtx& ctx) { for (const MessageId& msgId : ctx.getChunkedMessageIds()) { discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_); } }); - it = chunkedMessageCache_.find(uuid); // Need to reset the iterator after changing the cache. } + it = chunkedMessageCache_.putIfAbsent( + uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); } auto& chunkedMsgCtx = it->second;