From f5120eb2bdd797640f69d561c61535ab69a6690d Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Mon, 23 Aug 2021 12:23:10 -0700 Subject: [PATCH 1/2] kafka: record extractor for produce requests in mesh-filter Signed-off-by: Adam Kotwasinski --- .../produce_record_extractor.cc | 173 ++++++++++++- .../produce_record_extractor.h | 20 +- .../network/test/mesh/command_handlers/BUILD | 9 + .../produce_record_extractor_unit_test.cc | 243 ++++++++++++++++++ 4 files changed, 441 insertions(+), 4 deletions(-) create mode 100644 contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc index ee970b27cab1f..5874f9e258225 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc @@ -7,8 +7,177 @@ namespace Kafka { namespace Mesh { std::vector -PlaceholderRecordExtractor::extractRecords(const std::vector&) const { - return {}; +RecordExtractorImpl::extractRecords(const std::vector& data) const { + std::vector result; + for (const auto& topic_data : data) { + for (const auto& partition_data : topic_data.partitions_) { + // Kafka protocol allows nullable data. 
+ if (partition_data.records_) { + const auto topic_result = extractPartitionRecords( + topic_data.name_, partition_data.partition_index_, *(partition_data.records_)); + std::copy(topic_result.begin(), topic_result.end(), std::back_inserter(result)); + } + } + } + return result; +} + +std::vector RecordExtractorImpl::extractPartitionRecords(const std::string& topic, + const int32_t partition, + const Bytes& bytes) const { + + // Reference implementation: + // org.apache.kafka.common.record.DefaultRecordBatch.writeHeader(ByteBuffer, long, int, int, byte, + // CompressionType, TimestampType, long, long, long, short, int, boolean, boolean, int, int) + absl::string_view data = {reinterpret_cast(bytes.data()), bytes.size()}; + + // Fields common to any records payload. Magic will follow. + const unsigned int common_fields_size = + /* BaseOffset */ 8 + /* Length */ 4 + /* PartitionLeaderEpoch */ 4; + if (data.length() < common_fields_size) { + throw EnvoyException(fmt::format("record batch for [{}-{}] is too short (no common fields): {}", + topic, partition, data.length())); + } + // Let's skip these common fields, because we are not using them. + data = {data.data() + common_fields_size, data.length() - common_fields_size}; + + // Extract magic. + // Magic tells us what is the format of records present in the byte array. + Int8Deserializer magic_deserializer; + magic_deserializer.feed(data); + if (magic_deserializer.ready()) { + int8_t magic = magic_deserializer.get(); + if (2 == magic) { + // Magic format introduced around Kafka 1.0.0 and still used with Kafka 2.4. + // We can extract the records out of the record batch. + return extractRecordsOutOfBatchWithMagicEqualTo2(topic, partition, data); + } else { + // Old client sending old magic, or Apache Kafka introducing new magic. 
+ throw EnvoyException(fmt::format("unknown magic value in record batch for [{}-{}]: {}", topic, + partition, magic)); + } + } else { + throw EnvoyException( + fmt::format("magic byte is not present in record batch for [{}-{}]", topic, partition)); + } +} + +std::vector RecordExtractorImpl::extractRecordsOutOfBatchWithMagicEqualTo2( + const std::string& topic, const int32_t partition, absl::string_view data) const { + + // Not going to reuse the information in these fields, because we are going to republish. + unsigned int ignored_fields_size = + /* CRC */ 4 + /* Attributes */ 2 + /* LastOffsetDelta */ 4 + + /* FirstTimestamp */ 8 + /* MaxTimestamp */ 8 + /* ProducerId */ 8 + + /* ProducerEpoch */ 2 + /* BaseSequence */ 4 + /* RecordCount */ 4; + + if (data.length() < ignored_fields_size) { + throw EnvoyException( + fmt::format("record batch for [{}-{}] is too short (no attribute fields): {}", topic, + partition, data.length())); + } + data = {data.data() + ignored_fields_size, data.length() - ignored_fields_size}; + + // We have managed to consume all the fancy bytes, now it's time to get to records. 
+ + std::vector result; + while (!data.empty()) { + const OutboundRecord record = extractRecord(topic, partition, data); + result.push_back(record); + } + return result; +} + +OutboundRecord RecordExtractorImpl::extractRecord(const std::string& topic, const int32_t partition, + absl::string_view& data) const { + // The reference implementation is: + // org.apache.kafka.common.record.DefaultRecord.writeTo(DataOutputStream, int, long, ByteBuffer, + // ByteBuffer, Header[]) + + VarInt32Deserializer length; + length.feed(data); + if (!length.ready()) { + throw EnvoyException( + fmt::format("record for [{}-{}] is too short (no length)", topic, partition)); + } + const int32_t len = length.get(); + if (len < 0) { + throw EnvoyException( + fmt::format("record for [{}-{}] has invalid length: {}", topic, partition, len)); + } + if (static_cast(len) > data.length()) { + throw EnvoyException(fmt::format("record for [{}-{}] is too short (not enough bytes provided)", + topic, partition)); + } + + const absl::string_view expected_end_of_record = {data.data() + len, data.length() - len}; + + Int8Deserializer attributes; + attributes.feed(data); + VarInt64Deserializer tsDelta; + tsDelta.feed(data); + VarUInt32Deserializer offsetDelta; + offsetDelta.feed(data); + if (!attributes.ready() || !tsDelta.ready() || !offsetDelta.ready()) { + throw EnvoyException( + fmt::format("attributes not present in record for [{}-{}]", topic, partition)); + } + + absl::string_view key = extractElement(data); + absl::string_view value = extractElement(data); + + VarInt32Deserializer headers_count_deserializer; + headers_count_deserializer.feed(data); + if (!headers_count_deserializer.ready()) { + throw EnvoyException( + fmt::format("header count not present in record for [{}-{}]", topic, partition)); + } + const int32_t headers_count = headers_count_deserializer.get(); + if (headers_count < 0) { + throw EnvoyException(fmt::format("invalid header count in record for [{}-{}]: {}", topic, + partition, 
headers_count)); + } + for (int32_t i = 0; i < headers_count; ++i) { + extractElement(data); // header key + extractElement(data); // header value + } + + if (data == expected_end_of_record) { + // We have consumed everything nicely. + return OutboundRecord{topic, partition, key, value}; + } else { + // Bad data - there are bytes left. + throw EnvoyException(fmt::format("data left after consuming record for [{}-{}]: {}", topic, + partition, data.length())); + } +} + +// Most of the fields in records are kept as variable-encoded length and following bytes. +// So here we have a helper function to get the data (such as key, value) out of given input. +absl::string_view RecordExtractorImpl::extractElement(absl::string_view& input) { + VarInt32Deserializer length_deserializer; + length_deserializer.feed(input); + if (!length_deserializer.ready()) { + throw EnvoyException("byte array length not present"); + } + const int32_t length = length_deserializer.get(); + // Length can be negative (null value was published by client). 
+ if (-1 == length) { + return {}; + } + + if (length >= 0) { + if (static_cast(length) > input.size()) { + throw EnvoyException(fmt::format("byte array length larger than data provided: {} vs {}", + length, input.size())); + } + const absl::string_view result = {input.data(), + static_cast(length)}; + input = {input.data() + length, input.length() - length}; + return result; + } else { + throw EnvoyException(fmt::format("byte array length less than -1: {}", length)); + } } } // namespace Mesh diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h index e8c89fcbd3e3e..ca3a2dab48aa4 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h @@ -21,12 +21,28 @@ class RecordExtractor { }; /** - * Just a placeholder for now. + * Proper implementation of record extractor, capable of parsing V2 record set. + * Reference: https://kafka.apache.org/24/documentation/#messageformat */ -class PlaceholderRecordExtractor : public RecordExtractor { +class RecordExtractorImpl : public RecordExtractor { public: std::vector extractRecords(const std::vector& data) const override; + + static absl::string_view extractElement(absl::string_view& input); + +private: + std::vector extractPartitionRecords(const std::string& topic, + const int32_t partition, + const Bytes& records) const; + + // Impl note: I'm sorry for the long name. 
+ std::vector extractRecordsOutOfBatchWithMagicEqualTo2(const std::string& topic, + const int32_t partition, + absl::string_view sv) const; + + OutboundRecord extractRecord(const std::string& topic, const int32_t partition, + absl::string_view& data) const; }; } // namespace Mesh diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD index 3c57a687c37a4..18b75f4206f50 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD @@ -19,6 +19,15 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "produce_record_extractor_unit_test", + srcs = ["produce_record_extractor_unit_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source/mesh/command_handlers:produce_record_extractor_lib", + ], +) + envoy_cc_test( name = "metadata_unit_test", srcs = ["metadata_unit_test.cc"], diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc new file mode 100644 index 0000000000000..56715889997c9 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc @@ -0,0 +1,243 @@ +#include + +#include "test/test_common/utility.h" + +#include "contrib/kafka/filters/network/source/external/requests.h" +#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { +namespace { + +// Simple matcher that verifies that the input given is a collection containing correct number of +// unique (!) records for given topic-partition pairs. 
+MATCHER_P3(HasRecords, topic, partition, expected, "") { + size_t expected_count = expected; + std::set saved_key_pointers = {}; + std::set saved_value_pointers = {}; + size_t count = 0; + + for (const auto& record : arg) { + if (record.topic_ == topic && record.partition_ == partition) { + saved_key_pointers.insert(record.key_); + saved_value_pointers.insert(record.value_); + ++count; + } + } + + if (expected_count != count) { + return false; + } + if (expected_count != saved_key_pointers.size()) { + return false; + } + return saved_key_pointers.size() == saved_value_pointers.size(); +} + +// Helper function to create a record batch that contains a single record with 5-byte key and 5-byte +// value. +Bytes makeGoodRecordBatch() { + // Record batch bytes get ignored (apart from magic field), so we can put 0 there. + Bytes result = Bytes(16 + 1 + 44); + result[16] = 2; // Record batch magic value. + Bytes real_data = {/* Length = 36 */ 72, + /* Attributes */ 0, + /* Timestamp delta */ 0, + /* Offset delta */ 0, + /* Key length = 5 */ 10, + 107, + 107, + 107, + 107, + 107, + /* Value length = 5 */ 10, + 118, + 118, + 118, + 118, + 118, + /* Headers count = 2 */ 4, + /* Header key length = 3 */ 6, + 49, + 49, + 49, + /* Header value length = 5 */ 10, + 97, + 97, + 97, + 97, + 97, + /* Header key length = 3 */ 6, + 50, + 50, + 50, + /* Header value length = 5 */ 10, + 98, + 98, + 98, + 98, + 98}; + result.insert(result.end(), real_data.begin(), real_data.end()); + return result; +} + +TEST(RecordExtractorImpl, shouldProcessRecordBytes) { + // given + const RecordExtractorImpl testee; + + const PartitionProduceData t1_ppd1 = {0, makeGoodRecordBatch()}; + const PartitionProduceData t1_ppd2 = {1, makeGoodRecordBatch()}; + const PartitionProduceData t1_ppd3 = {2, makeGoodRecordBatch()}; + const TopicProduceData tpd1 = {"topic1", {t1_ppd1, t1_ppd2, t1_ppd3}}; + + // Weird input from client, protocol allows sending null value as bytes array. 
+ const PartitionProduceData t2_ppd = {20, absl::nullopt}; + const TopicProduceData tpd2 = {"topic2", {t2_ppd}}; + + const std::vector input = {tpd1, tpd2}; + + // when + const auto result = testee.extractRecords(input); + + // then + EXPECT_THAT(result, HasRecords("topic1", 0, 1)); + EXPECT_THAT(result, HasRecords("topic1", 1, 1)); + EXPECT_THAT(result, HasRecords("topic1", 2, 1)); + EXPECT_THAT(result, HasRecords("topic2", 20, 0)); +} + +/** + * Helper function to make record batch (batch contains 1+ records). + * We use 'stage' parameter to make it a single function with various failure modes. + */ +const std::vector makeTopicProduceData(const unsigned int stage) { + Bytes bytes = makeGoodRecordBatch(); + if (1 == stage) { + // No common fields before magic. + bytes.erase(bytes.begin(), bytes.end()); + } + if (2 == stage) { + // No magic. + bytes.erase(bytes.begin() + 16, bytes.end()); + } + if (3 == stage) { + // Bad magic. + bytes[16] = 42; + } + if (4 == stage) { + // No common fields after magic. + bytes.erase(bytes.begin() + 17, bytes.end()); + } + if (5 == stage) { + // No record length after common fields. + bytes[61] = 128; // This will force variable-length deserializer to wait for more bytes. + bytes.erase(bytes.begin() + 62, bytes.end()); + } + if (6 == stage) { + // Record length is higher than size of real data. + bytes.erase(bytes.begin() + 62, bytes.end()); + } + if (7 == stage) { + // Record length is negative. + bytes[61] = 3; /* -2 */ + bytes.erase(bytes.begin() + 62, bytes.end()); + } + if (8 == stage) { + // Attributes field is missing - length is valid, but there is no more data to read. + bytes[61] = 0; + bytes.erase(bytes.begin() + 62, bytes.end()); + } + if (9 == stage) { + // Header count not present - we are going to drop all 21 header bytes after value. + bytes[61] = (36 - 21) << 1; // Length is encoded as variable length. 
+ bytes.erase(bytes.begin() + 77, bytes.end()); + } + if (10 == stage) { + // Negative variable length integer for header count. + bytes[77] = 17; + } + if (11 == stage) { + // Last header value is going to be shorter, so there will be one unconsumed byte. + bytes[92] = 8; + } + const PartitionProduceData ppd = {0, bytes}; + const TopicProduceData tpd = {"topic", {ppd}}; + return {tpd}; +} + +TEST(RecordExtractorImpl, shouldHandleInvalidRecordBytes) { + const RecordExtractorImpl testee; + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(1)), EnvoyException, + "no common fields"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(2)), EnvoyException, + "magic byte is not present"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(3)), EnvoyException, + "unknown magic value"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(4)), EnvoyException, + "no attribute fields"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(5)), EnvoyException, + "no length"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(6)), EnvoyException, + "not enough bytes provided"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(7)), EnvoyException, + "has invalid length"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(8)), EnvoyException, + "attributes not present"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(9)), EnvoyException, + "header count not present"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(10)), EnvoyException, + "invalid header count"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(11)), EnvoyException, + "data left after consuming record"); +} + +// Minor helper function. 
+absl::string_view bytesToStringView(const Bytes& bytes) { + return {reinterpret_cast(bytes.data()), bytes.size()}; +} + +TEST(RecordExtractorImpl, shouldExtractElementData) { + { + const Bytes noBytes = Bytes(0); + auto arg = bytesToStringView(noBytes); + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractElement(arg), EnvoyException, + "byte array length not present"); + } + { + const Bytes nullValueBytes = {0b00000001}; // Length = -1. + auto arg = bytesToStringView(nullValueBytes); + EXPECT_EQ(RecordExtractorImpl::extractElement(arg), absl::string_view()); + } + { + const Bytes negativeLengthBytes = {0b01111111}; // Length = -64. + auto arg = bytesToStringView(negativeLengthBytes); + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractElement(arg), EnvoyException, + "byte array length less than -1: -64"); + } + { + const Bytes bigLengthBytes = {0b01111110}; // Length = 63. + auto arg = bytesToStringView(bigLengthBytes); + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractElement(arg), EnvoyException, + "byte array length larger than data provided: 63 vs 0"); + } + { + // Length = 4, 7 bytes follow, 4 should be consumed, 13s should stay. 
+ const Bytes goodBytes = {0b00001000, 42, 42, 42, 42, 13, 13, 13}; + auto arg = bytesToStringView(goodBytes); + EXPECT_EQ(RecordExtractorImpl::extractElement(arg), + absl::string_view(reinterpret_cast(goodBytes.data() + 1), 4)); + EXPECT_EQ(arg.data(), reinterpret_cast(goodBytes.data() + 5)); + EXPECT_EQ(arg.size(), 3); + } +} + +} // namespace +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy From 7078ecb0153a901debf537b6c383ba96bafeb90d Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Wed, 25 Aug 2021 10:56:43 -0700 Subject: [PATCH 2/2] Code cleanup Signed-off-by: Adam Kotwasinski --- .../source/mesh/command_handlers/produce.cc | 2 +- .../produce_record_extractor.cc | 131 ++++++++++-------- .../produce_record_extractor.h | 11 +- .../produce_record_extractor_unit_test.cc | 14 +- 4 files changed, 90 insertions(+), 68 deletions(-) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc index b44a452358c54..e2ed06fdbb17e 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc @@ -13,7 +13,7 @@ constexpr static int16_t NO_ERROR = 0; ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter, UpstreamKafkaFacade& kafka_facade, const std::shared_ptr> request) - : ProduceRequestHolder{filter, kafka_facade, PlaceholderRecordExtractor{}, request} {}; + : ProduceRequestHolder{filter, kafka_facade, RecordExtractorImpl{}, request} {}; ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter, UpstreamKafkaFacade& kafka_facade, diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc index 5874f9e258225..3c98dc4885cf9 100644 --- 
a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc @@ -22,64 +22,76 @@ RecordExtractorImpl::extractRecords(const std::vector& data) c return result; } +// Fields common to any record batch payload. +// See: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L46 +constexpr unsigned int RECORD_BATCH_COMMON_FIELDS_SIZE = /* BaseOffset */ sizeof(int64_t) + + /* Length */ sizeof(int32_t) + + /* PartitionLeaderEpoch */ sizeof(int32_t); + +// Magic format introduced around Kafka 1.0.0 and still used with Kafka 2.4. +// We can extract records out of record batches that use this magic. +constexpr int8_t SUPPORTED_MAGIC = 2; + +// Reference implementation: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L443 std::vector RecordExtractorImpl::extractPartitionRecords(const std::string& topic, const int32_t partition, const Bytes& bytes) const { - // Reference implementation: - // org.apache.kafka.common.record.DefaultRecordBatch.writeHeader(ByteBuffer, long, int, int, byte, - // CompressionType, TimestampType, long, long, long, short, int, boolean, boolean, int, int) absl::string_view data = {reinterpret_cast(bytes.data()), bytes.size()}; - // Fields common to any records payload. Magic will follow. - const unsigned int common_fields_size = - /* BaseOffset */ 8 + /* Length */ 4 + /* PartitionLeaderEpoch */ 4; - if (data.length() < common_fields_size) { + // Let's skip these common fields, because we are not using them. + if (data.length() < RECORD_BATCH_COMMON_FIELDS_SIZE) { throw EnvoyException(fmt::format("record batch for [{}-{}] is too short (no common fields): {}", topic, partition, data.length())); } - // Let's skip these common fields, because we are not using them. 
- data = {data.data() + common_fields_size, data.length() - common_fields_size}; + data = {data.data() + RECORD_BATCH_COMMON_FIELDS_SIZE, + data.length() - RECORD_BATCH_COMMON_FIELDS_SIZE}; - // Extract magic. - // Magic tells us what is the format of records present in the byte array. + // Extract magic - it tells us the format of the records present in the bytes provided. Int8Deserializer magic_deserializer; magic_deserializer.feed(data); - if (magic_deserializer.ready()) { - int8_t magic = magic_deserializer.get(); - if (2 == magic) { - // Magic format introduced around Kafka 1.0.0 and still used with Kafka 2.4. - // We can extract the records out of the record batch. - return extractRecordsOutOfBatchWithMagicEqualTo2(topic, partition, data); - } else { - // Old client sending old magic, or Apache Kafka introducing new magic. - throw EnvoyException(fmt::format("unknown magic value in record batch for [{}-{}]: {}", topic, - partition, magic)); - } - } else { + if (!magic_deserializer.ready()) { throw EnvoyException( fmt::format("magic byte is not present in record batch for [{}-{}]", topic, partition)); } -} -std::vector RecordExtractorImpl::extractRecordsOutOfBatchWithMagicEqualTo2( - const std::string& topic, const int32_t partition, absl::string_view data) const { + // Old client sending old magic, or Apache Kafka introducing new magic. + const int8_t magic = magic_deserializer.get(); + if (SUPPORTED_MAGIC != magic) { + throw EnvoyException(fmt::format("unknown magic value in record batch for [{}-{}]: {}", topic, + partition, magic)); + } - // Not going to reuse the information in these fields, because we are going to republish. - unsigned int ignored_fields_size = - /* CRC */ 4 + /* Attributes */ 2 + /* LastOffsetDelta */ 4 + - /* FirstTimestamp */ 8 + /* MaxTimestamp */ 8 + /* ProducerId */ 8 + - /* ProducerEpoch */ 2 + /* BaseSequence */ 4 + /* RecordCount */ 4; + // We have received a record batch with good magic. 
+ return processRecordBatch(topic, partition, data); +} - if (data.length() < ignored_fields_size) { +// Record batch fields we are going to ignore (because we rip it up and send its contents). +// See: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L50 +// and: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L471 +constexpr unsigned int IGNORED_FIELDS_SIZE = + /* CRC */ sizeof(int32_t) + /* Attributes */ sizeof(int16_t) + + /* LastOffsetDelta */ sizeof(int32_t) + /* FirstTimestamp */ sizeof(int64_t) + + /* MaxTimestamp */ sizeof(int64_t) + /* ProducerId */ sizeof(int64_t) + + /* ProducerEpoch */ sizeof(int16_t) + /* BaseSequence */ sizeof(int32_t) + + /* RecordCount */ sizeof(int32_t); + +std::vector RecordExtractorImpl::processRecordBatch(const std::string& topic, + const int32_t partition, + absl::string_view data) const { + + if (data.length() < IGNORED_FIELDS_SIZE) { throw EnvoyException( fmt::format("record batch for [{}-{}] is too short (no attribute fields): {}", topic, partition, data.length())); } - data = {data.data() + ignored_fields_size, data.length() - ignored_fields_size}; + data = {data.data() + IGNORED_FIELDS_SIZE, data.length() - IGNORED_FIELDS_SIZE}; // We have managed to consume all the fancy bytes, now it's time to get to records. 
- std::vector result; while (!data.empty()) { const OutboundRecord record = extractRecord(topic, partition, data); @@ -88,11 +100,10 @@ std::vector RecordExtractorImpl::extractRecordsOutOfBatchWithMag return result; } +// Reference implementation: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L179 OutboundRecord RecordExtractorImpl::extractRecord(const std::string& topic, const int32_t partition, absl::string_view& data) const { - // The reference implementation is: - // org.apache.kafka.common.record.DefaultRecord.writeTo(DataOutputStream, int, long, ByteBuffer, - // ByteBuffer, Header[]) VarInt32Deserializer length; length.feed(data); @@ -112,6 +123,8 @@ OutboundRecord RecordExtractorImpl::extractRecord(const std::string& topic, cons const absl::string_view expected_end_of_record = {data.data() + len, data.length() - len}; + // We throw away the following record fields: attributes, timestamp delta, offset delta (cannot do + // an easy jump, as some are variable-length). Int8Deserializer attributes; attributes.feed(data); VarInt64Deserializer tsDelta; @@ -123,9 +136,11 @@ OutboundRecord RecordExtractorImpl::extractRecord(const std::string& topic, cons fmt::format("attributes not present in record for [{}-{}]", topic, partition)); } - absl::string_view key = extractElement(data); - absl::string_view value = extractElement(data); + // Record key and value. + const absl::string_view key = extractByteArray(data); + const absl::string_view value = extractByteArray(data); + // Headers. VarInt32Deserializer headers_count_deserializer; headers_count_deserializer.feed(data); if (!headers_count_deserializer.ready()) { @@ -138,8 +153,9 @@ OutboundRecord RecordExtractorImpl::extractRecord(const std::string& topic, cons partition, headers_count)); } for (int32_t i = 0; i < headers_count; ++i) { - extractElement(data); // header key - extractElement(data); // header value + // For now, we ignore headers. 
+ extractByteArray(data); // Header key. + extractByteArray(data); // Header value. } if (data == expected_end_of_record) { @@ -152,32 +168,37 @@ OutboundRecord RecordExtractorImpl::extractRecord(const std::string& topic, cons } } -// Most of the fields in records are kept as variable-encoded length and following bytes. -// So here we have a helper function to get the data (such as key, value) out of given input. -absl::string_view RecordExtractorImpl::extractElement(absl::string_view& input) { +absl::string_view RecordExtractorImpl::extractByteArray(absl::string_view& input) { + + // Get the length. VarInt32Deserializer length_deserializer; length_deserializer.feed(input); if (!length_deserializer.ready()) { throw EnvoyException("byte array length not present"); } const int32_t length = length_deserializer.get(); - // Length can be negative (null value was published by client). + + // Length can be -1 (null value was published by client). if (-1 == length) { return {}; } - if (length >= 0) { - if (static_cast(length) > input.size()) { - throw EnvoyException(fmt::format("byte array length larger than data provided: {} vs {}", - length, input.size())); - } - const absl::string_view result = {input.data(), - static_cast(length)}; - input = {input.data() + length, input.length() - length}; - return result; - } else { + // Otherwise, length cannot be negative. + if (length < 0) { throw EnvoyException(fmt::format("byte array length less than -1: {}", length)); } + + // Underflow handling. + if (static_cast(length) > input.size()) { + throw EnvoyException( + fmt::format("byte array length larger than data provided: {} vs {}", length, input.size())); + } + + // We have enough data to return it. 
+ const absl::string_view result = {input.data(), + static_cast(length)}; + input = {input.data() + length, input.length() - length}; + return result; } } // namespace Mesh diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h index ca3a2dab48aa4..59c6e7380e4fa 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h @@ -26,20 +26,21 @@ class RecordExtractor { */ class RecordExtractorImpl : public RecordExtractor { public: + // RecordExtractor std::vector extractRecords(const std::vector& data) const override; - static absl::string_view extractElement(absl::string_view& input); + // Helper function to get the data (such as key, value) out of given input, as most of the + // interesting fields in records are kept as variable-encoded length and following bytes. + static absl::string_view extractByteArray(absl::string_view& input); private: std::vector extractPartitionRecords(const std::string& topic, const int32_t partition, const Bytes& records) const; - // Impl note: I'm sorry for the long name. 
- std::vector extractRecordsOutOfBatchWithMagicEqualTo2(const std::string& topic, - const int32_t partition, - absl::string_view sv) const; + std::vector processRecordBatch(const std::string& topic, const int32_t partition, + absl::string_view data) const; OutboundRecord extractRecord(const std::string& topic, const int32_t partition, absl::string_view& data) const; diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc index 56715889997c9..068aa40c1334b 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc @@ -200,35 +200,35 @@ absl::string_view bytesToStringView(const Bytes& bytes) { return {reinterpret_cast(bytes.data()), bytes.size()}; } -TEST(RecordExtractorImpl, shouldExtractElementData) { +TEST(RecordExtractorImpl, shouldExtractByteArray) { { const Bytes noBytes = Bytes(0); auto arg = bytesToStringView(noBytes); - EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractElement(arg), EnvoyException, + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractByteArray(arg), EnvoyException, "byte array length not present"); } { const Bytes nullValueBytes = {0b00000001}; // Length = -1. auto arg = bytesToStringView(nullValueBytes); - EXPECT_EQ(RecordExtractorImpl::extractElement(arg), absl::string_view()); + EXPECT_EQ(RecordExtractorImpl::extractByteArray(arg), absl::string_view()); } { const Bytes negativeLengthBytes = {0b01111111}; // Length = -64. auto arg = bytesToStringView(negativeLengthBytes); - EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractElement(arg), EnvoyException, + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractByteArray(arg), EnvoyException, "byte array length less than -1: -64"); } { const Bytes bigLengthBytes = {0b01111110}; // Length = 63. 
auto arg = bytesToStringView(bigLengthBytes); - EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractElement(arg), EnvoyException, + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractByteArray(arg), EnvoyException, "byte array length larger than data provided: 63 vs 0"); } { - // Length = 4, 7 bytes follow, 4 should be consumed, 13s should stay. + // Length = 4, 7 bytes follow, 4 should be consumed, 13s should stay unconsumed. const Bytes goodBytes = {0b00001000, 42, 42, 42, 42, 13, 13, 13}; auto arg = bytesToStringView(goodBytes); - EXPECT_EQ(RecordExtractorImpl::extractElement(arg), + EXPECT_EQ(RecordExtractorImpl::extractByteArray(arg), absl::string_view(reinterpret_cast(goodBytes.data() + 1), 4)); EXPECT_EQ(arg.data(), reinterpret_cast(goodBytes.data() + 5)); EXPECT_EQ(arg.size(), 3);