2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -1279,6 +1279,8 @@ DEFINE_Validator(tablet_meta_serialize_size_limit,
                  [](const int64_t config) -> bool { return config < 1717986918; });
 
 DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
+DEFINE_mInt32(snappy_compression_block_size, "262144");
+DEFINE_mInt32(lz4_compression_block_size, "262144");
 
 // clang-format off
 #ifdef BE_TEST
3 changes: 3 additions & 0 deletions be/src/common/config.h
@@ -1364,6 +1364,9 @@ DECLARE_mBool(ignore_not_found_file_in_external_table);
 DECLARE_mInt64(tablet_meta_serialize_size_limit);
 
 DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
+// To be compatible with hadoop's block compression
+DECLARE_mInt32(snappy_compression_block_size);
+DECLARE_mInt32(lz4_compression_block_size);
 
 #ifdef BE_TEST
 // test s3
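Reviewer note: 262144 bytes is 256 * 1024, i.e. 256 KiB per sub-block, which appears to match the default buffer size of Hadoop's block codecs (io.compression.codec.snappy.buffersize / io.compression.codec.lz4.buffersize), so blocks cut with these knobs should line up with what Hadoop emits. A minimal sketch of how a writer would consult the knob; the helper below is illustrative, not Doris code — only the config name and its default come from this PR:

#include <algorithm>
#include <cstddef>
#include <cstdint>

// Stand-in for config::snappy_compression_block_size added in this PR.
static int32_t snappy_compression_block_size = 262144; // 256 KiB, Hadoop-compatible

// How many raw bytes the next Snappy sub-block should consume: a full
// block while input remains, then whatever is left at the tail.
size_t next_chunk_len(size_t remaining) {
    return std::min(remaining, static_cast<size_t>(snappy_compression_block_size));
}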
158 changes: 91 additions & 67 deletions be/src/exec/decompressor.cpp
@@ -535,90 +535,114 @@ Status SnappyBlockDecompressor::init() {
     return Status::OK();
 }
 
+// Hadoop SnappyCodec source:
+// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
+// Example:
+// OriginData (the original data is divided into several large data blocks):
+//   large data block1 | large data block2 | large data block3 | ...
+// Each large data block is in turn divided into several small data blocks.
+// Suppose a large data block is divided into three small blocks:
+// large data block1: | small block1 | small block2 | small block3 |
+// CompressData: <A [B1 compress(small block1)] [B2 compress(small block2)] [B3 compress(small block3)]>
+//
+// A : original (uncompressed) length of the current large data block.
+//     sizeof(A) = 4 bytes.
+//     A = length(small block1) + length(small block2) + length(small block3)
+// Bx : length of compressed small data block x.
+//      sizeof(Bx) = 4 bytes.
+//      Bx = length(compress(small block x))
 Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
                                            size_t* input_bytes_read, uint8_t* output,
                                            size_t output_max_len, size_t* decompressed_len,
                                            bool* stream_end, size_t* more_input_bytes,
                                            size_t* more_output_bytes) {
-    uint8_t* src = input;
-    size_t remaining_input_size = input_len;
-    int64_t uncompressed_total_len = 0;
     *input_bytes_read = 0;
+    auto* input_ptr = input;
+    auto* output_ptr = output;
 
-    // The hadoop snappy codec is as:
-    // <4 byte big endian uncompressed size>
-    // <4 byte big endian compressed size>
-    // <snappy compressed block>
-    // ....
-    // <4 byte big endian uncompressed size>
-    // <4 byte big endian compressed size>
-    // <snappy compressed block>
-    //
-    // See:
-    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
-    while (remaining_input_size > 0) {
-        if (remaining_input_size < 4) {
-            *more_input_bytes = 4 - remaining_input_size;
-            break;
+    while (input_len > 0) {
+        // If decompression fails midway, fall back to the beginning of the large block.
+        auto* large_block_input_ptr = input_ptr;
+        auto* large_block_output_ptr = output_ptr;
+
+        if (input_len < sizeof(uint32_t)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "fail to do hadoop-snappy decompress, input_len=$0", input_len));
         }
-        // Read uncompressed size
-        uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
-        int64_t remaining_output_len = output_max_len - uncompressed_total_len;
-        if (remaining_output_len < uncompressed_block_len) {
+
+        uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);
+
+        input_ptr += sizeof(uint32_t);
+        input_len -= sizeof(uint32_t);
+
+        std::size_t remaining_output_len = output_max_len - *decompressed_len;
+
+        if (remaining_output_len < remaining_decompressed_large_block_len) {
             // Need more output buffer
-            *more_output_bytes = uncompressed_block_len - remaining_output_len;
-            break;
-        }
-
-        if (uncompressed_block_len == 0) {
-            remaining_input_size -= sizeof(uint32_t);
+            *more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
+            input_ptr = large_block_input_ptr;
+            output_ptr = large_block_output_ptr;
             break;
         }
 
-        if (remaining_input_size <= 2 * sizeof(uint32_t)) {
-            // The remaining input size should be larger then <uncompressed size><compressed size><compressed data>
-            // +1 means we need at least 1 bytes of compressed data.
-            *more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size;
-            break;
-        }
+        std::size_t decompressed_large_block_len = 0;
+        do {
+            // Ensure at least 4 bytes remain to read the next block length,
+            // so input_len never goes negative.
+            if (input_len < sizeof(uint32_t)) {
+                *more_input_bytes = sizeof(uint32_t) - input_len;
+                break;
+            }
 
-        // Read compressed size
-        size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t);
-        size_t compressed_len = _read_int32(src + sizeof(uint32_t));
-        if (compressed_len > tmp_remaining_size) {
-            // Need more input data
-            *more_input_bytes = compressed_len - tmp_remaining_size;
-            break;
-        }
+            // Read the length of the next snappy compressed block.
+            size_t compressed_small_block_len = BigEndian::Load32(input_ptr);
 
-        src += 2 * sizeof(uint32_t);
-        remaining_input_size -= 2 * sizeof(uint32_t);
-
-        // ATTN: the uncompressed len from GetUncompressedLength() is same as
-        // uncompressed_block_len, so I think it is unnecessary to get it again.
-        // Get uncompressed len from snappy
-        // size_t uncompressed_len;
-        // if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(src),
-        //                                    compressed_len, &uncompressed_len)) {
-        //     return Status::InternalError("snappy block decompress failed to get uncompressed len");
-        // }
-
-        // Decompress
-        if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len,
-                                   reinterpret_cast<char*>(output))) {
-            return Status::InternalError(
-                    "snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
-                    uncompressed_block_len, compressed_len);
-        }
-
-        output += uncompressed_block_len;
-        src += compressed_len;
-        remaining_input_size -= compressed_len;
-        uncompressed_total_len += uncompressed_block_len;
+            input_ptr += sizeof(uint32_t);
+            input_len -= sizeof(uint32_t);
+
+            if (compressed_small_block_len == 0) {
+                continue;
+            }
+
+            if (compressed_small_block_len > input_len) {
+                // Need more input buffer
+                *more_input_bytes = compressed_small_block_len - input_len;
+                break;
+            }
+
+            // Decompress this block.
+            size_t decompressed_small_block_len;
+            if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr),
+                                               compressed_small_block_len,
+                                               &decompressed_small_block_len)) {
+                return Status::InternalError(
+                        "snappy block decompress failed to get uncompressed len");
+            }
+            if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr),
+                                       compressed_small_block_len,
+                                       reinterpret_cast<char*>(output_ptr))) {
+                return Status::InternalError(
+                        "snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
+                        decompressed_small_block_len, compressed_small_block_len);
+            }
+            input_ptr += compressed_small_block_len;
+            input_len -= compressed_small_block_len;
+
+            output_ptr += decompressed_small_block_len;
+            remaining_decompressed_large_block_len -= decompressed_small_block_len;
+            decompressed_large_block_len += decompressed_small_block_len;
+
+        } while (remaining_decompressed_large_block_len > 0);
+
+        if (*more_input_bytes != 0) {
+            // Need more input buffer; fall back to the beginning of the large block.
+            input_ptr = large_block_input_ptr;
+            output_ptr = large_block_output_ptr;
+            break;
+        }
+
+        *decompressed_len += decompressed_large_block_len;
     }
 
-    *input_bytes_read += (input_len - remaining_input_size);
-    *decompressed_len = uncompressed_total_len;
+    *input_bytes_read += (input_ptr - input);
     // If no more input and output need, means this is the end of a compressed block
     *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);
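For review context, here is a self-contained sketch (not part of this PR) of a writer producing the hadoop-snappy block framing that the new decompress() parses: one 4-byte big-endian raw length A per large block, then a 4-byte big-endian compressed length Bx plus snappy data per small block. put_be32 and hadoop_snappy_encode are illustrative names; only the byte layout comes from the format comment above.

#include <snappy.h>

#include <algorithm>
#include <cstdint>
#include <string>

// Append v to out as 4 big-endian bytes.
static void put_be32(std::string* out, uint32_t v) {
    out->push_back(static_cast<char>(v >> 24));
    out->push_back(static_cast<char>(v >> 16));
    out->push_back(static_cast<char>(v >> 8));
    out->push_back(static_cast<char>(v));
}

// Encode one large block: <A><B1><data1><B2><data2>...
std::string hadoop_snappy_encode(const std::string& raw, size_t block_size = 262144) {
    std::string out;
    put_be32(&out, static_cast<uint32_t>(raw.size())); // A: raw length of the large block
    for (size_t pos = 0; pos < raw.size(); pos += block_size) {
        size_t n = std::min(block_size, raw.size() - pos);
        std::string compressed;
        snappy::Compress(raw.data() + pos, n, &compressed); // one small block
        put_be32(&out, static_cast<uint32_t>(compressed.size())); // Bx
        out += compressed;
    }
    return out;
}

Feeding this output back through the new decompress() should round-trip the payload and exercise the inner do/while over small blocks — the case of several compressed chunks per length-prefixed large block, which the rewritten loop now handles.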
