From 3b03fd7247aff6d8099df04887e5c1dc03aa1704 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 17 Jan 2025 13:23:32 +0800 Subject: [PATCH 1/3] *: some minor refine Signed-off-by: Lloyd-Pottiger --- dbms/src/Debug/MockStorage.cpp | 5 +-- dbms/src/Debug/MockStorage.h | 2 +- .../Coprocessor/DAGStorageInterpreter.cpp | 33 ++++++++++--------- .../Flash/Coprocessor/GenSchemaAndColumn.cpp | 2 +- .../Planner/Plans/PhysicalMockTableScan.cpp | 2 +- dbms/src/Storages/IManageableStorage.h | 1 - dbms/src/Storages/StorageDeltaMerge.cpp | 29 +--------------- dbms/src/Storages/StorageDeltaMerge.h | 19 ++++++----- dbms/src/TiDB/Decode/TypeMapping.cpp | 10 ------ dbms/src/TiDB/Decode/TypeMapping.h | 2 -- 10 files changed, 35 insertions(+), 70 deletions(-) diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp index 6e211406669..98d448b8822 100644 --- a/dbms/src/Debug/MockStorage.cpp +++ b/dbms/src/Debug/MockStorage.cpp @@ -28,6 +28,7 @@ #include #include #include +#include namespace DB { @@ -588,9 +589,9 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name) return table_infos_for_delta_merge[name]; } -DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id) +DM::ColumnDefinesPtr MockStorage::getStoreColumnDefines(Int64 table_id) { - return storage_delta_merge_map[table_id]->getStoreColumnDefines(); + return storage_delta_merge_map[table_id]->getStore()->getStoreColumns(); } TiDB::ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos) diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index deed0156c72..df834eab235 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -149,7 +149,7 @@ class MockStorage TableInfo getTableInfo(const String & name); TableInfo getTableInfoForDeltaMerge(const String & name); - DM::ColumnDefines getStoreColumnDefines(Int64 table_id); + DM::ColumnDefinesPtr getStoreColumnDefines(Int64 table_id); size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan); diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index a7a38b0b165..7da5c1877c5 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -543,46 +543,43 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) // compare the columns in table_scan with the columns in storages, to check if the current schema is satisified this query. // column.name are always empty from table_scan, and column name is not necessary in read process, so we don't need compare the name here. std::tuple compareColumns( - const TiDBTableScan & table_scan, - const DM::ColumnDefines & cur_columns, + ColumnID logical_table_id, + const TiDB::ColumnInfos & table_scan_columns, + const TiDB::ColumnInfos & cur_columns, const DAGContext & dag_context, const LoggerPtr & log) { - const auto & columns = table_scan.getColumns(); - std::unordered_map column_id_map; + std::unordered_map column_id_map; for (const auto & column : cur_columns) - { - column_id_map[column.id] = column; - } + column_id_map[column.id] = &column; - for (const auto & column : columns) + for (const auto & column : table_scan_columns) { // Exclude virtual columns, including MutSup::extra_handle_id, MutSup::version_col_id,MutSup::delmark_col_id,MutSup::extra_table_id_col_id if (column.id < 0) - { continue; - } + auto iter = column_id_map.find(column.id); if (iter == column_id_map.end()) { String error_message = fmt::format( "the column in the query is not found in current columns, keyspace={} table_id={} column_id={}", dag_context.getKeyspaceID(), - table_scan.getLogicalTableID(), + logical_table_id, column.id); LOG_WARNING(log, error_message); return std::make_tuple(false, error_message); } - if (getDataTypeByColumnInfo(column)->getName() != iter->second.type->getName()) + if (getDataTypeByColumnInfo(column)->getName() != getDataTypeByColumnInfo(*iter->second)->getName()) { String error_message = fmt::format( "the column data type in the query is not the same as the current column, keyspace={} table_id={} " "column_id={} column_type={} query_column_type={}", dag_context.getKeyspaceID(), - table_scan.getLogicalTableID(), + logical_table_id, column.id, - iter->second.type->getName(), + getDataTypeByColumnInfo(*iter->second)->getName(), getDataTypeByColumnInfo(column)->getName()); LOG_WARNING(log, error_message); return std::make_tuple(false, error_message); @@ -1371,8 +1368,12 @@ std::unordered_map DAG auto lock = table_store->lockStructureForShare(context.getCurrentQueryId()); // check the columns in table_scan and table_store, to check whether we need to sync table schema. - auto [are_columns_matched, error_message] - = compareColumns(table_scan, table_store->getStoreColumnDefines(), dagContext(), log); + auto [are_columns_matched, error_message] = compareColumns( + table_scan.getLogicalTableID(), + table_scan.getColumns(), + table_store->getTableInfo().columns, + dagContext(), + log); if (are_columns_matched) { diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp index b37a2984602..f1983a242df 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp @@ -123,7 +123,7 @@ std::tuple genColumnDefinesForDisaggregatedRead(const column_defines->emplace_back(DM::ColumnDefine{ column_info.id, output_name, - getDataTypeByColumnInfoForDisaggregatedStorageLayer(column_info), + getDataTypeByColumnInfo(column_info), column_info.defaultValueToField()}); break; } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp index c10d983e81b..e8e1722dca2 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -246,7 +246,7 @@ RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context) auto rfs = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids); for (auto & rf : rfs) { - rf->setTargetAttr(column_infos, column_defines); + rf->setTargetAttr(column_infos, *column_defines); } return rfs; } diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 27d761f064c..9df7f17242f 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -123,7 +123,6 @@ class IManageableStorage : public IStorage const Context & context) = 0; - virtual DM::ColumnDefines getStoreColumnDefines() const = 0; /// Rename the table. /// /// Renaming a name in a file with metadata, the name in the list of tables in the RAM, is done separately. diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 5e9bbd07643..1e508d1c09b 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -87,9 +87,9 @@ StorageDeltaMerge::StorageDeltaMerge( Timestamp tombstone, Context & global_context_) : IManageableStorage{columns_, tombstone} - , data_path_contains_database_name(db_engine != "TiFlash") , store_inited(false) , max_column_id_used(0) + , data_path_contains_database_name(db_engine != "TiFlash") , global_context(global_context_.getGlobalContext()) , log(Logger::get(fmt::format("{}.{}", db_name_, table_name_))) { @@ -1429,7 +1429,6 @@ void StorageDeltaMerge::alterSchemaChange( decoding_schema_changed = true; SortDescription pk_desc = getPrimarySortDescription(); - ColumnDefines store_columns = getStoreColumnDefines(); // after update `new_columns` and store's table columns, we need to update create table statement, // so that we can restore table next time. @@ -1444,32 +1443,6 @@ void StorageDeltaMerge::alterSchemaChange( context); } -ColumnDefines StorageDeltaMerge::getStoreColumnDefines() const -{ - if (storeInited()) - { - return _store->getTableColumns(); - } - std::lock_guard lock(store_mutex); - if (storeInited()) - { - return _store->getTableColumns(); - } - ColumnDefines cols; - cols.emplace_back(table_column_info->handle_column_define); - cols.emplace_back(getVersionColumnDefine()); - cols.emplace_back(getTagColumnDefine()); - for (const auto & col : table_column_info->table_column_defines) - { - if (col.id != table_column_info->handle_column_define.id && col.id != MutSup::version_col_id - && col.id != MutSup::delmark_col_id) - { - cols.emplace_back(col); - } - } - return cols; -} - String StorageDeltaMerge::getName() const { return MutSup::delta_tree_storage_name; diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index de4340459e4..ac4958230f6 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -251,7 +251,6 @@ class StorageDeltaMerge void updateTableColumnInfo(); ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info); - DM::ColumnDefines getStoreColumnDefines() const override; bool dataDirExist(); void shutdownImpl(); @@ -273,7 +272,6 @@ class StorageDeltaMerge DM::ColumnDefines table_column_defines; DM::ColumnDefine handle_column_define; }; - const bool data_path_contains_database_name = false; mutable std::mutex store_mutex; @@ -281,9 +279,6 @@ class StorageDeltaMerge std::atomic store_inited; DM::DeltaMergeStorePtr _store; // NOLINT(readability-identifier-naming) - Strings pk_column_names; // TODO: remove it. Only use for debug from ch-client. - bool is_common_handle = false; - bool pk_is_handle = false; size_t rowkey_column_size = 0; /// The user-defined PK column. If multi-column PK, or no PK, it is 0. /// Note that user-defined PK will never be _tidb_rowid. @@ -295,8 +290,6 @@ class StorageDeltaMerge mutable std::mutex decode_schema_mutex; DecodingStorageSchemaSnapshotPtr decoding_schema_snapshot; - // The following two members must be used under the protection of table structure lock - bool decoding_schema_changed = false; // internal epoch for `decoding_schema_snapshot` Int64 decoding_schema_epoch = 1; @@ -310,7 +303,17 @@ class StorageDeltaMerge std::atomic shutdown_called{false}; - std::atomic next_version = 1; //TODO: remove this!!! + // TODO: remove the following two members, which are only used for debug from ch-client. + Strings pk_column_names; + std::atomic next_version = 1; + + bool is_common_handle = false; + bool pk_is_handle = false; + + // The following two members must be used under the protection of table structure lock + bool decoding_schema_changed = false; + + const bool data_path_contains_database_name = false; Context & global_context; diff --git a/dbms/src/TiDB/Decode/TypeMapping.cpp b/dbms/src/TiDB/Decode/TypeMapping.cpp index 705c75e10a4..8ca1c42acb0 100644 --- a/dbms/src/TiDB/Decode/TypeMapping.cpp +++ b/dbms/src/TiDB/Decode/TypeMapping.cpp @@ -230,16 +230,6 @@ DataTypePtr getDataTypeByColumnInfoForComputingLayer(const ColumnInfo & column_i return base; } -DataTypePtr getDataTypeByColumnInfoForDisaggregatedStorageLayer(const ColumnInfo & column_info) -{ - DataTypePtr base = TypeMapping::instance().getDataType(column_info); - if (!column_info.hasNotNullFlag()) - { - return std::make_shared(base); - } - return base; -} - DataTypePtr getDataTypeByFieldType(const tipb::FieldType & field_type) { ColumnInfo ci = TiDB::fieldTypeToColumnInfo(field_type); diff --git a/dbms/src/TiDB/Decode/TypeMapping.h b/dbms/src/TiDB/Decode/TypeMapping.h index b9a04f33864..f0e30d13d27 100644 --- a/dbms/src/TiDB/Decode/TypeMapping.h +++ b/dbms/src/TiDB/Decode/TypeMapping.h @@ -37,8 +37,6 @@ DataTypePtr getDataTypeByColumnInfoForComputingLayer(const TiDB::ColumnInfo & co DataTypePtr getDataTypeByFieldType(const tipb::FieldType & field_type); DataTypePtr getDataTypeByFieldTypeForComputingLayer(const tipb::FieldType & field_type); -DataTypePtr getDataTypeByColumnInfoForDisaggregatedStorageLayer(const TiDB::ColumnInfo & column_info); - TiDB::CodecFlag getCodecFlagByFieldType(const tipb::FieldType & field_type); // Try best to reverse get TiDB's column info from TiFlash info. From 80065f2db6b6065811da5a9bfccc1cf82aad1093 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Mon, 20 Jan 2025 16:09:58 +0800 Subject: [PATCH 2/3] address comments Signed-off-by: Lloyd-Pottiger --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 7da5c1877c5..c43edbc8d8c 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -1498,7 +1498,7 @@ std::unordered_map DAG return true; } - LOG_DEBUG(log, "not OK, syncing schemas for keyspace={} table_ids={}", keyspace_id, need_sync_table_ids); + LOG_INFO(log, "not OK, syncing schemas for keyspace={} table_ids={}", keyspace_id, need_sync_table_ids); auto start_time = Clock::now(); for (auto & table_id : need_sync_table_ids) From 31fb41a174771496771a5768075483fcc7b77e95 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Mon, 20 Jan 2025 16:10:51 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: Lloyd-Pottiger --- dbms/src/Storages/StorageDeltaMerge.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index ac4958230f6..51c9213bdfa 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -310,7 +310,7 @@ class StorageDeltaMerge bool is_common_handle = false; bool pk_is_handle = false; - // The following two members must be used under the protection of table structure lock + // `decoding_schema_changed` and `decoding_schema_epoch` must be used under the protection of table structure lock bool decoding_schema_changed = false; const bool data_path_contains_database_name = false;