From 150be823cb8a9d859bb0ef21ae56bd96b9a7d83c Mon Sep 17 00:00:00 2001
From: yangzhg
Date: Wed, 25 May 2022 16:23:11 +0800
Subject: [PATCH] [fix] disable transferring data larger than 2GB

brpc and protobuf cannot transfer data larger than 2GB; anything larger
overflows, so add a check before sending.
---
 be/src/runtime/row_batch.cpp | 11 ++---------
 be/src/vec/core/block.cpp    | 35 ++++++++++++++++++++++++++++-------
 2 files changed, 30 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 520a5642f92f91..5acf7634defe09 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -273,10 +273,6 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
         try {
             // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
             _compression_scratch.resize(max_compressed_size);
-        } catch (const std::bad_alloc& e) {
-            can_compress = false;
-            LOG(WARNING) << "Try to alloc " << max_compressed_size
-                         << " bytes for compression scratch failed. " << e.what();
         } catch (...) {
             can_compress = false;
             std::exception_ptr p = std::current_exception();
@@ -309,11 +305,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
         *compressed_size = pb_size;
         if (pb_size > std::numeric_limits<int32_t>::max()) {
             // the protobuf has a hard limit of 2GB for serialized data.
-            return Status::InternalError(
-                    fmt::format("The rowbatch is large than 2GB({}), can not send by Protobuf. "
-                                "please set BE config 'transfer_data_by_brpc_attachment' to true "
-                                "and restart BE.",
-                                pb_size));
+            return Status::InternalError(fmt::format(
+                    "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size));
         }
     } else {
         *uncompressed_size = pb_size + tuple_byte_size;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index aa482fcfbf7089..4866a1129e7e6f 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -669,7 +669,16 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
-    allocated_buf->resize(content_uncompressed_size);
+    try {
+        allocated_buf->resize(content_uncompressed_size);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        std::string msg = fmt::format("Try to alloc {} bytes for allocated_buf failed. reason {}",
+                                      content_uncompressed_size,
+                                      p ? p.__cxa_exception_type()->name() : "null");
+        LOG(WARNING) << msg;
+        return Status::BufferAllocFailed(msg);
+    }
     char* buf = allocated_buf->data();
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf);
@@ -678,12 +687,21 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
 
     // compress
     if (config::compress_rowbatches && content_uncompressed_size > 0) {
-        // Try compressing the content to compression_scratch,
-        // swap if compressed data is smaller
+        size_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
         std::string compression_scratch;
-        uint32_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size);
-        compression_scratch.resize(max_compressed_size);
-
+        try {
+            // Try compressing the content to compression_scratch,
+            // swap if compressed data is smaller
+            // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
+            compression_scratch.resize(max_compressed_size);
+        } catch (...) {
+            std::exception_ptr p = std::current_exception();
+            std::string msg =
+                    fmt::format("Try to alloc {} bytes for compression scratch failed. reason {}",
+                                max_compressed_size, p ? p.__cxa_exception_type()->name() : "null");
+            LOG(WARNING) << msg;
+            return Status::BufferAllocFailed(msg);
+        }
         size_t compressed_size = 0;
         char* compressed_output = compression_scratch.data();
         snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output,
@@ -701,7 +719,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
 
             VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                      << ", compressed size: " << compressed_size;
         }
-
+    if (*compressed_bytes >= std::numeric_limits<int32_t>::max()) {
+        return Status::InternalError(fmt::format(
+                "The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes));
+    }
     return Status::OK();
 }
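
Note: a minimal standalone sketch (not Doris code) of the allocation-failure
handling this patch uses around the compression scratch buffer. The helper name
try_alloc_scratch and the plain bool return are assumptions for illustration;
the idea is that a failed resize() is caught and the sender falls back to
uncompressed data instead of crashing.

    #include <exception>
    #include <iostream>
    #include <string>

    // Try to reserve a scratch buffer for compression; on allocation failure,
    // log and report that compression should be skipped.
    bool try_alloc_scratch(std::string& scratch, size_t bytes) {
        try {
            scratch.resize(bytes);
            return true;
        } catch (const std::exception& e) {
            std::cerr << "Try to alloc " << bytes
                      << " bytes for compression scratch failed: " << e.what() << "\n";
            return false;
        }
    }

    int main() {
        std::string scratch;
        bool can_compress = try_alloc_scratch(scratch, 64 * 1024 * 1024);
        std::cout << (can_compress ? "compressing" : "sending uncompressed") << "\n";
        return 0;
    }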
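Note: a minimal standalone sketch (not Doris code) of the 2GB guard the patch
adds before sending. The helper name check_pb_size_limit and the plain string
return (in place of Doris' Status) are assumptions for illustration; protobuf
cannot serialize a single message of 2GB or more, so the serialized size is
compared against std::numeric_limits<int32_t>::max() before the RPC is attempted.

    #include <cstdint>
    #include <iostream>
    #include <limits>
    #include <string>

    // Return an error message if the serialized payload would hit the 2GB
    // protobuf hard limit, "OK" otherwise.
    std::string check_pb_size_limit(size_t serialized_bytes) {
        if (serialized_bytes >= static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
            return "payload of " + std::to_string(serialized_bytes) +
                   " bytes is larger than 2GB and can not be sent by protobuf";
        }
        return "OK";
    }

    int main() {
        std::cout << check_pb_size_limit(1024) << "\n";                      // OK
        std::cout << check_pb_size_limit(3ULL * 1024 * 1024 * 1024) << "\n"; // rejected
        return 0;
    }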