diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 09667294..17d95a71 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -322,6 +322,19 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) { } } +void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck) { + if (autoAck) { + doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) { + if (result != ResultOk) { + LOG_WARN("Failed to acknowledge discarded chunk, uuid: " << uuid + << ", messageId: " << messageId); + } + }); + } else { + trackMessage(messageId); + } +} + void ConsumerImpl::triggerCheckExpiredChunkedTimer() { checkExpiredChunkedTimer_->expires_from_now( boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); @@ -347,12 +360,7 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { } for (const MessageId& msgId : ctx.getChunkedMessageIds()) { LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId); - doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) { - if (result != ResultOk) { - LOG_WARN("Failed to acknowledge discarded chunk, uuid: " - << uuid << ", messageId: " << msgId); - } - }); + discardChunkMessages(uuid, msgId, true); } return true; }); @@ -383,29 +391,18 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay auto it = chunkedMessageCache_.find(uuid); - if (chunkId == 0) { - if (it == chunkedMessageCache_.end()) { - it = chunkedMessageCache_.putIfAbsent( - uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); - } + if (chunkId == 0 && it == chunkedMessageCache_.end()) { if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) { chunkedMessageCache_.removeOldestValues( chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1, - [this, messageId](const std::string& uuid, const ChunkedMessageCtx& ctx) { - if (autoAckOldestChunkedMessageOnQueueFull_) { - doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) { - if (result != ResultOk) { - LOG_WARN("Failed to acknowledge discarded chunk, uuid: " - << uuid << ", messageId: " << messageId); - } - }); - } else { - trackMessage(messageId); + [this](const std::string& uuid, const ChunkedMessageCtx& ctx) { + for (const MessageId& msgId : ctx.getChunkedMessageIds()) { + discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_); } }); - it = chunkedMessageCache_.putIfAbsent( - uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); } + it = chunkedMessageCache_.putIfAbsent( + uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()}); } auto& chunkedMsgCtx = it->second; diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 9ba65773..b0a24d4c 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -307,6 +307,7 @@ class ConsumerImpl : public ConsumerImplBase { std::atomic_bool expireChunkMessageTaskScheduled_{false}; void triggerCheckExpiredChunkedTimer(); + void discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck); /** * Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 8675886f..5818bde7 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -180,6 +180,79 @@ TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) { consumer.close(); } +TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) { + if (toString(GetParam()) != "None") { + return; + } + const std::string topic = "MessageChunkingTest-testMaxPendingChunkMessages-" + toString(GetParam()) + + std::to_string(time(nullptr)); + Consumer consumer; + ConsumerConfiguration consumerConf; + consumerConf.setMaxPendingChunkedMessage(1); + consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); + createConsumer(topic, consumer, consumerConf); + Producer producer; + createProducer(topic, producer); + + auto msg = MessageBuilder().setContent("chunk-0-0|").build(); + auto& metadata = PulsarFriend::getMessageMetadata(msg); + metadata.set_num_chunks_from_msg(2); + metadata.set_chunk_id(0); + metadata.set_uuid("0"); + metadata.set_total_chunk_msg_size(100); + + producer.send(msg); + + auto msg2 = MessageBuilder().setContent("chunk-1-0|").build(); + auto& metadata2 = PulsarFriend::getMessageMetadata(msg2); + metadata2.set_num_chunks_from_msg(2); + metadata2.set_uuid("1"); + metadata2.set_chunk_id(0); + metadata2.set_total_chunk_msg_size(100); + + producer.send(msg2); + + auto msg3 = MessageBuilder().setContent("chunk-1-1|").build(); + auto& metadata3 = PulsarFriend::getMessageMetadata(msg3); + metadata3.set_num_chunks_from_msg(2); + metadata3.set_uuid("1"); + metadata3.set_chunk_id(1); + metadata3.set_total_chunk_msg_size(100); + + producer.send(msg3); + + Message receivedMsg; + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 3000)); + ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-1-0|chunk-1-1|"); + + consumer.redeliverUnacknowledgedMessages(); + + // The consumer may acknowledge the wrong message(the latest message) in the old version of codes. This + // test case ensure that it should not happen again. + Message receivedMsg2; + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg2, 3000)); + ASSERT_EQ(receivedMsg2.getDataAsString(), "chunk-1-0|chunk-1-1|"); + + consumer.acknowledge(receivedMsg2); + + consumer.redeliverUnacknowledgedMessages(); + auto msg4 = MessageBuilder().setContent("chunk-0-1|").build(); + auto& metadata4 = PulsarFriend::getMessageMetadata(msg4); + metadata4.set_num_chunks_from_msg(2); + metadata4.set_uuid("0"); + metadata4.set_chunk_id(1); + metadata4.set_total_chunk_msg_size(100); + + producer.send(msg4); + + // This ensures that the message chunk-0-0 was acknowledged successfully. So we cannot receive it anymore. + Message receivedMsg3; + consumer.receive(receivedMsg3, 3000); + + producer.close(); + consumer.close(); +} + // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest, ::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD,