From 8da27b16ec5583fa429cc0d709606bac00a3f1bc Mon Sep 17 00:00:00 2001 From: airborne12 Date: Mon, 4 Nov 2024 17:31:51 +0800 Subject: [PATCH] [Fix](segment compaction) fix error using of inverted index file writer in segment compaction (#43114) ### What problem does this PR solve? Related PR: #41625 Problem Summary: Fix BUAF problem, stack like this ``` stack trace: *** 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_master/doris/be/src/common/signal_handler.h:421 1# 0x00007F5370ABD520 in /lib/x86_64-linux-gnu/libc.so.6 2# pthread_kill at ./nptl/pthread_kill.c:89 3# raise at ../sysdeps/posix/raise.c:27 4# abort at ./stdlib/abort.c:81 5# _gnu_cxx::_verbose_terminate_handler() [clone .cold] at ../../../../libstdc+-v3/libsupc+/vterminate.cc:75 6# _cxxabiv1::_terminate(void ()) at ../../../../libstdc+-v3/libsupc+/eh_terminate.cc:48 7# 0x000055D9E015B681 in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 8# 0x000055D9E015B7D4 in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 9# 0x000055D9E015BBC6 in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 10# void fmt::v7::detail::buffer::append(char const*, char const*) in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 11# char const* fmt::v7::detail::parse_replacement_field, char, fmt::v7::basic_format_context, char> >&>(char const*, char const*, fmt::v7::detail::format_handler, char, fmt::v7::basic_format_context, char> >&) in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 12# void fmt::v7::detail::vformat_to(fmt::v7::detail::buffer&, fmt::v7::basic_string_view, fmt::v7::basic_format_args::type>, fmt::v7::type_identity::type> >, fmt::v7::detail::locale_ref) in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 13# fmt::v7::detail::vformat[abi:cxx11](fmt::v7::basic_string_view, fmt::v7::format_args) in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 14# doris::segment_v2::InvertedIndexDescriptor::get_temporary_index_path[abi:cxx11](std::basic_string_view >, std::basic_string_view >, 
long, long, std::basic_string_view >) at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/inverted_index_desc.cpp:35 15# doris::segment_v2::InvertedIndexFileWriter::open(doris::TabletIndex const*) at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp:45 16# doris::segment_v2::InvertedIndexColumnWriterImpl<(doris::FieldType)7>::init_bkd_index() at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp:146 17# doris::segment_v2::InvertedIndexColumnWriterImpl<(doris::FieldType)7>::init() at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp:116 18# doris::segment_v2::InvertedIndexColumnWriter::create(doris::Field const*, std::unique_ptr >, doris::segment_v2::InvertedIndexFileWriter, doris::TabletIndex const*) at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp:698 19# doris::segment_v2::ScalarColumnWriter::init() at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/column_writer.cpp:483 20# doris::segment_v2::SegmentWriter::_create_column_writer(unsigned int, doris::TabletColumn const&, std::shared_ptr const&) at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/segment_writer.cpp:258 21# doris::segment_v2::SegmentWriter::_create_writers(std::shared_ptr const&, std::vector > const&) at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/segment_writer.cpp:307 22# doris::segment_v2::SegmentWriter::init(std::vector > const&, bool) at /home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segment_v2/segment_writer.cpp:276 23# doris::SegcompactionWorker::_do_compact_segments(std::shared_ptr, std::allocator > > >) in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 24# doris::SegcompactionWorker::compact_segments(std::shared_ptr, std::allocator > > >) at 
/home/zcp/repo_center/doris_master/doris/be/src/olap/rowset/segcompaction.cpp:354 25# doris::StorageEngine::_handle_seg_compaction(std::shared_ptr, std::shared_ptr, std::allocator > > >, unsigned long) at /home/zcp/repo_center/doris_master/doris/be/src/olap/olap_server.cpp:1121 26# std::_Function_handler, std::shared_ptr, std::allocator > > >)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291 27# doris::ThreadPool::dispatch_thread() in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 28# doris::Thread::supervise_thread(void*) at /home/zcp/repo_center/doris_master/doris/be/src/util/thread.cpp:499 29# start_thread at ./nptl/pthread_create.c:442 30# 0x00007F5370BA1850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83 172.20.50.120 last coredump sql: last SQL query not found ``` ### Check List (For Committer) - Test - [ ] Regression test - [x] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [x] No. - [ ] Yes. - Does this need documentation? - [x] No. - [ ] Yes. 
- Release note None ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- be/src/olap/rowset/beta_rowset_writer.cpp | 7 ++- be/src/olap/rowset/beta_rowset_writer.h | 4 +- be/src/olap/rowset/segcompaction.cpp | 2 +- be/src/olap/rowset/segcompaction.h | 5 ++ be/test/olap/segcompaction_test.cpp | 64 +++++++++++++++++++++++ 5 files changed, 78 insertions(+), 4 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index a19989af1e7f85..634e8b6442969f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -948,7 +948,7 @@ Status BaseBetaRowsetWriter::create_inverted_index_file_writer( return Status::OK(); } -Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( +Status BetaRowsetWriter::create_segment_writer_for_segcompaction( std::unique_ptr* writer, int64_t begin, int64_t end) { DCHECK(begin >= 0 && end >= 0); std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, @@ -988,6 +988,11 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); } _segcompaction_worker->get_file_writer().reset(file_writer.release()); + if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer(); + idx_file_writer != nullptr) { + RETURN_IF_ERROR(idx_file_writer->close()); + } + _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release()); return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 6063f7141771e4..ca2685f5956eae 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -280,6 +280,8 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter { Status flush_segment_writer_for_segcompaction( 
std::unique_ptr* writer, uint64_t index_size, KeyBoundsPB& key_bounds); + Status create_segment_writer_for_segcompaction( + std::unique_ptr* writer, int64_t begin, int64_t end); bool is_segcompacted() const { return _num_segcompacted > 0; } @@ -290,8 +292,6 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter { Status _check_segment_number_limit(size_t segnum) override; int64_t _num_seg() const override; Status _wait_flying_segcompaction(); - Status _create_segment_writer_for_segcompaction( - std::unique_ptr* writer, int64_t begin, int64_t end); Status _segcompaction_if_necessary(); Status _segcompaction_rename_last_segments(); Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index fc8baf952c1863..92b903d3a90790 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -219,7 +219,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat Status SegcompactionWorker::_create_segment_writer_for_segcompaction( std::unique_ptr* writer, uint32_t begin, uint32_t end) { - return _writer->_create_segment_writer_for_segcompaction(writer, begin, end); + return _writer->create_segment_writer_for_segcompaction(writer, begin, end); } Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) { diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 67dd6889aadd72..f0f8aa6b25787c 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -25,6 +25,7 @@ #include "olap/merger.h" #include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" +#include "segment_v2/inverted_index_file_writer.h" #include "segment_v2/segment.h" namespace doris { @@ -61,6 +62,9 @@ class SegcompactionWorker { DeleteBitmapPtr get_converted_delete_bitmap() { return _converted_delete_bitmap; } 
io::FileWriterPtr& get_file_writer() { return _file_writer; } + InvertedIndexFileWriterPtr& get_inverted_index_file_writer() { + return _inverted_index_file_writer; + } // set the cancel flag, tasks already started will not be cancelled. bool cancel(); @@ -86,6 +90,7 @@ class SegcompactionWorker { // Currently cloud storage engine doesn't need segcompaction BetaRowsetWriter* _writer = nullptr; io::FileWriterPtr _file_writer; + InvertedIndexFileWriterPtr _inverted_index_file_writer = nullptr; // for unique key mow table std::unique_ptr _rowid_conversion; diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index ba0d23acb02cef..32d724d246b3b5 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -34,6 +34,7 @@ #include "olap/rowset/rowset_reader_context.h" #include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_v2/segment_writer.h" #include "olap/storage_engine.h" #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" @@ -178,6 +179,24 @@ class SegCompactionTest : public testing::Test { tablet_schema->init_from_pb(tablet_schema_pb); } + void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id, + const std::string& index_name, int32_t col_unique_id, + const std::string& column_type, const std::string& column_name, + bool parser = false) { + column_pb->set_unique_id(col_unique_id); + column_pb->set_name(column_name); + column_pb->set_type(column_type); + column_pb->set_is_key(false); + column_pb->set_is_nullable(true); + tablet_index->set_index_id(index_id); + tablet_index->set_index_name(index_name); + tablet_index->set_index_type(IndexType::INVERTED); + tablet_index->add_col_unique_id(col_unique_id); + if (parser) { + auto* properties = tablet_index->mutable_properties(); + (*properties)[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE; + } + } // use different id to avoid conflict 
void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema, RowsetWriterContext* rowset_writer_context) { @@ -830,6 +849,51 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { } } +TEST_F(SegCompactionTest, CreateSegCompactionWriter) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + TabletSchemaPB schema_pb; + schema_pb.set_keys_type(KeysType::DUP_KEYS); + schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2); + + construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0, "INT", + "key"); + construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1, "STRING", + "v1"); + construct_column(schema_pb.add_column(), schema_pb.add_index(), 10002, "v2_index", 2, "STRING", + "v2", true); + construct_column(schema_pb.add_column(), schema_pb.add_index(), 10003, "v3_index", 3, "INT", + "v3"); + + tablet_schema.reset(new TabletSchema); + tablet_schema->init_from_pb(schema_pb); + RowsetSharedPtr rowset; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 3; + std::vector segment_num_rows; + { + RowsetWriterContext writer_context; + create_rowset_writer_context(10052, tablet_schema, &writer_context); + + auto res = RowsetFactory::create_rowset_writer(*l_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + auto beta_rowset_writer = dynamic_cast(rowset_writer.get()); + EXPECT_TRUE(beta_rowset_writer != nullptr); + std::unique_ptr writer = nullptr; + auto status = beta_rowset_writer->create_segment_writer_for_segcompaction(&writer, 0, 1); + EXPECT_TRUE(beta_rowset_writer != nullptr); + EXPECT_TRUE(status == Status::OK()); + int64_t inverted_index_file_size = 0; + status = writer->close_inverted_index(&inverted_index_file_size); + 
EXPECT_TRUE(status == Status::OK()); + std::cout << inverted_index_file_size << std::endl; + } +} + TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { config::enable_segcompaction = true; Status s;