Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,15 @@ class RuntimeState {
return _query_options.enable_enable_exchange_node_parallel_merge;
}

segment_v2::CompressionTypePB fragement_transmission_compression_type() {
if (_query_options.__isset.fragment_transmission_compression_codec) {
if (_query_options.fragment_transmission_compression_codec == "lz4") {
return segment_v2::CompressionTypePB::LZ4;
}
}
return segment_v2::CompressionTypePB::SNAPPY;
}

// the following getters are only valid after Prepare()
InitialReservations* initial_reservations() const { return _initial_reservations; }

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/aggregate_functions/aggregate_function_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ struct AggregateFunctionSortData {
PBlock pblock;
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes);
block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes,
segment_v2::CompressionTypePB::SNAPPY);

write_string_binary(pblock.SerializeAsString(), buf);
}
Expand Down
45 changes: 33 additions & 12 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
#include "udf/udf.h"
#include "util/block_compression.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -79,16 +80,28 @@ Block::Block(const PBlock& pblock) {
std::string compression_scratch;
if (pblock.compressed()) {
// Decompress
SCOPED_RAW_TIMER(&_decompress_time_ns);
const char* compressed_data = pblock.column_values().c_str();
size_t compressed_size = pblock.column_values().size();
size_t uncompressed_size = 0;
bool success =
snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
compression_scratch.resize(uncompressed_size);
success =
snappy::RawUncompress(compressed_data, compressed_size, compression_scratch.data());
DCHECK(success) << "snappy::RawUncompress failed";
if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
std::unique_ptr<BlockCompressionCodec> codec;
get_block_compression_codec(pblock.compression_type(), codec);
uncompressed_size = pblock.uncompressed_size();
compression_scratch.resize(uncompressed_size);
Slice decompressed_slice(compression_scratch);
codec->decompress(Slice(compressed_data, compressed_size), &decompressed_slice);
DCHECK(uncompressed_size == decompressed_slice.size);
} else {
bool success = snappy::GetUncompressedLength(compressed_data, compressed_size,
&uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
compression_scratch.resize(uncompressed_size);
success = snappy::RawUncompress(compressed_data, compressed_size,
compression_scratch.data());
DCHECK(success) << "snappy::RawUncompress failed";
}
_decompressed_bytes = uncompressed_size;
buf = compression_scratch.data();
} else {
buf = pblock.column_values().data();
Expand Down Expand Up @@ -684,6 +697,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
}

Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data) const {
// calc uncompressed size for allocation
size_t content_uncompressed_size = 0;
Expand Down Expand Up @@ -717,7 +731,14 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp

// compress
if (config::compress_rowbatches && content_uncompressed_size > 0) {
size_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
SCOPED_RAW_TIMER(const_cast<int64_t*>(&_compress_time_ns));
pblock->set_compression_type(compression_type);
pblock->set_uncompressed_size(content_uncompressed_size);

std::unique_ptr<BlockCompressionCodec> codec;
RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec));

size_t max_compressed_size = codec->max_compressed_len(content_uncompressed_size);
std::string compression_scratch;
try {
// Try compressing the content to compression_scratch,
Expand All @@ -732,10 +753,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
LOG(WARNING) << msg;
return Status::BufferAllocFailed(msg);
}
size_t compressed_size = 0;
char* compressed_output = compression_scratch.data();
snappy::RawCompress(column_values->data(), content_uncompressed_size, compressed_output,
&compressed_size);

Slice compressed_slice(compression_scratch);
codec->compress(Slice(column_values->data(), content_uncompressed_size), &compressed_slice);
size_t compressed_size = compressed_slice.size;

if (LIKELY(compressed_size < content_uncompressed_size)) {
compression_scratch.resize(compressed_size);
Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class Block {
Container data;
IndexByName index_by_name;

int64_t _decompress_time_ns = 0;
int64_t _decompressed_bytes = 0;

int64_t _compress_time_ns = 0;

public:
BlockInfo info;

Expand Down Expand Up @@ -262,6 +267,7 @@ class Block {

// serialize block to PBlock
Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data = false) const;

// serialize block to PRowbatch
Expand Down Expand Up @@ -335,6 +341,10 @@ class Block {

void shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx);

int64_t get_decompress_time() const { return _decompress_time_ns; }
int64_t get_decompressed_bytes() const { return _decompressed_bytes; }
int64_t get_compress_time() const { return _compress_time_ns; }

private:
void erase_impl(size_t position);
void initialize_index_by_name();
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
{
SCOPED_TIMER(_recvr->_deserialize_row_batch_timer);
block = new Block(pblock);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
}

VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n";
Expand Down Expand Up @@ -284,6 +286,8 @@ VDataStreamRecvr::VDataStreamRecvr(
_data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
_buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)");
_first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
}

VDataStreamRecvr::~VDataStreamRecvr() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class VDataStreamRecvr {
RuntimeProfile::Counter* _first_batch_wait_total_timer;
RuntimeProfile::Counter* _buffer_full_total_timer;
RuntimeProfile::Counter* _data_arrival_timer;
RuntimeProfile::Counter* _decompress_timer;
RuntimeProfile::Counter* _decompress_bytes;

std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
};
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
_cur_pb_block(&_pb_block1),
_profile(nullptr),
_serialize_batch_timer(nullptr),
_compress_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
Expand All @@ -340,6 +341,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_
_cur_pb_block(&_pb_block1),
_profile(nullptr),
_serialize_batch_timer(nullptr),
_compress_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
Expand Down Expand Up @@ -418,6 +420,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
_ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT);
_serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
_compress_timer = ADD_TIMER(profile(), "CompressTime");
_overall_throughput = profile()->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
Expand All @@ -438,6 +441,8 @@ Status VDataStreamSender::open(RuntimeState* state) {
for (auto iter : _partition_infos) {
RETURN_IF_ERROR(iter->open(state));
}

_compression_type = state->fragement_transmission_compression_type();
return Status::OK();
}

Expand Down Expand Up @@ -590,9 +595,10 @@ Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_rece
dest->Clear();
size_t uncompressed_bytes = 0, compressed_bytes = 0;
RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes,
_transfer_large_data_by_brpc));
_compression_type, _transfer_large_data_by_brpc));
COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
COUNTER_UPDATE(_compress_timer, src->get_compress_time());
}

return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class VDataStreamSender : public DataSink {

RuntimeProfile* _profile; // Allocated from _pool
RuntimeProfile::Counter* _serialize_batch_timer;
RuntimeProfile::Counter* _compress_timer;
RuntimeProfile::Counter* _bytes_sent_counter;
RuntimeProfile::Counter* _uncompressed_bytes_counter;
RuntimeProfile::Counter* _ignore_rows;
Expand All @@ -146,6 +147,8 @@ class VDataStreamSender : public DataSink {

// User can change this config at runtime, avoid it being modified during query or loading process.
bool _transfer_large_data_by_brpc = false;

segment_v2::CompressionTypePB _compression_type;
};

// TODO: support local exechange
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
size_t uncompressed_bytes = 0, compressed_bytes = 0;
Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes,
state->fragement_transmission_compression_type(),
_parent->_transfer_large_data_by_brpc);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
Expand Down
46 changes: 27 additions & 19 deletions be/test/vec/core/block_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ TEST(BlockTest, RowBatchCovertToBlock) {
}
}

void block_to_pb(const vectorized::Block& block, PBlock* pblock) {
void block_to_pb(
const vectorized::Block& block, PBlock* pblock,
segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) {
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes);
Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes, compression_type);
EXPECT_TRUE(st.ok());
EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
EXPECT_EQ(compressed_bytes, pblock->column_values().size());
Expand Down Expand Up @@ -237,7 +239,7 @@ void fill_block_with_array_string(vectorized::Block& block) {
block.insert(test_array_string);
}

TEST(BlockTest, SerializeAndDeserializeBlock) {
void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_type) {
config::compress_rowbatches = true;
// int
{
Expand All @@ -250,12 +252,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
Expand All @@ -271,12 +273,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
"test_string");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
Expand All @@ -295,12 +297,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
decimal_data_type, "test_decimal");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
Expand All @@ -321,12 +323,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
"test_bitmap");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
Expand All @@ -341,12 +343,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
nullable_data_type, "test_nullable");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
Expand All @@ -361,14 +363,14 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
nullable_column->get_ptr(), nullable_data_type, "test_nullable_decimal");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
EXPECT_EQ(1, pblock.column_metas_size());
EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param());
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
Expand All @@ -385,12 +387,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
data_type, "test_nullable_int32");
vectorized::Block block({type_and_name});
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
Expand All @@ -400,17 +402,23 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
fill_block_with_array_int(block);
fill_block_with_array_string(block);
PBlock pblock;
block_to_pb(block, &pblock);
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2);
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
}

TEST(BlockTest, SerializeAndDeserializeBlock) {
config::compress_rowbatches = true;
serialize_and_deserialize_test(segment_v2::CompressionTypePB::SNAPPY);
serialize_and_deserialize_test(segment_v2::CompressionTypePB::LZ4);
}

TEST(BlockTest, dump_data) {
auto vec = vectorized::ColumnVector<Int32>::create();
auto& int32_data = vec->get_data();
Expand Down
Loading