From ff152eabfbb029b14ced24c4ee93f6a94eae3902 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Wed, 13 Apr 2022 16:49:48 +0800 Subject: [PATCH 1/3] move rowset writer to a single place --- be/src/olap/base_tablet.h | 10 --- be/src/olap/compaction.cpp | 22 +----- be/src/olap/data_dir.cpp | 6 +- be/src/olap/delta_writer.cpp | 25 +----- be/src/olap/push_handler.cpp | 47 +----------- be/src/olap/schema_change.cpp | 73 ++++-------------- be/src/olap/storage_migration_v2.cpp | 2 +- be/src/olap/tablet.cpp | 106 ++++++++++++++++++++++--- be/src/olap/tablet.h | 14 +++- be/src/olap/tablet_manager.cpp | 111 ++++----------------------- be/src/olap/tablet_manager.h | 1 - be/src/olap/tablet_meta.cpp | 1 + 12 files changed, 151 insertions(+), 267 deletions(-) diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 20fe1a5f16be0d..7c972479580765 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -56,8 +56,6 @@ class BaseTablet : public std::enable_shared_from_this { int64_t tablet_id() const; int32_t schema_hash() const; int16_t shard_id(); - const int64_t creation_time() const; - void set_creation_time(int64_t creation_time); bool equal(int64_t tablet_id, int32_t schema_hash); // properties encapsulated in TabletSchema @@ -133,14 +131,6 @@ inline int16_t BaseTablet::shard_id() { return _tablet_meta->shard_id(); } -inline const int64_t BaseTablet::creation_time() const { - return _tablet_meta->creation_time(); -} - -inline void BaseTablet::set_creation_time(int64_t creation_time) { - _tablet_meta->set_creation_time(creation_time); -} - inline bool BaseTablet::equal(int64_t id, int32_t hash) { return (tablet_id() == id) && (schema_hash() == hash); } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index b885162b764a8b..1aaedec5ddb6d8 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -18,7 +18,6 @@ #include "olap/compaction.h" #include "gutil/strings/substitute.h" -#include "olap/rowset/rowset_factory.h" #include "util/time.h" #include "util/trace.h" @@ -157,25 +156,8 @@ Status Compaction::do_compaction_impl(int64_t permits) { } Status Compaction::construct_output_rowset_writer() { - RowsetWriterContext context; - context.rowset_id = StorageEngine::instance()->next_rowset_id(); - context.tablet_uid = _tablet->tablet_uid(); - context.tablet_id = _tablet->tablet_id(); - context.partition_id = _tablet->partition_id(); - context.tablet_schema_hash = _tablet->schema_hash(); - context.data_dir = _tablet->data_dir(); - context.rowset_type = StorageEngine::instance()->default_rowset_type(); - if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { - context.rowset_type = BETA_ROWSET; - } - context.path_desc = _tablet->tablet_path_desc(); - context.tablet_schema = &(_tablet->tablet_schema()); - context.rowset_state = VISIBLE; - context.version = _output_version; - context.segments_overlap = NONOVERLAPPING; - // The test results show that one rs writer is low-memory-footprint, there is no need to tracker its mem pool - RETURN_NOT_OK(RowsetFactory::create_rowset_writer(context, &_output_rs_writer)); - return Status::OK(); + return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, + &_output_rs_writer); } Status Compaction::construct_input_rowset_readers() { diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 862d5e02eb2579..13f4648690d16c 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -37,7 +37,6 @@ #include "olap/file_helper.h" #include "olap/olap_define.h" #include "olap/rowset/alpha_rowset_meta.h" -#include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/storage_engine.h" #include "olap/tablet_meta_manager.h" @@ -469,8 +468,7 @@ Status DataDir::load() { continue; } RowsetSharedPtr rowset; - Status create_status = RowsetFactory::create_rowset( - &tablet->tablet_schema(), tablet->tablet_path_desc(), rowset_meta, &rowset); + Status create_status = tablet->create_rowset(rowset_meta, &rowset); if (!create_status) { LOG(WARNING) << "could not create rowset from rowsetmeta: " << " rowset_id: " << rowset_meta->rowset_id() @@ -498,7 +496,7 @@ Status DataDir::load() { } } else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE && rowset_meta->tablet_uid() == tablet->tablet_uid()) { - Status publish_status = tablet->add_rowset(rowset, false); + Status publish_status = tablet->add_rowset(rowset); if (!publish_status && publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { LOG(WARNING) << "add visible rowset to tablet failed rowset_id:" diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index b40677e2b044ae..613a74219526d8 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -20,7 +20,6 @@ #include "olap/data_dir.h" #include "olap/memtable.h" #include "olap/memtable_flush_executor.h" -#include "olap/rowset/rowset_factory.h" #include "olap/schema.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" @@ -115,33 +114,15 @@ Status DeltaWriter::init() { _req.txn_id, _req.load_id)); } - RowsetWriterContext writer_context; - writer_context.rowset_id = _storage_engine->next_rowset_id(); - writer_context.tablet_uid = _tablet->tablet_uid(); - writer_context.tablet_id = _req.tablet_id; - writer_context.partition_id = _req.partition_id; - writer_context.tablet_schema_hash = _req.schema_hash; - if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { - writer_context.rowset_type = BETA_ROWSET; - } else { - writer_context.rowset_type = ALPHA_ROWSET; - } - writer_context.path_desc = _tablet->tablet_path_desc(); - writer_context.tablet_schema = &(_tablet->tablet_schema()); - writer_context.rowset_state = PREPARED; - writer_context.txn_id = _req.txn_id; - writer_context.load_id = _req.load_id; - writer_context.segments_overlap = OVERLAPPING; - writer_context.data_dir = _tablet->data_dir(); - RETURN_NOT_OK(RowsetFactory::create_rowset_writer(writer_context, &_rowset_writer)); - + RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING, + &_rowset_writer)); _tablet_schema = &(_tablet->tablet_schema()); _schema.reset(new Schema(*_tablet_schema)); _reset_mem_table(); // create flush handler RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token( - &_flush_token, writer_context.rowset_type, _req.is_high_priority)); + &_flush_token, _rowset_writer->type(), _req.is_high_priority)); _is_init = true; return Status::OK(); diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 9e1bcaccee6c9b..02e0d46dc92c70 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -25,7 +25,6 @@ #include "common/status.h" #include "exec/parquet_scanner.h" #include "olap/row.h" -#include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_id_generator.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/schema_change.h" @@ -221,29 +220,12 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_ // 1. init RowsetBuilder of cur_tablet for current push VLOG_NOTICE << "init rowset builder. tablet=" << cur_tablet->full_name() << ", block_row_size=" << cur_tablet->num_rows_per_row_block(); - RowsetWriterContext context; - context.rowset_id = StorageEngine::instance()->next_rowset_id(); - context.tablet_uid = cur_tablet->tablet_uid(); - context.tablet_id = cur_tablet->tablet_id(); - context.partition_id = _request.partition_id; - context.tablet_schema_hash = cur_tablet->schema_hash(); - context.data_dir = cur_tablet->data_dir(); - context.rowset_type = StorageEngine::instance()->default_rowset_type(); - if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { - context.rowset_type = BETA_ROWSET; - } - context.path_desc = cur_tablet->tablet_path_desc(); - context.tablet_schema = &(cur_tablet->tablet_schema()); - context.rowset_state = PREPARED; - context.txn_id = _request.transaction_id; - context.load_id = load_id; // although the spark load output files are fully sorted, // but it depends on thirparty implementation, so we conservatively // set this value to OVERLAP_UNKNOWN - context.segments_overlap = OVERLAP_UNKNOWN; - std::unique_ptr rowset_writer; - res = RowsetFactory::create_rowset_writer(context, &rowset_writer); + res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED, + OVERLAP_UNKNOWN, &rowset_writer); if (!res.ok()) { LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name() << ", txn_id=" << _request.transaction_id << ", res=" << res; @@ -407,30 +389,9 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab } // 2. init RowsetBuilder of cur_tablet for current push - VLOG_NOTICE << "init RowsetBuilder."; - RowsetWriterContext context; - context.rowset_id = StorageEngine::instance()->next_rowset_id(); - context.tablet_uid = cur_tablet->tablet_uid(); - context.tablet_id = cur_tablet->tablet_id(); - context.partition_id = _request.partition_id; - context.tablet_schema_hash = cur_tablet->schema_hash(); - context.data_dir = cur_tablet->data_dir(); - context.rowset_type = StorageEngine::instance()->default_rowset_type(); - if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { - context.rowset_type = BETA_ROWSET; - } - context.path_desc = cur_tablet->tablet_path_desc(); - context.tablet_schema = &(cur_tablet->tablet_schema()); - context.rowset_state = PREPARED; - context.txn_id = _request.transaction_id; - context.load_id = load_id; - // although the hadoop load output files are fully sorted, - // but it depends on thirparty implementation, so we conservatively - // set this value to OVERLAP_UNKNOWN - context.segments_overlap = OVERLAP_UNKNOWN; - std::unique_ptr rowset_writer; - res = RowsetFactory::create_rowset_writer(context, &rowset_writer); + res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED, + OVERLAP_UNKNOWN, &rowset_writer); if (!res.ok()) { LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name() << ", txn_id=" << _request.transaction_id << ", res=" << res; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 871f6bb64a7a38..cc79f55096392a 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -29,7 +29,6 @@ #include "olap/row.h" #include "olap/row_block.h" #include "olap/row_cursor.h" -#include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_id_generator.h" #include "olap/storage_engine.h" #include "olap/tablet.h" @@ -1311,25 +1310,11 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector& ro uint64_t merged_rows = 0; RowBlockMerger merger(new_tablet); - RowsetWriterContext context; - context.rowset_id = StorageEngine::instance()->next_rowset_id(); - context.tablet_uid = new_tablet->tablet_uid(); - context.tablet_id = new_tablet->tablet_id(); - context.partition_id = new_tablet->partition_id(); - context.tablet_schema_hash = new_tablet->schema_hash(); - context.rowset_type = new_rowset_type; - context.path_desc = new_tablet->tablet_path_desc(); - context.tablet_schema = &(new_tablet->tablet_schema()); - context.data_dir = new_tablet->data_dir(); - context.rowset_state = VISIBLE; - context.version = version; - context.segments_overlap = segments_overlap; - VLOG_NOTICE << "init rowset builder. tablet=" << new_tablet->full_name() << ", block_row_size=" << new_tablet->num_rows_per_row_block(); std::unique_ptr rowset_writer; - if (RowsetFactory::create_rowset_writer(context, &rowset_writer) != Status::OK()) { + if (!new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer)) { return false; } @@ -1699,28 +1684,13 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, RowsetReaderSharedPtr rowset_reader; RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader)); RETURN_NOT_OK(rowset_reader->init(&reader_context)); - - RowsetWriterContext writer_context; - writer_context.rowset_id = StorageEngine::instance()->next_rowset_id(); - writer_context.tablet_uid = new_tablet->tablet_uid(); - writer_context.tablet_id = new_tablet->tablet_id(); - writer_context.partition_id = (*base_rowset)->partition_id(); - writer_context.tablet_schema_hash = new_tablet->schema_hash(); - writer_context.data_dir = new_tablet->data_dir(); - writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type(); - if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { - writer_context.rowset_type = BETA_ROWSET; - } - writer_context.path_desc = new_tablet->tablet_path_desc(); - writer_context.tablet_schema = &(new_tablet->tablet_schema()); - writer_context.rowset_state = PREPARED; - writer_context.txn_id = (*base_rowset)->txn_id(); - writer_context.load_id.set_hi((*base_rowset)->load_id().hi()); - writer_context.load_id.set_lo((*base_rowset)->load_id().lo()); - writer_context.segments_overlap = (*base_rowset)->rowset_meta()->segments_overlap(); - + PUniqueId load_id; + load_id.set_hi((*base_rowset)->load_id().hi()); + load_id.set_lo((*base_rowset)->load_id().lo()); std::unique_ptr rowset_writer; - RowsetFactory::create_rowset_writer(writer_context, &rowset_writer); + RETURN_NOT_OK(new_tablet->create_rowset_writer( + (*base_rowset)->txn_id(), load_id, PREPARED, + (*base_rowset)->rowset_meta()->segments_overlap(), &rowset_writer)); if ((res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet, base_tablet)) != Status::OK()) { @@ -1842,29 +1812,12 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams // As long as there is a new_table as running, ref table is set as running // NOTE If the first sub_table fails first, it will continue to go as normal here TabletSharedPtr new_tablet = sc_params.new_tablet; - - RowsetWriterContext writer_context; - writer_context.rowset_id = StorageEngine::instance()->next_rowset_id(); - writer_context.tablet_uid = new_tablet->tablet_uid(); - writer_context.tablet_id = new_tablet->tablet_id(); - writer_context.partition_id = new_tablet->partition_id(); - writer_context.tablet_schema_hash = new_tablet->schema_hash(); - writer_context.data_dir = new_tablet->data_dir(); - // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now - writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type(); - if (sc_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { - // Use beta rowset to do schema change - // And in this case, linked schema change will not be used. - writer_context.rowset_type = BETA_ROWSET; - } - writer_context.path_desc = new_tablet->tablet_path_desc(); - writer_context.tablet_schema = &(new_tablet->tablet_schema()); - writer_context.rowset_state = VISIBLE; - writer_context.version = rs_reader->version(); - writer_context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap(); - + // When tablet create new rowset writer, it may change rowset type, in this case + // linked schema change will not be used. std::unique_ptr rowset_writer; - Status status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer); + Status status = new_tablet->create_rowset_writer( + rs_reader->version(), VISIBLE, + rs_reader->rowset()->rowset_meta()->segments_overlap(), &rowset_writer); if (!status.ok()) { res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT); goto PROCESS_ALTER_EXIT; @@ -1889,7 +1842,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams LOG(WARNING) << "failed to build rowset, exit alter process"; goto PROCESS_ALTER_EXIT; } - res = sc_params.new_tablet->add_rowset(new_rowset, false); + res = sc_params.new_tablet->add_rowset(new_rowset); if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { LOG(WARNING) << "version already exist, version revert occurred. " << "tablet=" << sc_params.new_tablet->full_name() << ", version='" diff --git a/be/src/olap/storage_migration_v2.cpp b/be/src/olap/storage_migration_v2.cpp index 4bd59e16b2f8fb..aed8fc96217d1b 100644 --- a/be/src/olap/storage_migration_v2.cpp +++ b/be/src/olap/storage_migration_v2.cpp @@ -372,7 +372,7 @@ Status StorageMigrationV2Handler::_convert_historical_rowsets( LOG(WARNING) << "failed to build rowset, exit alter process"; goto PROCESS_ALTER_EXIT; } - res = sm_params.new_tablet->add_rowset(new_rowset, false); + res = sm_params.new_tablet->add_rowset(new_rowset); if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { LOG(WARNING) << "version already exist, version revert occurred. " << "tablet=" << sm_params.new_tablet->full_name() << ", version='" diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index fb473cf3d975e3..640c93f1f301aa 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -206,7 +206,7 @@ Status Tablet::revise_tablet_meta(const std::vector& rowset return res; } -Status Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) { +Status Tablet::add_rowset(RowsetSharedPtr rowset) { DCHECK(rowset != nullptr); std::lock_guard wrlock(_meta_lock); // If the rowset already exist, just return directly. The rowset_id is an unique-id, @@ -235,15 +235,6 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) { } std::vector empty_vec; modify_rowsets(empty_vec, rowsets_to_delete); - - if (need_persist) { - Status res = - RowsetMetaManager::save(data_dir()->get_meta(), tablet_uid(), rowset->rowset_id(), - rowset->rowset_meta()->get_rowset_pb()); - if (!res.ok()) { - LOG(FATAL) << "failed to save rowset to local meta store" << rowset->rowset_id(); - } - } ++_newly_created_rowset_num; return Status::OK(); } @@ -1446,4 +1437,99 @@ void Tablet::reset_compaction(CompactionType compaction_type) { } } +Status Tablet::create_initial_rowset(const int64_t req_version) { + Status res = Status::OK(); + if (req_version < 1) { + LOG(WARNING) << "init version of tablet should at least 1. req.ver=" << req_version; + return Status::OLAPInternalError(OLAP_ERR_CE_CMD_PARAMS_ERROR); + } + Version version(0, req_version); + RowsetSharedPtr new_rowset; + do { + // there is no data in init rowset, so overlapping info is unknown. + std::unique_ptr rs_writer; + res = create_rowset_writer(version, VISIBLE, OVERLAP_UNKNOWN, &rs_writer); + if (!res.ok()) { + LOG(WARNING) << "failed to init rowset writer for tablet " << full_name(); + break; + } + res = rs_writer->flush(); + if (!res.ok()) { + LOG(WARNING) << "failed to flush rowset writer for tablet " << full_name(); + break; + } + + new_rowset = rs_writer->build(); + res = add_rowset(new_rowset); + if (!res.ok()) { + LOG(WARNING) << "failed to add rowset for tablet " << full_name(); + break; + } + } while (0); + + // Unregister index and delete files(index and data) if failed + if (!res.ok()) { + LOG(WARNING) << "fail to create initial rowset. res=" << res << " version=" << req_version; + StorageEngine::instance()->add_unused_rowset(new_rowset); + return res; + } + set_cumulative_layer_point(req_version + 1); + return res; +} + +Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state, + const SegmentsOverlapPB& overlap, + std::unique_ptr* rowset_writer) { + RowsetWriterContext context; + context.rowset_id = StorageEngine::instance()->next_rowset_id(); + context.tablet_uid = tablet_uid(); + context.tablet_id = tablet_id(); + context.partition_id = partition_id(); + context.tablet_schema_hash = schema_hash(); + context.data_dir = data_dir(); + context.rowset_type = tablet_meta()->preferred_rowset_type(); + // Alpha Rowset will be removed in the future, so that if the tablet's default rowset type is + // alpah rowset, then set the newly created rowset to storage engine's default rowset. + if (context.rowset_type == ALPHA_ROWSET) { + context.rowset_type = StorageEngine::instance()->default_rowset_type(); + } + context.path_desc = tablet_path_desc(); + context.tablet_schema = &(tablet_schema()); + context.rowset_state = rowset_state; + context.version = version; + context.segments_overlap = overlap; + // The test results show that one rs writer is low-memory-footprint, there is no need to tracker its mem pool + return RowsetFactory::create_rowset_writer(context, rowset_writer); +} + +Status Tablet::create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id, + const RowsetStatePB& rowset_state, + const SegmentsOverlapPB& overlap, + std::unique_ptr* rowset_writer) { + RowsetWriterContext context; + context.rowset_id = StorageEngine::instance()->next_rowset_id(); + context.tablet_uid = tablet_uid(); + context.tablet_id = tablet_id(); + context.partition_id = partition_id(); + context.tablet_schema_hash = schema_hash(); + context.rowset_type = tablet_meta()->preferred_rowset_type(); + // Alpha Rowset will be removed in the future, so that if the tablet's default rowset type is + // alpah rowset, then set the newly created rowset to storage engine's default rowset. + if (context.rowset_type == ALPHA_ROWSET) { + context.rowset_type = StorageEngine::instance()->default_rowset_type(); + } + context.path_desc = tablet_path_desc(); + context.tablet_schema = &(tablet_schema()); + context.rowset_state = rowset_state; + context.txn_id = txn_id; + context.load_id = load_id; + context.segments_overlap = overlap; + context.data_dir = data_dir(); + return RowsetFactory::create_rowset_writer(context, rowset_writer); +} + +Status Tablet::create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset) { + return RowsetFactory::create_rowset(&tablet_schema(), tablet_path_desc(), rowset_meta, rowset); +} + } // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index cf413a5064e852..18f9ff9bdb087b 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -33,6 +33,7 @@ #include "olap/olap_define.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_reader.h" +#include "olap/rowset/rowset_writer.h" #include "olap/tablet_meta.h" #include "olap/tuple.h" #include "olap/utils.h" @@ -97,7 +98,8 @@ class Tablet : public BaseTablet { int32_t field_index(const std::string& field_name) const; // operation in rowsets - Status add_rowset(RowsetSharedPtr rowset, bool need_persist = true); + Status add_rowset(RowsetSharedPtr rowset); + Status create_initial_rowset(const int64_t version); void modify_rowsets(std::vector& to_add, std::vector& to_delete); @@ -256,6 +258,16 @@ class Tablet : public BaseTablet { return _tablet_meta->all_beta(); } + Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state, + const SegmentsOverlapPB& overlap, + std::unique_ptr* rowset_writer); + + Status create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id, + const RowsetStatePB& rowset_state, const SegmentsOverlapPB& overlap, + std::unique_ptr* rowset_writer); + + Status create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset); + private: Status _init_once_action(); void _print_missed_versions(const std::vector& missed_versions) const; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 2936c9862ecf7d..c3a682f283726c 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -39,7 +39,6 @@ #include "olap/push_handler.h" #include "olap/reader.h" #include "olap/rowset/column_data_writer.h" -#include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_id_generator.h" #include "olap/schema_change.h" #include "olap/tablet.h" @@ -332,7 +331,7 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode; // if (!in_restore_mode && request.__isset.version) { // create initial rowset before add it to storage engine could omit many locks - res = _create_initial_rowset_unlocked(request, tablet.get()); + res = tablet->create_initial_rowset(request.version); if (!res.ok()) { LOG(WARNING) << "fail to create initial version for tablet. res=" << res; break; @@ -343,26 +342,10 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( // because schema change handler depends on it to check whether history data // convert finished tablet->set_tablet_state(TabletState::TABLET_NOTREADY); - // The following two special situations may occur: - // 1. Because the operating system time jumps, the creation_time of the newly generated table - // is less than the creation_time of the old table - // 2. Because the unit of second is unified in the olap engine code, - // if two operations (such as creating a table, and then immediately altering the table) - // is less than 1s, then the creation_time of the new table and the old table obtained by alter will be the same - // - // When the above two situations occur, in order to be able to distinguish between the new tablet - // obtained by alter and the old tablet, the creation_time of the new tablet is set to - // the creation_time of the old tablet increased by 1 - if (tablet->creation_time() <= base_tablet->creation_time()) { - LOG(WARNING) << "new tablet's create time is less than or equal to old tablet" - << "new_tablet_create_time=" << tablet->creation_time() - << ", base_tablet_create_time=" << base_tablet->creation_time(); - int64_t new_creation_time = base_tablet->creation_time() + 1; - tablet->set_creation_time(new_creation_time); - } } // Add tablet to StorageEngine will make it visible to user - res = _add_tablet_unlocked(new_tablet_id, tablet, true, false); + // Will persist tablet meta + res = _add_tablet_unlocked(new_tablet_id, tablet, /*update_meta*/ true, false); if (!res.ok()) { LOG(WARNING) << "fail to add tablet to StorageEngine. res=" << res; break; @@ -1167,76 +1150,6 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { << ", number: " << counter << ", cost(ms): " << cost; } -Status TabletManager::_create_initial_rowset_unlocked(const TCreateTabletReq& request, - Tablet* tablet) { - Status res = Status::OK(); - if (request.version < 1) { - LOG(WARNING) << "init version of tablet should at least 1. req.ver=" << request.version; - return Status::OLAPInternalError(OLAP_ERR_CE_CMD_PARAMS_ERROR); - } else { - Version version(0, request.version); - VLOG_NOTICE << "begin to create init version. version=" << version; - RowsetSharedPtr new_rowset; - do { - RowsetWriterContext context; - context.rowset_id = StorageEngine::instance()->next_rowset_id(); - context.tablet_uid = tablet->tablet_uid(); - context.tablet_id = tablet->tablet_id(); - context.partition_id = tablet->partition_id(); - context.tablet_schema_hash = tablet->schema_hash(); - context.data_dir = tablet->data_dir(); - if (!request.__isset.storage_format || - request.storage_format == TStorageFormat::DEFAULT) { - context.rowset_type = StorageEngine::instance()->default_rowset_type(); - } else if (request.storage_format == TStorageFormat::V1) { - context.rowset_type = RowsetTypePB::ALPHA_ROWSET; - } else if (request.storage_format == TStorageFormat::V2) { - context.rowset_type = RowsetTypePB::BETA_ROWSET; - } else { - LOG(ERROR) << "invalid TStorageFormat: " << request.storage_format; - DCHECK(false); - context.rowset_type = StorageEngine::instance()->default_rowset_type(); - } - context.path_desc = tablet->tablet_path_desc(); - context.tablet_schema = &(tablet->tablet_schema()); - context.rowset_state = VISIBLE; - context.version = version; - // there is no data in init rowset, so overlapping info is unknown. - context.segments_overlap = OVERLAP_UNKNOWN; - - std::unique_ptr builder; - res = RowsetFactory::create_rowset_writer(context, &builder); - if (!res.ok()) { - LOG(WARNING) << "failed to init rowset writer for tablet " << tablet->full_name(); - break; - } - res = builder->flush(); - if (!res.ok()) { - LOG(WARNING) << "failed to flush rowset writer for tablet " << tablet->full_name(); - break; - } - - new_rowset = builder->build(); - res = tablet->add_rowset(new_rowset, false); - if (!res.ok()) { - LOG(WARNING) << "failed to add rowset for tablet " << tablet->full_name(); - break; - } - } while (0); - - // Unregister index and delete files(index and data) if failed - if (!res.ok()) { - LOG(WARNING) << "fail to create initial rowset. res=" << res << " version=" << version; - StorageEngine::instance()->add_unused_rowset(new_rowset); - return res; - } - } - tablet->set_cumulative_layer_point(request.version + 1); - // NOTE: should not save tablet meta here, because it will be saved if add to map successfully - - return res; -} - Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& request, DataDir* store, const bool is_schema_change, const Tablet* base_tablet, @@ -1276,11 +1189,19 @@ Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& reque RETURN_NOT_OK_LOG(store->get_shard(&shard_id), "fail to get root path shard"); Status res = TabletMeta::create(request, TabletUid::gen_uid(), shard_id, next_unique_id, col_idx_to_unique_id, tablet_meta); - - if (request.__isset.storage_format && request.storage_format != TStorageFormat::V1) { - (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET); - } else { - (*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET); + RETURN_NOT_OK(res); + if (request.__isset.storage_format) { + if (request.storage_format == TStorageFormat::DEFAULT) { + (*tablet_meta) + ->set_preferred_rowset_type(StorageEngine::instance()->default_rowset_type()); + } else if (request.storage_format == TStorageFormat::V1) { + (*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET); + } else if (request.storage_format == TStorageFormat::V2) { + (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET); + } else { + LOG(FATAL) << "invalid TStorageFormat: " << request.storage_format; + return Status::OLAPInternalError(OLAP_ERR_CE_CMD_PARAMS_ERROR); + } } return res; } diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index a2420a5d814493..85b4c644ae7451 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -154,7 +154,6 @@ class TabletManager { bool update_meta, bool keep_files, bool drop_old); bool _check_tablet_id_exist_unlocked(TTabletId tablet_id); - Status _create_initial_rowset_unlocked(const TCreateTabletReq& request, Tablet* tablet); Status _drop_tablet_directly_unlocked(TTabletId tablet_id, bool keep_files = false); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 7e1a9b40ccf729..0cd9202a7238b5 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -62,6 +62,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id tablet_meta_pb.set_tablet_id(tablet_id); tablet_meta_pb.set_schema_hash(schema_hash); tablet_meta_pb.set_shard_id(shard_id); + // Persist the creation time, but it is not used tablet_meta_pb.set_creation_time(time(nullptr)); tablet_meta_pb.set_cumulative_layer_point(-1); tablet_meta_pb.set_tablet_state(PB_RUNNING); From edcae1934f0152d7d425e4185e249e63c63c248a Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 5 May 2022 10:42:39 +0800 Subject: [PATCH 2/3] remove new rowset type --- be/src/olap/schema_change.cpp | 15 ++------------- be/src/olap/schema_change.h | 3 +-- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index cc79f55096392a..b382c862ea0995 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1159,8 +1159,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, reset_merged_rows(); reset_filtered_rows(); - bool use_beta_rowset = new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET; - SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap(); RowBlock* ref_row_block = nullptr; rowset_reader->next_block(&ref_row_block); @@ -1190,14 +1188,10 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, // enter here while memory limitation is reached. RowsetSharedPtr rowset; - RowsetTypePB new_rowset_type = rowset_reader->rowset()->rowset_meta()->rowset_type(); - if (use_beta_rowset) { - new_rowset_type = BETA_ROWSET; - } if (!_internal_sorting( row_block_arr, Version(_temp_delta_versions.second, _temp_delta_versions.second), - new_tablet, new_rowset_type, segments_overlap, &rowset)) { + new_tablet, segments_overlap, &rowset)) { LOG(WARNING) << "failed to sorting internally."; return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); } @@ -1246,13 +1240,9 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, // enter here while memory limitation is reached. RowsetSharedPtr rowset = nullptr; - RowsetTypePB new_rowset_type = rowset_reader->rowset()->rowset_meta()->rowset_type(); - if (use_beta_rowset) { - new_rowset_type = BETA_ROWSET; - } if (!_internal_sorting(row_block_arr, Version(_temp_delta_versions.second, _temp_delta_versions.second), - new_tablet, new_rowset_type, segments_overlap, &rowset)) { + new_tablet, segments_overlap, &rowset)) { LOG(WARNING) << "failed to sorting internally."; return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); } @@ -1304,7 +1294,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, bool SchemaChangeWithSorting::_internal_sorting(const std::vector& row_block_arr, const Version& version, TabletSharedPtr new_tablet, - RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) { uint64_t merged_rows = 0; diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index ec1f2dc0f227bf..dc382ececd688d 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -162,8 +162,7 @@ class SchemaChangeWithSorting : public SchemaChange { private: bool _internal_sorting(const std::vector& row_block_arr, const Version& temp_delta_versions, TabletSharedPtr new_tablet, - RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, - RowsetSharedPtr* rowset); + SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset); bool _external_sorting(std::vector& src_rowsets, RowsetWriter* rowset_writer, TabletSharedPtr new_tablet); From ab89bcfc2e57f21b7be0b6856c539c86220024e9 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Tue, 10 May 2022 11:39:10 +0800 Subject: [PATCH 3/3] remove duplicate code --- be/src/olap/tablet.cpp | 32 +++++++++++--------------------- be/src/olap/tablet.h | 1 + 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 640c93f1f301aa..cb073a85583b50 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1481,24 +1481,10 @@ Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB& const SegmentsOverlapPB& overlap, std::unique_ptr* rowset_writer) { RowsetWriterContext context; - context.rowset_id = StorageEngine::instance()->next_rowset_id(); - context.tablet_uid = tablet_uid(); - context.tablet_id = tablet_id(); - context.partition_id = partition_id(); - context.tablet_schema_hash = schema_hash(); - context.data_dir = data_dir(); - context.rowset_type = tablet_meta()->preferred_rowset_type(); - // Alpha Rowset will be removed in the future, so that if the tablet's default rowset type is - // alpah rowset, then set the newly created rowset to storage engine's default rowset. - if (context.rowset_type == ALPHA_ROWSET) { - context.rowset_type = StorageEngine::instance()->default_rowset_type(); - } - context.path_desc = tablet_path_desc(); - context.tablet_schema = &(tablet_schema()); - context.rowset_state = rowset_state; context.version = version; + context.rowset_state = rowset_state; context.segments_overlap = overlap; - // The test results show that one rs writer is low-memory-footprint, there is no need to tracker its mem pool + _init_context_common_fields(context); return RowsetFactory::create_rowset_writer(context, rowset_writer); } @@ -1507,6 +1493,15 @@ Status Tablet::create_rowset_writer(const int64_t& txn_id, const PUniqueId& load const SegmentsOverlapPB& overlap, std::unique_ptr* rowset_writer) { RowsetWriterContext context; + context.txn_id = txn_id; + context.load_id = load_id; + context.rowset_state = rowset_state; + context.segments_overlap = overlap; + _init_context_common_fields(context); + return RowsetFactory::create_rowset_writer(context, rowset_writer); +} + +void Tablet::_init_context_common_fields(RowsetWriterContext& context) { context.rowset_id = StorageEngine::instance()->next_rowset_id(); context.tablet_uid = tablet_uid(); context.tablet_id = tablet_id(); @@ -1520,12 +1515,7 @@ Status Tablet::create_rowset_writer(const int64_t& txn_id, const PUniqueId& load } context.path_desc = tablet_path_desc(); context.tablet_schema = &(tablet_schema()); - context.rowset_state = rowset_state; - context.txn_id = txn_id; - context.load_id = load_id; - context.segments_overlap = overlap; context.data_dir = data_dir(); - return RowsetFactory::create_rowset_writer(context, rowset_writer); } Status Tablet::create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 18f9ff9bdb087b..1874afa4e84b46 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -293,6 +293,7 @@ class Tablet : public BaseTablet { // When the proportion of empty edges in the adjacency matrix used to represent the version graph // in the version tracker is greater than the threshold, rebuild the version tracker bool _reconstruct_version_tracker_if_necessary(); + void _init_context_common_fields(RowsetWriterContext& context); public: static const int64_t K_INVALID_CUMULATIVE_POINT = -1;