From dff073d657852fd2b6107b0871e9e7a179a335ab Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 24 Aug 2022 09:51:02 +0800 Subject: [PATCH 1/6] check metadata --- pulsar-client-cpp/lib/ProducerImpl.cc | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index d806bd59f0025..e2febacec4b31 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -418,28 +418,43 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba callback(result, {}); }; + auto& msgMetadata = msg.impl_->metadata; const bool compressed = !canAddToBatch(msg); const auto payload = compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload; const auto compressedSize = static_cast(payload.readableBytes()); const auto maxMessageSize = static_cast(ClientConnection::getMaxMessageSize()); - if (compressed && compressedSize > ClientConnection::getMaxMessageSize() && !chunkingEnabled_) { + auto payloadChunkSize = maxMessageSize; + int totalChunks; + if (!compressed || !chunkingEnabled_) { + totalChunks = 1; + } else { + const auto metadataSize = static_cast(msgMetadata.ByteSizeLong()); + if (metadataSize >= maxMessageSize) { + LOG_WARN(getName() << " - compressed Message metadata size " << metadataSize + << " cannot exceed " << ClientConnection::getMaxMessageSize() + << " bytes unless chunking is enabled"); + handleFailedResult(ResultMessageTooBig); + return; + } + payloadChunkSize = maxMessageSize - metadataSize; + totalChunks = getNumOfChunks(compressedSize, payloadChunkSize); + } + + if (compressed && compressedSize > payloadChunkSize) { LOG_WARN(getName() << " - compressed Message payload size " << payload.readableBytes() - << " cannot exceed " << ClientConnection::getMaxMessageSize() + << " cannot exceed " << payloadChunkSize << " bytes unless chunking is enabled"); handleFailedResult(ResultMessageTooBig); return; } - auto& msgMetadata = msg.impl_->metadata; if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) { handleFailedResult(ResultInvalidMessage); return; } - const int totalChunks = - canAddToBatch(msg) ? 1 : getNumOfChunks(compressedSize, ClientConnection::getMaxMessageSize()); // Each chunk should be sent individually, so try to acquire extra permits for chunks. for (int i = 0; i < (totalChunks - 1); i++) { const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved @@ -490,7 +505,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba if (sendChunks) { msgMetadata.set_chunk_id(chunkId); } - const uint32_t endIndex = std::min(compressedSize, beginIndex + maxMessageSize); + const uint32_t endIndex = std::min(compressedSize, beginIndex + payloadChunkSize); auto chunkedPayload = payload.slice(beginIndex, endIndex - beginIndex); beginIndex = endIndex; From dff69a304f8c257062647a45c2f59648daab9404 Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 25 Aug 2022 23:51:03 +0800 Subject: [PATCH 2/6] Support include message header size when check maxMessageSize --- pulsar-client-cpp/lib/ProducerImpl.cc | 64 ++++++++++--------- pulsar-client-cpp/tests/ProducerTest.cc | 84 +++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 28 deletions(-) diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index e2febacec4b31..f080196b9655f 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -425,6 +425,20 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba const auto compressedSize = static_cast(payload.readableBytes()); const auto maxMessageSize = static_cast(ClientConnection::getMaxMessageSize()); + if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) { + handleFailedResult(ResultInvalidMessage); + return; + } + + Lock lock(mutex_); + uint64_t sequenceId; + if (!msgMetadata.has_sequence_id()) { + sequenceId = msgSequenceGenerator_++; + } else { + sequenceId = msgMetadata.sequence_id(); + } + setMessageMetadata(msg, sequenceId, uncompressedSize); + auto payloadChunkSize = maxMessageSize; int totalChunks; if (!compressed || !chunkingEnabled_) { @@ -432,9 +446,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba } else { const auto metadataSize = static_cast(msgMetadata.ByteSizeLong()); if (metadataSize >= maxMessageSize) { - LOG_WARN(getName() << " - compressed Message metadata size " << metadataSize - << " cannot exceed " << ClientConnection::getMaxMessageSize() - << " bytes unless chunking is enabled"); + LOG_WARN(getName() << " - metadata size " << metadataSize << " cannot exceed " << maxMessageSize + << " bytes"); handleFailedResult(ResultMessageTooBig); return; } @@ -442,19 +455,6 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba totalChunks = getNumOfChunks(compressedSize, payloadChunkSize); } - if (compressed && compressedSize > payloadChunkSize) { - LOG_WARN(getName() << " - compressed Message payload size " << payload.readableBytes() - << " cannot exceed " << payloadChunkSize - << " bytes unless chunking is enabled"); - handleFailedResult(ResultMessageTooBig); - return; - } - - if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) { - handleFailedResult(ResultInvalidMessage); - return; - } - // Each chunk should be sent individually, so try to acquire extra permits for chunks. for (int i = 0; i < (totalChunks - 1); i++) { const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved @@ -464,15 +464,6 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba } } - Lock lock(mutex_); - uint64_t sequenceId; - if (!msgMetadata.has_sequence_id()) { - sequenceId = msgSequenceGenerator_++; - } else { - sequenceId = msgMetadata.sequence_id(); - } - setMessageMetadata(msg, sequenceId, uncompressedSize); - if (canAddToBatch(msg)) { // Batching is enabled and the message is not delayed if (!batchMessageContainer_->hasEnoughSpace(msg)) { @@ -515,9 +506,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba return; } - sendMessage(OpSendMsg{msgMetadata, encryptedPayload, - (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId, - conf_.getSendTimeout(), 1, uncompressedSize}); + OpSendMsg op = + OpSendMsg{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, + producerId_, sequenceId, conf_.getSendTimeout(), + 1, uncompressedSize}; + + if (!chunkingEnabled_) { + const uint32_t msgMetadataSize = op.metadata_.ByteSize(); + const uint32_t payloadSize = op.payload_.readableBytes(); + const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; + if (msgHeadersAndPayloadSize > maxMessageSize) { + releaseSemaphoreForSendOp(op); + LOG_WARN(getName() + << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed " + << maxMessageSize << " bytes unless chunking is enabled"); + handleFailedResult(ResultMessageTooBig); + return; + } + } + + sendMessage(op); } } } diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 43947ce7958f9..63a9ac177aeb0 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -34,6 +34,9 @@ using namespace pulsar; static const std::string serviceUrl = "pulsar://localhost:6650"; static const std::string adminUrl = "http://localhost:8080/"; +// See the `maxMessageSize` config in test-conf/standalone-ssl.conf +static constexpr size_t maxMessageSize = 1024000; + TEST(ProducerTest, producerNotInitialized) { Producer producer; @@ -210,3 +213,84 @@ TEST(ProducerTest, testBacklogQuotasExceeded) { client.close(); } + +TEST(ProducerTest, testMaxMessageSize) { + Client client(serviceUrl); + + const std::string topic = "ProducerTest-MaxMessageSize-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + + std::string msg = std::string(maxMessageSize / 2, 'a'); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); + Message message; + ASSERT_EQ(ResultOk, consumer.receive(message)); + ASSERT_EQ(msg, message.getDataAsString()); + + std::string orderKey = std::string(maxMessageSize, 'a'); + ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); + + ASSERT_EQ(ResultMessageTooBig, + producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build())); + + client.close(); +} + +TEST(ProducerTest, testNoBatchMaxMessageSize) { + Client client(serviceUrl); + + const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ProducerConfiguration conf; + conf.setBatchingEnabled(false); + ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer)); + + std::string msg = std::string(maxMessageSize / 2, 'a'); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); + Message message; + ASSERT_EQ(ResultOk, consumer.receive(message)); + ASSERT_EQ(msg, message.getDataAsString()); + + std::string orderKey = std::string(maxMessageSize, 'a'); + ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); + + ASSERT_EQ(ResultMessageTooBig, + producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build())); + + client.close(); +} + +TEST(ProducerTest, testChunkingMaxMessageSize) { + Client client(serviceUrl); + + const std::string topic = "ProducerTest-ChunkingMaxMessageSize-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ProducerConfiguration conf; + conf.setBatchingEnabled(false); + conf.setChunkingEnabled(true); + ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer)); + + std::string orderKey = std::string(maxMessageSize, 'a'); + ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); + + std::string msg = std::string(2 * maxMessageSize + 10, 'b'); + Message message; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); + ASSERT_EQ(ResultOk, consumer.receive(message)); + ASSERT_EQ(msg, message.getDataAsString()); + ASSERT_LE(1L, message.getMessageId().entryId()); + + client.close(); +} From 9e14abe352c3036c13ac607fba0ccb8bccc45c09 Mon Sep 17 00:00:00 2001 From: coderzc Date: Fri, 26 Aug 2022 11:57:03 +0800 Subject: [PATCH 3/6] remove outdated test --- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index da5c60952dd18..d08973c45ae33 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -609,12 +609,6 @@ TEST(BasicEndToEndTest, testMessageTooBig) { result = producer.send(msg); ASSERT_EQ(ResultMessageTooBig, result); - // Anything up to MaxMessageSize should be allowed - size = ClientConnection::getMaxMessageSize(); - msg = MessageBuilder().setAllocatedContent(content, size).build(); - result = producer.send(msg); - ASSERT_EQ(ResultOk, result); - delete[] content; } From effcedc5e3fdc8f9e9497ca2009a7f751f38f214 Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 30 Aug 2022 12:35:07 +0800 Subject: [PATCH 4/6] improve test --- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index d08973c45ae33..804427f818334 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -609,6 +609,12 @@ TEST(BasicEndToEndTest, testMessageTooBig) { result = producer.send(msg); ASSERT_EQ(ResultMessageTooBig, result); + // Anything up to MaxMessageSize - MetadataSize should be allowed + size = ClientConnection::getMaxMessageSize() - 32; /*the default message metadata size for string schema*/ + msg = MessageBuilder().setAllocatedContent(content, size).build(); + result = producer.send(msg); + ASSERT_EQ(ResultOk, result); + delete[] content; } From 4ea30f534fd4260b5216672d77497a797f059e8b Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 7 Sep 2022 23:49:43 +0800 Subject: [PATCH 5/6] improve some comment --- pulsar-client-cpp/lib/ProducerImpl.cc | 37 ++++++++++++------------- pulsar-client-cpp/tests/ProducerTest.cc | 34 ++++------------------- 2 files changed, 24 insertions(+), 47 deletions(-) diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index f080196b9655f..189860501ece4 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -504,28 +504,27 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba if (!encryptMessage(msgMetadata, chunkedPayload, encryptedPayload)) { handleFailedResult(ResultCryptoError); return; - } - OpSendMsg op = - OpSendMsg{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, - producerId_, sequenceId, conf_.getSendTimeout(), - 1, uncompressedSize}; - - if (!chunkingEnabled_) { - const uint32_t msgMetadataSize = op.metadata_.ByteSize(); - const uint32_t payloadSize = op.payload_.readableBytes(); - const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; - if (msgHeadersAndPayloadSize > maxMessageSize) { - releaseSemaphoreForSendOp(op); - LOG_WARN(getName() - << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed " - << maxMessageSize << " bytes unless chunking is enabled"); - handleFailedResult(ResultMessageTooBig); - return; + OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, + producerId_, sequenceId, conf_.getSendTimeout(), + 1, uncompressedSize}; + + if (!chunkingEnabled_) { + const uint32_t msgMetadataSize = op.metadata_.ByteSize(); + const uint32_t payloadSize = op.payload_.readableBytes(); + const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; + if (msgHeadersAndPayloadSize > maxMessageSize) { + lock.unlock(); + releaseSemaphoreForSendOp(op); + LOG_WARN(getName() << " - compressed Message size " << msgHeadersAndPayloadSize + << " cannot exceed " << maxMessageSize + << " bytes unless chunking is enabled"); + handleFailedResult(ResultMessageTooBig); + return; + } } - } - sendMessage(op); + sendMessage(op); } } } diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 63a9ac177aeb0..b34cdd0446245 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -214,33 +214,9 @@ TEST(ProducerTest, testBacklogQuotasExceeded) { client.close(); } -TEST(ProducerTest, testMaxMessageSize) { - Client client(serviceUrl); - - const std::string topic = "ProducerTest-MaxMessageSize-" + std::to_string(time(nullptr)); +class ProducerTest : public ::testing::TestWithParam {}; - Consumer consumer; - ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); - - Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); - - std::string msg = std::string(maxMessageSize / 2, 'a'); - ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); - Message message; - ASSERT_EQ(ResultOk, consumer.receive(message)); - ASSERT_EQ(msg, message.getDataAsString()); - - std::string orderKey = std::string(maxMessageSize, 'a'); - ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); - - ASSERT_EQ(ResultMessageTooBig, - producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build())); - - client.close(); -} - -TEST(ProducerTest, testNoBatchMaxMessageSize) { +TEST_P(ProducerTest, testMaxMessageSize) { Client client(serviceUrl); const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr)); @@ -250,7 +226,7 @@ TEST(ProducerTest, testNoBatchMaxMessageSize) { Producer producer; ProducerConfiguration conf; - conf.setBatchingEnabled(false); + conf.setBatchingEnabled(GetParam()); ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer)); std::string msg = std::string(maxMessageSize / 2, 'a'); @@ -268,7 +244,7 @@ TEST(ProducerTest, testNoBatchMaxMessageSize) { client.close(); } -TEST(ProducerTest, testChunkingMaxMessageSize) { +TEST_P(ProducerTest, testChunkingMaxMessageSize) { Client client(serviceUrl); const std::string topic = "ProducerTest-ChunkingMaxMessageSize-" + std::to_string(time(nullptr)); @@ -294,3 +270,5 @@ TEST(ProducerTest, testChunkingMaxMessageSize) { client.close(); } + +INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false)); From 2e0fc92da778bc8c42fd99203e64237aa66fca19 Mon Sep 17 00:00:00 2001 From: coderzc Date: Fri, 9 Sep 2022 09:28:31 +0800 Subject: [PATCH 6/6] fix code format --- pulsar-client-cpp/lib/ProducerImpl.cc | 38 +++++++++++++-------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 10cdbdedc3588..3f5f40fd82403 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -519,27 +519,27 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba if (!encryptMessage(msgMetadata, chunkedPayload, encryptedPayload)) { handleFailedResult(ResultCryptoError); return; - - OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, - producerId_, sequenceId, conf_.getSendTimeout(), - 1, uncompressedSize}; - - if (!chunkingEnabled_) { - const uint32_t msgMetadataSize = op.metadata_.ByteSize(); - const uint32_t payloadSize = op.payload_.readableBytes(); - const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; - if (msgHeadersAndPayloadSize > maxMessageSize) { - lock.unlock(); - releaseSemaphoreForSendOp(op); - LOG_WARN(getName() << " - compressed Message size " << msgHeadersAndPayloadSize - << " cannot exceed " << maxMessageSize - << " bytes unless chunking is enabled"); - handleFailedResult(ResultMessageTooBig); - return; - } + } + OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, + producerId_, sequenceId, conf_.getSendTimeout(), + 1, uncompressedSize}; + + if (!chunkingEnabled_) { + const uint32_t msgMetadataSize = op.metadata_.ByteSize(); + const uint32_t payloadSize = op.payload_.readableBytes(); + const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; + if (msgHeadersAndPayloadSize > maxMessageSize) { + lock.unlock(); + releaseSemaphoreForSendOp(op); + LOG_WARN(getName() + << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed " + << maxMessageSize << " bytes unless chunking is enabled"); + handleFailedResult(ResultMessageTooBig); + return; } + } - sendMessage(op); + sendMessage(op); } } }