From 855c83bebec1f3b41f8c147bf44fae51f589da2d Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Sun, 3 Dec 2023 09:55:52 +0800 Subject: [PATCH] [feature](parquet)support read parquet lzo compress. (#27706) --- be/src/util/block_compression.cpp | 101 ++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 58c75a0c433f7f..b7e93dbeb536ec 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -45,11 +45,26 @@ #include "common/config.h" #include "exec/decompressor.h" +#include "gutil/endian.h" #include "gutil/strings/substitute.h" +#include "orc/OrcFile.hh" #include "util/bit_util.h" #include "util/defer_op.h" #include "util/faststring.h" +namespace orc { +/** + * Decompress the bytes in to the output buffer. + * @param inputAddress the start of the input + * @param inputLimit one past the last byte of the input + * @param outputAddress the start of the output buffer + * @param outputLimit one past the last byte of the output buffer + * @result the number of bytes decompressed + */ +uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress, + char* outputLimit); +} // namespace orc + namespace doris { using strings::Substitute; @@ -1071,6 +1086,89 @@ class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { }; #endif +class LzoBlockCompression final : public BlockCompressionCodec { +public: + static LzoBlockCompression* instance() { + static LzoBlockCompression s_instance; + return &s_instance; + } + + Status compress(const Slice& input, faststring* output) override { + return Status::InvalidArgument("not impl lzo compress."); + } + size_t max_compressed_len(size_t len) override { return 0; }; + Status decompress(const Slice& input, Slice* output) override { + auto* input_ptr = input.data; + auto remain_input_size = input.size; + auto* output_ptr = output->data; + auto remain_output_size = output->size; + auto* output_limit = output->data + output->size; + + // Example: + // OriginData(The original data will be divided into several large data block.) : + // large data block1 | large data block2 | large data block3 | .... + // The large data block will be divided into several small data block. + // Suppose a large data block is divided into three small blocks: + // large data block1: | small block1 | small block2 | small block3 | + // CompressData: + // + // A : original length of the current block of large data block. + // sizeof(A) = 4 bytes. + // A = length(small block1) + length(small block2) + length(small block3) + // Bx : length of small data block bx. + // sizeof(Bx) = 4 bytes. + // Bx = length(compress(small blockx)) + try { + while (remain_input_size > 0) { + if (remain_input_size < 4) { + return Status::InvalidArgument( + "Need more input buffer to get large_block_uncompressed_len."); + } + + uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr); + input_ptr += 4; + remain_input_size -= 4; + + if (remain_output_size < large_block_uncompressed_len) { + return Status::InvalidArgument( + "Need more output buffer to get uncompressed data."); + } + + while (large_block_uncompressed_len > 0) { + if (remain_input_size < 4) { + return Status::InvalidArgument( + "Need more input buffer to get small_block_compressed_len."); + } + + uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr); + input_ptr += 4; + remain_input_size -= 4; + + if (remain_input_size < small_block_compressed_len) { + return Status::InvalidArgument( + "Need more input buffer to decompress small block."); + } + + auto small_block_uncompressed_len = + orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len, + output_ptr, output_limit); + + input_ptr += small_block_compressed_len; + remain_input_size -= small_block_compressed_len; + + output_ptr += small_block_uncompressed_len; + large_block_uncompressed_len -= small_block_uncompressed_len; + remain_output_size -= small_block_uncompressed_len; + } + } + } catch (const orc::ParseError& e) { + //Prevent be from hanging due to orc::lzoDecompress throw exception + return Status::InternalError("Fail to do LZO decompress, error={}", e.what()); + } + return Status::OK(); + } +}; + Status get_block_compression_codec(segment_v2::CompressionTypePB type, BlockCompressionCodec** codec) { switch (type) { @@ -1127,6 +1225,9 @@ Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_code *codec = GzipBlockCompression::instance(); #endif break; + case tparquet::CompressionCodec::LZO: + *codec = LzoBlockCompression::instance(); + break; default: return Status::InternalError("unknown compression type({})", parquet_codec); }