diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index d7e81cec15dfae..f7920def38e0b6 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -104,7 +104,7 @@ Status ColumnReader::init() { strings::Substitute("unsupported typeinfo, type=$0", _meta.type())); } RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), _meta.encoding(), &_encoding_info)); - RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec)); for (int i = 0; i < _meta.indexes_size(); i++) { auto& index_meta = _meta.indexes(i); @@ -149,7 +149,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag PageReadOptions opts; opts.rblock = iter_opts.rblock; opts.page_pointer = pp; - opts.codec = _compress_codec.get(); + opts.codec = _compress_codec; opts.stats = iter_opts.stats; opts.verify_checksum = _opts.verify_checksum; opts.use_page_cache = iter_opts.use_page_cache; diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index e5e07a0871331d..49a3d96af7e88b 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -175,7 +175,7 @@ class ColumnReader { TypeInfoPtr(nullptr, nullptr); // initialized in init(), may changed by subclasses. 
const EncodingInfo* _encoding_info = nullptr; // initialized in init(), used for create PageDecoder - std::unique_ptr _compress_codec; // initialized in init() + const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init() // meta for various column indexes (null if the index is absent) const ZoneMapIndexPB* _zone_map_index_meta = nullptr; diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 9a54b210c86fce..5cb5140102af42 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -264,7 +264,7 @@ ScalarColumnWriter::~ScalarColumnWriter() { } Status ScalarColumnWriter::init() { - RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec)); PageBuilder* page_builder = nullptr; @@ -420,7 +420,7 @@ Status ScalarColumnWriter::write_data() { footer.mutable_dict_page_footer()->set_encoding(PLAIN_ENCODING); PagePointer dict_pp; - RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(), + RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec, _opts.compression_min_space_saving, _wblock, {dict_body.slice()}, footer, &dict_pp)); dict_pp.to_proto(_opts.meta->mutable_dict_page()); @@ -508,8 +508,8 @@ Status ScalarColumnWriter::finish_current_page() { } // trying to compress page body OwnedSlice compressed_body; - RETURN_IF_ERROR(PageIO::compress_page_body( - _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body)); + RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving, + body, &compressed_body)); if (compressed_body.slice().empty()) { // page body is uncompressed page->data.emplace_back(std::move(encoded_values)); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 
2f50ebf075744f..0cacbf8547d2f5 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -249,7 +249,7 @@ class ScalarColumnWriter final : public ColumnWriter { PageHead _pages; ordinal_t _first_rowid = 0; - std::unique_ptr _compress_codec; + const BlockCompressionCodec* _compress_codec = nullptr; std::unique_ptr _ordinal_index_builder; std::unique_ptr _zone_map_index_builder; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 955c3b96e3720b..4ebe0d3af34896 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -37,7 +37,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) { strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type())); } RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info)); - RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec)); _value_key_coder = get_key_coder(_type_info->type()); std::unique_ptr rblock; @@ -83,7 +83,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint PageReadOptions opts; opts.rblock = rblock; opts.page_pointer = pp; - opts.codec = _compress_codec.get(); + opts.codec = _compress_codec; OlapReaderStatistics tmp_stats; opts.stats = &tmp_stats; opts.use_page_cache = _use_page_cache; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h index 6663ac10776911..439c790e6021d7 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h @@ -84,7 +84,7 @@ class IndexedColumnReader { const TypeInfo* _type_info = nullptr; const EncodingInfo* _encoding_info = nullptr; - 
std::unique_ptr _compress_codec; + const BlockCompressionCodec* _compress_codec = nullptr; const KeyCoder* _value_key_coder = nullptr; }; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp index 4c7259c90c6d44..d195b9f2c4376a 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp @@ -72,7 +72,7 @@ Status IndexedColumnWriter::init() { } if (_options.compression != NO_COMPRESSION) { - RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec)); } return Status::OK(); } @@ -110,7 +110,7 @@ Status IndexedColumnWriter::_finish_current_data_page() { footer.mutable_data_page_footer()->set_num_values(num_values_in_page); footer.mutable_data_page_footer()->set_nullmap_size(0); - RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(), + RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec, _options.compression_min_space_saving, _wblock, {page_body.slice()}, footer, &_last_data_page)); _num_data_pages++; @@ -159,7 +159,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM PagePointer pp; RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec.get(), _options.compression_min_space_saving, _wblock, + _compress_codec, _options.compression_min_space_saving, _wblock, {page_body.slice()}, page_footer, &pp)); meta->set_is_root_data_page(false); diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h index 285ba890b23d2e..fc222388c6af73 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h @@ -108,7 +108,7 @@ class IndexedColumnWriter { std::unique_ptr _value_index_builder; // encoder for value index's key const 
KeyCoder* _value_key_coder; - std::unique_ptr _compress_codec; + const BlockCompressionCodec* _compress_codec; DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter); }; diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index a1ee74f047d123..a10c400550edbf 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -26,6 +26,7 @@ #include #include "gutil/strings/substitute.h" +#include "runtime/threadlocal.h" #include "util/faststring.h" namespace doris { @@ -85,49 +86,103 @@ class Lz4BlockCompression : public BlockCompressionCodec { } }; -// Used for LZ4 frame format, decompress speed is two times faster than LZ4. -class Lz4fBlockCompression : public BlockCompressionCodec { +class Lz4fCompressionContext { public: - Status init() override { - auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION); - if (LZ4F_isError(ret1)) { - return Status::InvalidArgument(strings::Substitute( - "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1))); + inline Status init(bool is_compress) { + if (is_compress && !ctx_c_inited) { + auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION); + if (LZ4F_isError(ret1)) { + return Status::InvalidArgument(strings::Substitute( + "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1))); + } + ctx_c_inited = true; } - ctx_c_inited = true; - auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION); - if (LZ4F_isError(ret2)) { - return Status::InvalidArgument(strings::Substitute( - "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2))); + if (!is_compress && !ctx_d_inited) { + auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION); + if (LZ4F_isError(ret2)) { + return Status::InvalidArgument( + strings::Substitute("Fail to LZ4F_createDecompressionContext, msg=$0", + LZ4F_getErrorName(ret2))); + } + ctx_d_inited = true; } - ctx_d_inited = true; + // reset decompression context to avoid 
ERROR_maxBlockSize_invalid + if (!is_compress) LZ4F_resetDecompressionContext(ctx_d); return Status::OK(); } - ~Lz4fBlockCompression() override { + ~Lz4fCompressionContext() { if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c); if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d); } + inline LZ4F_compressionContext_t get_compress_ctx() { return ctx_c; } + inline LZ4F_decompressionContext_t get_decompress_ctx() { return ctx_d; } + +private: + LZ4F_compressionContext_t ctx_c; + bool ctx_c_inited = false; + LZ4F_decompressionContext_t ctx_d; + bool ctx_d_inited = false; +}; + +// a workaround for non-trivial thread_local requiring GLIBC_2.18 +// refer to https://github.com/apache/incubator-doris/pull/7911 +class Lz4fCompressionContextPtr { +public: + Lz4fCompressionContextPtr(); + Lz4fCompressionContext* get(); + +private: + DECLARE_STATIC_THREAD_LOCAL(Lz4fCompressionContext, thread_local_lz4f_ctx); +}; + +DEFINE_STATIC_THREAD_LOCAL(Lz4fCompressionContext, Lz4fCompressionContextPtr, + thread_local_lz4f_ctx); + +Lz4fCompressionContextPtr::Lz4fCompressionContextPtr() { + INIT_STATIC_THREAD_LOCAL(Lz4fCompressionContext, thread_local_lz4f_ctx); +} + +Lz4fCompressionContext* Lz4fCompressionContextPtr::get() { + return thread_local_lz4f_ctx; +} + +// thread_local context for lz4f compress/decompress +// one lz4f_ctx per thread, for performance and thread safety +inline thread_local Lz4fCompressionContextPtr lz4f_ctx; + +// Used for LZ4 frame format, decompress speed is two times faster than LZ4. 
+class Lz4fBlockCompression : public BlockCompressionCodec { +public: + static const Lz4fBlockCompression* instance() { + static Lz4fBlockCompression s_instance; + return &s_instance; + } + + ~Lz4fBlockCompression() override {} + Status compress(const Slice& input, Slice* output) const override { std::vector inputs {input}; return compress(inputs, output); } Status compress(const std::vector& inputs, Slice* output) const override { - if (!ctx_c_inited) - return Status::InvalidArgument("LZ4F_createCompressionContext not sucess"); - - return _compress(ctx_c, inputs, output); + auto pctx = lz4f_ctx.get(); + if (pctx && pctx->init(true).ok()) + return _compress(pctx->get_compress_ctx(), inputs, output); + else + return Status::InvalidArgument("Fail to get thread local lz4f_ctx"); } Status decompress(const Slice& input, Slice* output) const override { - if (!ctx_d_inited) - return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess"); - - return _decompress(ctx_d, input, output); + auto pctx = lz4f_ctx.get(); + if (pctx && pctx->init(false).ok()) + return _decompress(pctx->get_decompress_ctx(), input, output); + else + return Status::InvalidArgument("Fail to get thread local lz4f_ctx"); } size_t max_compressed_len(size_t len) const override { @@ -167,8 +222,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec { } Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const { - // reset decompression context to avoid ERROR_maxBlockSize_invalid - LZ4F_resetDecompressionContext(ctx); size_t input_size = input.size; auto lres = LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr); @@ -189,10 +242,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec { private: static LZ4F_preferences_t _s_preferences; - LZ4F_compressionContext_t ctx_c; - bool ctx_c_inited = false; - LZ4F_decompressionContext_t ctx_d; - bool ctx_d_inited = false; }; LZ4F_preferences_t 
Lz4fBlockCompression::_s_preferences = { @@ -376,38 +425,27 @@ class ZlibBlockCompression : public BlockCompressionCodec { }; Status get_block_compression_codec(segment_v2::CompressionTypePB type, - std::unique_ptr& codec) { - BlockCompressionCodec* ptr = nullptr; + const BlockCompressionCodec** codec) { switch (type) { case segment_v2::CompressionTypePB::NO_COMPRESSION: - codec.reset(nullptr); - return Status::OK(); + *codec = nullptr; + break; case segment_v2::CompressionTypePB::SNAPPY: - ptr = new SnappyBlockCompression(); + *codec = SnappyBlockCompression::instance(); break; case segment_v2::CompressionTypePB::LZ4: - ptr = new Lz4BlockCompression(); + *codec = Lz4BlockCompression::instance(); break; case segment_v2::CompressionTypePB::LZ4F: - ptr = new Lz4fBlockCompression(); + *codec = Lz4fBlockCompression::instance(); break; case segment_v2::CompressionTypePB::ZLIB: - ptr = new ZlibBlockCompression(); + *codec = ZlibBlockCompression::instance(); break; default: return Status::NotFound(strings::Substitute("unknown compression type($0)", type)); } - - if (!ptr) return Status::NotFound("Failed to create compression codec"); - - Status st = ptr->init(); - if (st.ok()) { - codec.reset(ptr); - } else { - delete ptr; - } - - return st; + return Status::OK(); } } // namespace doris diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h index 7ad3f9ecb79ed4..ff251137937e01 100644 --- a/be/src/util/block_compression.h +++ b/be/src/util/block_compression.h @@ -34,8 +34,6 @@ class BlockCompressionCodec { public: virtual ~BlockCompressionCodec() {} - virtual Status init() { return Status::OK(); } - // This function will compress input data into output. // output should be preallocated, and its capacity must be large enough // for compressed input, which can be get through max_compressed_len function. @@ -63,6 +61,6 @@ class BlockCompressionCodec { // // Return not OK, if error happens. 
Status get_block_compression_codec(segment_v2::CompressionTypePB type, - std::unique_ptr& codec); + const BlockCompressionCodec** codec); } // namespace doris diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp index 0bf62e6d9d9a0b..cec2ac48dac775 100644 --- a/be/test/util/block_compression_test.cpp +++ b/be/test/util/block_compression_test.cpp @@ -42,8 +42,8 @@ static std::string generate_str(size_t len) { } void test_single_slice(segment_v2::CompressionTypePB type) { - std::unique_ptr codec; - auto st = get_block_compression_codec(type, codec); + const BlockCompressionCodec* codec = nullptr; + auto st = get_block_compression_codec(type, &codec); EXPECT_TRUE(st.ok()); size_t test_sizes[] = {0, 1, 10, 1000, 1000000}; @@ -104,8 +104,8 @@ TEST_F(BlockCompressionTest, single) { } void test_multi_slices(segment_v2::CompressionTypePB type) { - std::unique_ptr codec; - auto st = get_block_compression_codec(type, codec); + const BlockCompressionCodec* codec = nullptr; + auto st = get_block_compression_codec(type, &codec); EXPECT_TRUE(st.ok()); size_t test_sizes[] = {0, 1, 10, 1000, 1000000};