Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,26 @@

#include "common/config.h"
#include "exec/decompressor.h"
#include "gutil/endian.h"
#include "gutil/strings/substitute.h"
#include "orc/OrcFile.hh"
#include "util/bit_util.h"
#include "util/defer_op.h"
#include "util/faststring.h"

namespace orc {
/**
 * Decompress the bytes into the output buffer.
 *
 * Only the declaration is visible here; the definition lives elsewhere.
 * NOTE(review): presumably provided by the linked ORC library - confirm the
 * symbol is exported by the build in use.
 * The caller in this file guards the call with a catch of orc::ParseError.
 *
 * @param inputAddress the start of the input
 * @param inputLimit one past the last byte of the input
 * @param outputAddress the start of the output buffer
 * @param outputLimit one past the last byte of the output buffer
 * @return the number of bytes decompressed
 */
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
char* outputLimit);
} // namespace orc

namespace doris {

using strings::Substitute;
Expand Down Expand Up @@ -1071,6 +1086,89 @@ class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression {
};
#endif

// Decompress-only block codec for the framed LZO layout described in
// decompress(). get_block_compression_codec() selects it for
// tparquet::CompressionCodec::LZO. Compression is not implemented.
class LzoBlockCompression final : public BlockCompressionCodec {
public:
    // Returns the shared instance (function-local static).
    static LzoBlockCompression* instance() {
        static LzoBlockCompression s_instance;
        return &s_instance;
    }

    // LZO compression is not supported; always returns InvalidArgument.
    Status compress(const Slice& input, faststring* output) override {
        return Status::InvalidArgument("not impl lzo compress.");
    }

    // Always 0: there is no compress() implementation to size a buffer for.
    size_t max_compressed_len(size_t len) override { return 0; }

    // Decompresses every large block found in `input` into `output->data`.
    //
    // @param input  the whole compressed stream (layout below).
    // @param output destination buffer; output->size is read as its capacity.
    //               NOTE(review): output->size is not updated to the number of
    //               bytes written - confirm callers do not rely on that.
    // @return OK on success; InvalidArgument when the input is truncated, the
    //         output buffer is too small, or a small block decompresses to more
    //         bytes than its large block declares; InternalError when
    //         orc::lzoDecompress throws orc::ParseError.
    Status decompress(const Slice& input, Slice* output) override {
        auto* input_ptr = input.data;
        auto remain_input_size = input.size;
        auto* output_ptr = output->data;
        auto remain_output_size = output->size;
        auto* output_limit = output->data + output->size;

        // Example:
        // OriginData(The original data will be divided into several large data block.) :
        //      large data block1 | large data block2 | large data block3 | ....
        // The large data block will be divided into several small data block.
        // Suppose a large data block is divided into three small blocks:
        // large data block1:            | small block1 | small block2 | small block3 |
        // CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]>
        //
        // A : original length of the current block of large data block.
        // sizeof(A) = 4 bytes.
        // A = length(small block1) + length(small block2) + length(small block3)
        // Bx : length of small data block bx.
        // sizeof(Bx) = 4 bytes.
        // Bx = length(compress(small blockx))
        try {
            while (remain_input_size > 0) {
                if (remain_input_size < 4) {
                    return Status::InvalidArgument(
                            "Need more input buffer to get large_block_uncompressed_len.");
                }

                // Header A: big-endian uncompressed length of this large block.
                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
                input_ptr += 4;
                remain_input_size -= 4;

                if (remain_output_size < large_block_uncompressed_len) {
                    return Status::InvalidArgument(
                            "Need more output buffer to get uncompressed data.");
                }

                // Consume small blocks until the declared large-block length is produced.
                while (large_block_uncompressed_len > 0) {
                    if (remain_input_size < 4) {
                        return Status::InvalidArgument(
                                "Need more input buffer to get small_block_compressed_len.");
                    }

                    // Header Bx: big-endian compressed length of this small block.
                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
                    input_ptr += 4;
                    remain_input_size -= 4;

                    if (remain_input_size < small_block_compressed_len) {
                        return Status::InvalidArgument(
                                "Need more input buffer to decompress small block.");
                    }

                    auto small_block_uncompressed_len =
                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
                                               output_ptr, output_limit);

                    // The decompressor is bounded only by the end of the whole
                    // output buffer, so a corrupt small block can yield more
                    // bytes than the large block has left. Without this check
                    // the unsigned subtraction below would wrap around and the
                    // loop would keep parsing payload bytes as length headers.
                    if (small_block_uncompressed_len > large_block_uncompressed_len) {
                        return Status::InvalidArgument(
                                "Small block decompressed to more data than large block declares.");
                    }

                    input_ptr += small_block_compressed_len;
                    remain_input_size -= small_block_compressed_len;

                    output_ptr += small_block_uncompressed_len;
                    large_block_uncompressed_len -= small_block_uncompressed_len;
                    remain_output_size -= small_block_uncompressed_len;
                }
            }
        } catch (const orc::ParseError& e) {
            //Prevent be from hanging due to orc::lzoDecompress throw exception
            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
        }
        return Status::OK();
    }
};

Status get_block_compression_codec(segment_v2::CompressionTypePB type,
BlockCompressionCodec** codec) {
switch (type) {
Expand Down Expand Up @@ -1127,6 +1225,9 @@ Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_code
*codec = GzipBlockCompression::instance();
#endif
break;
case tparquet::CompressionCodec::LZO:
*codec = LzoBlockCompression::instance();
break;
default:
return Status::InternalError("unknown compression type({})", parquet_codec);
}
Expand Down