Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageDeltaMerge.h>
#include <TiDB/Decode/TypeMapping.h>

namespace DB
{
Expand Down Expand Up @@ -588,9 +589,9 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name)
return table_infos_for_delta_merge[name];
}

DM::ColumnDefines MockStorage::getStoreColumnDefines(Int64 table_id)
DM::ColumnDefinesPtr MockStorage::getStoreColumnDefines(Int64 table_id)
{
return storage_delta_merge_map[table_id]->getStoreColumnDefines();
return storage_delta_merge_map[table_id]->getStore()->getStoreColumns();
}

TiDB::ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class MockStorage

TableInfo getTableInfo(const String & name);
TableInfo getTableInfoForDeltaMerge(const String & name);
DM::ColumnDefines getStoreColumnDefines(Int64 table_id);
DM::ColumnDefinesPtr getStoreColumnDefines(Int64 table_id);

size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan);

Expand Down
35 changes: 18 additions & 17 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,46 +543,43 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
// compare the columns in table_scan with the columns in storages, to check if the current schema is satisified this query.
// column.name are always empty from table_scan, and column name is not necessary in read process, so we don't need compare the name here.
std::tuple<bool, String> compareColumns(
const TiDBTableScan & table_scan,
const DM::ColumnDefines & cur_columns,
ColumnID logical_table_id,
const TiDB::ColumnInfos & table_scan_columns,
const TiDB::ColumnInfos & cur_columns,
const DAGContext & dag_context,
const LoggerPtr & log)
{
const auto & columns = table_scan.getColumns();
std::unordered_map<ColumnID, DM::ColumnDefine> column_id_map;
std::unordered_map<ColumnID, const TiDB::ColumnInfo *> column_id_map;
for (const auto & column : cur_columns)
{
column_id_map[column.id] = column;
}
column_id_map[column.id] = &column;

for (const auto & column : columns)
for (const auto & column : table_scan_columns)
{
// Exclude virtual columns, including MutSup::extra_handle_id, MutSup::version_col_id,MutSup::delmark_col_id,MutSup::extra_table_id_col_id
if (column.id < 0)
{
continue;
}

auto iter = column_id_map.find(column.id);
if (iter == column_id_map.end())
{
String error_message = fmt::format(
"the column in the query is not found in current columns, keyspace={} table_id={} column_id={}",
dag_context.getKeyspaceID(),
table_scan.getLogicalTableID(),
logical_table_id,
column.id);
LOG_WARNING(log, error_message);
return std::make_tuple(false, error_message);
}

if (getDataTypeByColumnInfo(column)->getName() != iter->second.type->getName())
if (getDataTypeByColumnInfo(column)->getName() != getDataTypeByColumnInfo(*iter->second)->getName())
{
String error_message = fmt::format(
"the column data type in the query is not the same as the current column, keyspace={} table_id={} "
"column_id={} column_type={} query_column_type={}",
dag_context.getKeyspaceID(),
table_scan.getLogicalTableID(),
logical_table_id,
column.id,
iter->second.type->getName(),
getDataTypeByColumnInfo(*iter->second)->getName(),
getDataTypeByColumnInfo(column)->getName());
LOG_WARNING(log, error_message);
return std::make_tuple(false, error_message);
Expand Down Expand Up @@ -1371,8 +1368,12 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
auto lock = table_store->lockStructureForShare(context.getCurrentQueryId());

// check the columns in table_scan and table_store, to check whether we need to sync table schema.
auto [are_columns_matched, error_message]
= compareColumns(table_scan, table_store->getStoreColumnDefines(), dagContext(), log);
auto [are_columns_matched, error_message] = compareColumns(
table_scan.getLogicalTableID(),
table_scan.getColumns(),
table_store->getTableInfo().columns,
dagContext(),
log);

if (are_columns_matched)
{
Expand Down Expand Up @@ -1497,7 +1498,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return true;
}

LOG_DEBUG(log, "not OK, syncing schemas for keyspace={} table_ids={}", keyspace_id, need_sync_table_ids);
LOG_INFO(log, "not OK, syncing schemas for keyspace={} table_ids={}", keyspace_id, need_sync_table_ids);

auto start_time = Clock::now();
for (auto & table_id : need_sync_table_ids)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const
column_defines->emplace_back(DM::ColumnDefine{
column_info.id,
output_name,
getDataTypeByColumnInfoForDisaggregatedStorageLayer(column_info),
getDataTypeByColumnInfo(column_info),
column_info.defaultValueToField()});
break;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ RuntimeFilteList PhysicalMockTableScan::getRuntimeFilterList(Context & context)
auto rfs = context.getDAGContext()->runtime_filter_mgr.getLocalRuntimeFilterByIds(runtime_filter_ids);
for (auto & rf : rfs)
{
rf->setTargetAttr(column_infos, column_defines);
rf->setTargetAttr(column_infos, *column_defines);
}
return rfs;
}
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ class IManageableStorage : public IStorage
const Context & context)
= 0;

virtual DM::ColumnDefines getStoreColumnDefines() const = 0;
/// Rename the table.
///
/// Renaming a name in a file with metadata, the name in the list of tables in the RAM, is done separately.
Expand Down
29 changes: 1 addition & 28 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ StorageDeltaMerge::StorageDeltaMerge(
Timestamp tombstone,
Context & global_context_)
: IManageableStorage{columns_, tombstone}
, data_path_contains_database_name(db_engine != "TiFlash")
, store_inited(false)
, max_column_id_used(0)
, data_path_contains_database_name(db_engine != "TiFlash")
, global_context(global_context_.getGlobalContext())
, log(Logger::get(fmt::format("{}.{}", db_name_, table_name_)))
{
Expand Down Expand Up @@ -1429,7 +1429,6 @@ void StorageDeltaMerge::alterSchemaChange(
decoding_schema_changed = true;

SortDescription pk_desc = getPrimarySortDescription();
ColumnDefines store_columns = getStoreColumnDefines();

// after update `new_columns` and store's table columns, we need to update create table statement,
// so that we can restore table next time.
Expand All @@ -1444,32 +1443,6 @@ void StorageDeltaMerge::alterSchemaChange(
context);
}

ColumnDefines StorageDeltaMerge::getStoreColumnDefines() const
{
if (storeInited())
{
return _store->getTableColumns();
}
std::lock_guard lock(store_mutex);
if (storeInited())
{
return _store->getTableColumns();
}
ColumnDefines cols;
cols.emplace_back(table_column_info->handle_column_define);
cols.emplace_back(getVersionColumnDefine());
cols.emplace_back(getTagColumnDefine());
for (const auto & col : table_column_info->table_column_defines)
{
if (col.id != table_column_info->handle_column_define.id && col.id != MutSup::version_col_id
&& col.id != MutSup::delmark_col_id)
{
cols.emplace_back(col);
}
}
return cols;
}

String StorageDeltaMerge::getName() const
{
return MutSup::delta_tree_storage_name;
Expand Down
19 changes: 11 additions & 8 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ class StorageDeltaMerge

void updateTableColumnInfo();
ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info);
DM::ColumnDefines getStoreColumnDefines() const override;
bool dataDirExist();
void shutdownImpl();

Expand All @@ -273,17 +272,13 @@ class StorageDeltaMerge
DM::ColumnDefines table_column_defines;
DM::ColumnDefine handle_column_define;
};
const bool data_path_contains_database_name = false;

mutable std::mutex store_mutex;

std::unique_ptr<TableColumnInfo> table_column_info; // After create DeltaMergeStore object, it is deprecated.
std::atomic<bool> store_inited;
DM::DeltaMergeStorePtr _store; // NOLINT(readability-identifier-naming)

Strings pk_column_names; // TODO: remove it. Only use for debug from ch-client.
bool is_common_handle = false;
bool pk_is_handle = false;
size_t rowkey_column_size = 0;
/// The user-defined PK column. If multi-column PK, or no PK, it is 0.
/// Note that user-defined PK will never be _tidb_rowid.
Expand All @@ -295,8 +290,6 @@ class StorageDeltaMerge

mutable std::mutex decode_schema_mutex;
DecodingStorageSchemaSnapshotPtr decoding_schema_snapshot;
// The following two members must be used under the protection of table structure lock
bool decoding_schema_changed = false;
// internal epoch for `decoding_schema_snapshot`
Int64 decoding_schema_epoch = 1;

Expand All @@ -310,7 +303,17 @@ class StorageDeltaMerge

std::atomic<bool> shutdown_called{false};

std::atomic<UInt64> next_version = 1; //TODO: remove this!!!
// TODO: remove the following two members, which are only used for debug from ch-client.
Strings pk_column_names;
std::atomic<UInt64> next_version = 1;

bool is_common_handle = false;
bool pk_is_handle = false;

// `decoding_schema_changed` and `decoding_schema_epoch` must be used under the protection of table structure lock
bool decoding_schema_changed = false;

const bool data_path_contains_database_name = false;

Context & global_context;

Expand Down
10 changes: 0 additions & 10 deletions dbms/src/TiDB/Decode/TypeMapping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,6 @@ DataTypePtr getDataTypeByColumnInfoForComputingLayer(const ColumnInfo & column_i
return base;
}

DataTypePtr getDataTypeByColumnInfoForDisaggregatedStorageLayer(const ColumnInfo & column_info)
{
DataTypePtr base = TypeMapping::instance().getDataType(column_info);
if (!column_info.hasNotNullFlag())
{
return std::make_shared<DataTypeNullable>(base);
}
return base;
}

DataTypePtr getDataTypeByFieldType(const tipb::FieldType & field_type)
{
ColumnInfo ci = TiDB::fieldTypeToColumnInfo(field_type);
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/TiDB/Decode/TypeMapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ DataTypePtr getDataTypeByColumnInfoForComputingLayer(const TiDB::ColumnInfo & co
DataTypePtr getDataTypeByFieldType(const tipb::FieldType & field_type);
DataTypePtr getDataTypeByFieldTypeForComputingLayer(const tipb::FieldType & field_type);

DataTypePtr getDataTypeByColumnInfoForDisaggregatedStorageLayer(const TiDB::ColumnInfo & column_info);

TiDB::CodecFlag getCodecFlagByFieldType(const tipb::FieldType & field_type);

// Try best to reverse get TiDB's column info from TiFlash info.
Expand Down