diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 3a934776f3f4b6..8c5637abb9f5d4 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -60,13 +60,15 @@ Status PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP DescriptorTbl::create(&_pool, _request.desc_tbl, &_desc_tbl); - std::vector tablet_vars(1); - tablet_vars[0].tablet = tablet; - res = _do_streaming_ingestion(tablet, request, push_type, &tablet_vars, tablet_info_vec); + res = _do_streaming_ingestion(tablet, request, push_type, tablet_info_vec); if (res.ok()) { if (tablet_info_vec != nullptr) { - _get_tablet_infos(tablet_vars, tablet_info_vec); + TTabletInfo tablet_info; + tablet_info.tablet_id = tablet->tablet_id(); + tablet_info.schema_hash = tablet->schema_hash(); + StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info); + tablet_info_vec->push_back(tablet_info); } LOG(INFO) << "process realtime push successfully. " << "tablet=" << tablet->full_name() << ", partition_id=" << request.partition_id @@ -78,7 +80,6 @@ Status PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request, PushType push_type, - std::vector* tablet_vars, std::vector* tablet_info_vec) { // add transaction in engine, then check sc status // lock, prevent sc handler checking transaction concurrently @@ -99,10 +100,6 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR request.partition_id, tablet, request.transaction_id, load_id)); } - if (tablet_vars->size() == 1) { - tablet_vars->resize(2); - } - // not call validate request here, because realtime load does not // contain version info @@ -110,117 +107,79 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR // check delete condition if push for delete std::queue del_preds; if (push_type == PUSH_FOR_DELETE) { - for (TabletVars& tablet_var : *tablet_vars) { - if (tablet_var.tablet == nullptr) { - continue; - } - - DeletePredicatePB del_pred; - TabletSchema tablet_schema; - tablet_schema.copy_from(*tablet_var.tablet->tablet_schema()); - if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) { - tablet_schema.clear_columns(); - for (const auto& column_desc : request.columns_desc) { - tablet_schema.append_column(TabletColumn(column_desc)); - } - } - res = DeleteHandler::generate_delete_predicate(tablet_schema, request.delete_conditions, - &del_pred); - del_preds.push(del_pred); - if (!res.ok()) { - LOG(WARNING) << "fail to generate delete condition. res=" << res - << ", tablet=" << tablet_var.tablet->full_name(); - return res; + DeletePredicatePB del_pred; + TabletSchema tablet_schema; + tablet_schema.copy_from(*tablet->tablet_schema()); + if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) { + tablet_schema.clear_columns(); + for (const auto& column_desc : request.columns_desc) { + tablet_schema.append_column(TabletColumn(column_desc)); } } + res = DeleteHandler::generate_delete_predicate(tablet_schema, request.delete_conditions, + &del_pred); + del_preds.push(del_pred); + if (!res.ok()) { + LOG(WARNING) << "fail to generate delete condition. res=" << res + << ", tablet=" << tablet->full_name(); + return res; + } } // check if version number exceed limit - if (tablet_vars->at(0).tablet->version_count() > config::max_tablet_version_num) { - LOG(WARNING) << "failed to push data. version count: " - << tablet_vars->at(0).tablet->version_count() + if (tablet->version_count() > config::max_tablet_version_num) { + LOG(WARNING) << "failed to push data. version count: " << tablet->version_count() << ", exceed limit: " << config::max_tablet_version_num - << ". tablet: " << tablet_vars->at(0).tablet->full_name(); + << ". tablet: " << tablet->full_name(); return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION); } auto tablet_schema = std::make_shared(); - tablet_schema->copy_from(*tablet_vars->at(0).tablet->tablet_schema()); + tablet_schema->copy_from(*tablet->tablet_schema()); if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) { tablet_schema->clear_columns(); for (const auto& column_desc : request.columns_desc) { tablet_schema->append_column(TabletColumn(column_desc)); } } - + RowsetSharedPtr rowset_to_add; // writes if (push_type == PUSH_NORMAL_V2) { - res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, - &(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add), - tablet_schema); + res = _convert_v2(tablet, &rowset_to_add, tablet_schema); } else { - res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, - &(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add), - tablet_schema); + res = _convert(tablet, &rowset_to_add, tablet_schema); } if (!res.ok()) { LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res << ", failed to process realtime push." << ", tablet=" << tablet->full_name() << ", transaction_id=" << request.transaction_id; - for (TabletVars& tablet_var : *tablet_vars) { - if (tablet_var.tablet == nullptr) { - continue; - } - Status rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn( - request.partition_id, tablet_var.tablet, request.transaction_id); - // has to check rollback status to ensure not delete a committed rowset - if (rollback_status.ok()) { - StorageEngine::instance()->add_unused_rowset(tablet_var.rowset_to_add); - } + Status rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn( + request.partition_id, tablet, request.transaction_id); + // has to check rollback status to ensure not delete a committed rowset + if (rollback_status.ok()) { + StorageEngine::instance()->add_unused_rowset(rowset_to_add); } return res; } // add pending data to tablet - for (TabletVars& tablet_var : *tablet_vars) { - if (tablet_var.tablet == nullptr) { - continue; - } - if (push_type == PUSH_FOR_DELETE) { - tablet_var.rowset_to_add->rowset_meta()->set_delete_predicate(del_preds.front()); - del_preds.pop(); - } - Status commit_status = StorageEngine::instance()->txn_manager()->commit_txn( - request.partition_id, tablet_var.tablet, request.transaction_id, load_id, - tablet_var.rowset_to_add, false); - if (commit_status != Status::OK() && - commit_status.precise_code() != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) { - res = commit_status; - } + if (push_type == PUSH_FOR_DELETE) { + rowset_to_add->rowset_meta()->set_delete_predicate(del_preds.front()); + del_preds.pop(); } - return res; -} - -void PushHandler::_get_tablet_infos(const std::vector& tablet_vars, - std::vector* tablet_info_vec) { - for (const TabletVars& tablet_var : tablet_vars) { - if (tablet_var.tablet.get() == nullptr) { - continue; - } - - TTabletInfo tablet_info; - tablet_info.tablet_id = tablet_var.tablet->tablet_id(); - tablet_info.schema_hash = tablet_var.tablet->schema_hash(); - StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info); - tablet_info_vec->push_back(tablet_info); + Status commit_status = StorageEngine::instance()->txn_manager()->commit_txn( + request.partition_id, tablet, request.transaction_id, load_id, rowset_to_add, false); + if (commit_status != Status::OK() && + commit_status.precise_code() != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) { + res = commit_status; } + return res; } -Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet, - RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset, +Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset, TabletSchemaSPtr tablet_schema) { Status res = Status::OK(); uint32_t num_rows = 0; @@ -325,18 +284,6 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_ _write_bytes += (*cur_rowset)->data_disk_size(); _write_rows += (*cur_rowset)->num_rows(); - - // 5. Convert data for schema change tables - VLOG_TRACE << "load to related tables of schema_change if possible."; - if (new_tablet != nullptr) { - res = SchemaChangeHandler::schema_version_convert( - cur_tablet, new_tablet, cur_rowset, new_rowset, *_desc_tbl, tablet_schema); - if (!res.ok()) { - LOG(WARNING) << "failed to change schema version for delta." - << "[res=" << res << " new_tablet='" << new_tablet->full_name() - << "']"; - } - } } while (false); VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name() @@ -344,8 +291,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_ return res; } -Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet, - RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset, +Status PushHandler::_convert(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset, TabletSchemaSPtr tablet_schema) { Status res = Status::OK(); RowCursor row; @@ -466,18 +412,6 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab _write_bytes += (*cur_rowset)->data_disk_size(); _write_rows += (*cur_rowset)->num_rows(); - - // 7. Convert data for schema change tables - VLOG_TRACE << "load to related tables of schema_change if possible."; - if (new_tablet != nullptr) { - res = SchemaChangeHandler::schema_version_convert( - cur_tablet, new_tablet, cur_rowset, new_rowset, *_desc_tbl, tablet_schema); - if (!res.ok()) { - LOG(WARNING) << "failed to change schema version for delta." - << "[res=" << res << " new_tablet='" << new_tablet->full_name() - << "']"; - } - } } while (false); SAFE_DELETE(reader); diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index b22d319845b663..02384e9f2d0e6a 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -39,11 +39,6 @@ class BinaryReader; struct ColumnMapping; class RowCursor; -struct TabletVars { - TabletSharedPtr tablet; - RowsetSharedPtr rowset_to_add; -}; - class PushHandler { public: using SchemaMapping = std::vector; @@ -60,24 +55,18 @@ class PushHandler { int64_t write_rows() const { return _write_rows; } private: - Status _convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec, - RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset, + Status _convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset, TabletSchemaSPtr tablet_schema); // Convert local data file to internal formatted delta, // return new delta's SegmentGroup - Status _convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec, - RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset, + Status _convert(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset, TabletSchemaSPtr tablet_schema); // Only for debug std::string _debug_version_list(const Versions& versions) const; - void _get_tablet_infos(const std::vector& tablet_infos, - std::vector* tablet_info_vec); - Status _do_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request, - PushType push_type, vector* tablet_vars, - std::vector* tablet_info_vec); + PushType push_type, std::vector* tablet_info_vec); private: // mainly tablet_id, version and delta file path diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 7672e10eecb0cc..abc83ff768aad3 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -2039,105 +2039,6 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) { return _tablet_ids_in_converting.find(tablet_id) != _tablet_ids_in_converting.end(); } -Status SchemaChangeHandler::schema_version_convert( - TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowsetSharedPtr* base_rowset, - RowsetSharedPtr* new_rowset, DescriptorTbl desc_tbl, TabletSchemaSPtr base_schema_change) { - Status res = Status::OK(); - LOG(INFO) << "begin to convert delta version for schema changing. " - << "base_tablet=" << base_tablet->full_name() - << ", new_tablet=" << new_tablet->full_name(); - - // a. Parse the Alter request and convert it into an internal representation - // Do not use the delete condition specified by the DELETE_DATA command - RowBlockChanger rb_changer(*new_tablet->tablet_schema(), desc_tbl); - bool sc_sorting = false; - bool sc_directly = false; - - const std::unordered_map materialized_function_map; - if (res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly, - materialized_function_map, desc_tbl, base_schema_change.get()); - !res) { - LOG(WARNING) << "failed to parse the request. res=" << res; - return res; - } - - // NOTE split_table if row_block is used, the original block will become smaller - // But since the historical data will become normal after the subsequent base/cumulative, it is also possible to use directly - // b. Generate historical data converter - auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly); - - // c. Convert data - DeleteHandler delete_handler; - std::vector return_columns; - size_t num_cols = base_schema_change->num_columns(); - return_columns.resize(num_cols); - for (int i = 0; i < num_cols; ++i) { - return_columns[i] = i; - } - - RowsetReaderContext reader_context; - reader_context.reader_type = READER_ALTER_TABLE; - reader_context.tablet_schema = base_schema_change.get(); - reader_context.need_ordered_result = true; - reader_context.delete_handler = &delete_handler; - reader_context.return_columns = &return_columns; - reader_context.seek_columns = &return_columns; - reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); - reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS; - reader_context.is_vec = config::enable_vectorized_alter_table; - - RowsetReaderSharedPtr rowset_reader; - RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader)); - RETURN_NOT_OK(rowset_reader->init(&reader_context)); - PUniqueId load_id; - load_id.set_hi((*base_rowset)->load_id().hi()); - load_id.set_lo((*base_rowset)->load_id().lo()); - std::unique_ptr rowset_writer; - RETURN_NOT_OK(new_tablet->create_rowset_writer( - (*base_rowset)->txn_id(), load_id, PREPARED, - (*base_rowset)->rowset_meta()->segments_overlap(), base_schema_change, &rowset_writer)); - - auto schema_version_convert_error = [&]() -> Status { - if (*new_rowset != nullptr) { - StorageEngine::instance()->add_unused_rowset(*new_rowset); - } - - LOG(WARNING) << "failed to convert rowsets. " - << " base_tablet=" << base_tablet->full_name() - << ", new_tablet=" << new_tablet->full_name() << " res = " << res; - return res; - }; - - if (res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet, base_tablet); - !res) { - if ((*base_rowset)->is_pending()) { - LOG(WARNING) << "failed to process the transaction when schema change. " - << "tablet=" << new_tablet->full_name() << "'" - << ", transaction=" << (*base_rowset)->txn_id(); - } else { - LOG(WARNING) << "failed to process the version. " - << "version=" << (*base_rowset)->version().first << "-" - << (*base_rowset)->version().second; - } - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + - rowset_writer->rowset_id().to_string()); - return schema_version_convert_error(); - } - *new_rowset = rowset_writer->build(); - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + - rowset_writer->rowset_id().to_string()); - if (*new_rowset == nullptr) { - LOG(WARNING) << "build rowset failed."; - res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR); - return schema_version_convert_error(); - } - - LOG(INFO) << "successfully convert rowsets. " - << " base_tablet=" << base_tablet->full_name() - << ", new_tablet=" << new_tablet->full_name(); - return res; -} - Status SchemaChangeHandler::_get_versions_to_be_changed( TabletSharedPtr base_tablet, std::vector* versions_to_be_changed, RowsetSharedPtr* max_rowset) { diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 4ef608fb4b6af1..0ce3761d4ff108 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -249,11 +249,6 @@ class VSchemaChangeWithSorting : public SchemaChange { class SchemaChangeHandler { public: - static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, - RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset, - DescriptorTbl desc_tbl, - TabletSchemaSPtr base_schema_change); - // schema change v2, it will not set alter task in base tablet static Status process_alter_tablet_v2(const TAlterTabletReqV2& request);