Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,6 @@ nonterminal Map<String, String> key_value_map, opt_key_value_map, opt_key_value_
opt_ext_properties, opt_enable_feature_properties, properties;
nonterminal ColumnDef column_definition;
nonterminal IndexDef index_definition;
nonterminal IndexDef build_index_definition;
nonterminal ArrayList<ColumnDef> column_definition_list;
nonterminal ArrayList<IndexDef> index_definition_list;
nonterminal AggregateType opt_agg_type;
Expand Down Expand Up @@ -2070,7 +2069,7 @@ create_stmt ::=
:}
| KW_BUILD KW_INDEX ident:indexName KW_ON table_name:tableName opt_partition_names:partitionNames
{:
RESULT = new AlterTableStmt(tableName, Lists.newArrayList(new BuildIndexClause(tableName, new IndexDef(indexName, partitionNames, true), false)));
RESULT = new AlterTableStmt(tableName, Lists.newArrayList(new BuildIndexClause(tableName, indexName, partitionNames, false)));
:}
/* stage */
| KW_CREATE KW_STAGE opt_if_not_exists:ifNotExists ident:stageName KW_PROPERTIES opt_key_value_map:properties
Expand Down Expand Up @@ -4043,13 +4042,6 @@ index_definition ::=
:}
;

build_index_definition ::=
KW_INDEX ident:indexName opt_partition_names:partitionNames
{:
RESULT = new IndexDef(indexName, partitionNames, true);
:}
;

opt_nullable_type ::=
{:
RESULT = ColumnNullableType.DEFAULT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,12 @@ private void checkAssignedTargetIndexName(String baseIndexName, String targetInd
}

private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap,
Map<String, String> propertyMap, List<Index> indexes) throws UserException {
Map<String, String> propertyMap, List<Index> indexes,
boolean isBuildIndex) throws UserException {
if (isBuildIndex) {
// remove the index which is not the base index, only base index can be built index
indexSchemaMap.entrySet().removeIf(entry -> !entry.getKey().equals(olapTable.getBaseIndexId()));
}
checkReplicaCount(olapTable);

// process properties first
Expand Down Expand Up @@ -1295,7 +1300,7 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long,
boolean hasIndexChange = false;
Set<Index> newSet = new HashSet<>(indexes);
Set<Index> oriSet = new HashSet<>(olapTable.getIndexes());
if (!newSet.equals(oriSet)) {
if (!newSet.equals(oriSet) || isBuildIndex) {
hasIndexChange = true;
}

Expand All @@ -1311,7 +1316,7 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long,
throw new DdlException(e.getMessage());
}

// check bloom filter has change
// check bloom filter has been changed
boolean hasBfChange = false;
Set<String> oriBfColumns = olapTable.getCopiedBfColumns();
double oriBfFpp = olapTable.getBfFpp();
Expand Down Expand Up @@ -1947,7 +1952,7 @@ public int getAsInt() {

List<Index> newIndexes = olapTable.getCopiedIndexes();
List<Index> alterIndexes = new ArrayList<>();
Map<Long, Set<String>> invertedIndexOnPartitions = new HashMap<>();
Map<Long, Set<String>> indexOnPartitions = new HashMap<>();
boolean isDropIndex = false;
Map<String, String> propertyMap = new HashMap<>();
for (AlterClause alterClause : alterClauses) {
Expand Down Expand Up @@ -2085,68 +2090,29 @@ public int getAsInt() {
}
lightSchemaChange = false;

if (index.isLightIndexChangeSupported() && !Config.isCloudMode()) {
// ngram_bf index can do light_schema_change in both local and cloud mode
// inverted index can only do light_schema_change in local mode
if (index.isLightIndexChangeSupported()) {
alterIndexes.add(index);
isDropIndex = false;
// now only support light index change for inverted index
lightIndexChange = true;
}
} else if (alterClause instanceof BuildIndexClause) {
BuildIndexClause buildIndexClause = (BuildIndexClause) alterClause;
IndexDef indexDef = buildIndexClause.getIndexDef();
Index index = buildIndexClause.getIndex();
if (Config.isCloudMode()) {
if (Config.isCloudMode() && index.getIndexType() == IndexDef.IndexType.INVERTED) {
throw new DdlException("BUILD INDEX operation failed: No need to do it in cloud mode.");
}

if (!olapTable.isPartitionedTable()) {
List<String> specifiedPartitions = indexDef.getPartitionNames();
if (!specifiedPartitions.isEmpty()) {
throw new DdlException("table " + olapTable.getName()
+ " is not partitioned, cannot build index with partitions.");
}
}
List<Index> existedIndexes = olapTable.getIndexes();
boolean found = false;
for (Index existedIdx : existedIndexes) {
if (existedIdx.getIndexName().equalsIgnoreCase(indexDef.getIndexName())) {
found = true;
if (!existedIdx.isLightIndexChangeSupported()) {
throw new DdlException("BUILD INDEX operation failed: The index "
+ existedIdx.getIndexName() + " of type " + existedIdx.getIndexType()
+ " does not support lightweight index changes.");
}
for (Column column : olapTable.getBaseSchema()) {
if (!column.getType().isVariantType()) {
continue;
}
// variant type column can not support for building index
for (String indexColumn : existedIdx.getColumns()) {
if (column.getName().equalsIgnoreCase(indexColumn)) {
throw new DdlException("BUILD INDEX operation failed: The "
+ indexDef.getIndexName() + " index can not be built on the "
+ indexColumn + " column, because it is a variant type column.");
}
}
}
index = existedIdx.clone();
if (indexDef.getPartitionNames().isEmpty()) {
invertedIndexOnPartitions.put(index.getIndexId(), olapTable.getPartitionNames());
} else {
invertedIndexOnPartitions.put(
index.getIndexId(), new HashSet<>(indexDef.getPartitionNames()));
}
break;
}
}
if (!found) {
throw new DdlException("index " + indexDef.getIndexName()
+ " not exist, cannot build it with defferred.");
if (indexDef.getPartitionNames().isEmpty()) {
indexOnPartitions.put(index.getIndexId(), olapTable.getPartitionNames());
} else {
indexOnPartitions.put(
index.getIndexId(), new HashSet<>(indexDef.getPartitionNames()));
}

if (indexDef.isInvertedIndex()) {
alterIndexes.add(index);
}
alterIndexes.add(index);
buildIndexChange = true;
lightSchemaChange = false;
} else if (alterClause instanceof DropIndexClause) {
Expand All @@ -2164,7 +2130,9 @@ public int getAsInt() {
break;
}
}
if (found.isLightIndexChangeSupported() && !Config.isCloudMode()) {
// only inverted index with local mode can do light drop index change
if (found != null && found.getIndexType() == IndexDef.IndexType.INVERTED
&& Config.isNotCloudMode()) {
alterIndexes.add(found);
isDropIndex = true;
lightIndexChange = true;
Expand All @@ -2185,19 +2153,27 @@ public int getAsInt() {
long jobId = Env.getCurrentEnv().getNextId();
//for schema change add/drop value column optimize, direct modify table meta.
modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes,
null, isDropIndex, jobId, false);
null, isDropIndex, jobId, false, propertyMap);
} else if (Config.enable_light_index_change && lightIndexChange) {
long jobId = Env.getCurrentEnv().getNextId();
//for schema change add/drop inverted index optimize, direct modify table meta firstly.
//for schema change add/drop inverted index and ngram_bf optimize, direct modify table meta firstly.
modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes,
alterIndexes, isDropIndex, jobId, false);
alterIndexes, isDropIndex, jobId, false, propertyMap);
} else if (buildIndexChange) {
if (alterIndexes.isEmpty()) {
throw new DdlException("Altered index is empty. please check your alter stmt.");
}
IndexDef.IndexType indexType = alterIndexes.get(0).getIndexType();
if (Config.enable_light_index_change) {
buildOrDeleteTableInvertedIndices(db, olapTable, indexSchemaMap,
alterIndexes, invertedIndexOnPartitions, false);
if (indexType == IndexDef.IndexType.INVERTED) {
buildOrDeleteTableInvertedIndices(db, olapTable, indexSchemaMap,
alterIndexes, indexOnPartitions, false);
} else {
createJob(rawSql, db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes, true);
}
}
} else {
createJob(rawSql, db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes);
createJob(rawSql, db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes, false);
}
} finally {
olapTable.writeUnlock();
Expand Down Expand Up @@ -2693,6 +2669,8 @@ private void cancelIndexJob(CancelAlterTableStmt cancelAlterTableStmt) throws Dd
olapTable.writeUnlock();
}

// if this table has ngram_bf index, we must run cancel for schema change job
boolean hasNGramBFIndex = ((OlapTable) olapTable).hasIndexOfType(IndexDef.IndexType.NGRAM_BF);
// alter job v2's cancel must be called outside the table lock
if (jobList.size() > 0) {
for (IndexChangeJob job : jobList) {
Expand All @@ -2707,6 +2685,8 @@ private void cancelIndexJob(CancelAlterTableStmt cancelAlterTableStmt) throws Dd
LOG.info("cancel build index job {} on table {} success", jobId, tableName);
}
}
} else if (hasNGramBFIndex) {
cancelColumnJob(cancelAlterTableStmt);
} else {
throw new DdlException("No job to cancel for Table[" + tableName + "]");
}
Expand Down Expand Up @@ -2746,7 +2726,7 @@ private boolean processAddIndex(CreateIndexClause alterClause, OlapTable olapTab
Column column = olapTable.getColumn(col);
if (column != null) {
indexDef.checkColumn(column, olapTable.getKeysType(),
olapTable.getTableProperty().getEnableUniqueKeyMergeOnWrite(),
olapTable.getEnableUniqueKeyMergeOnWrite(),
olapTable.getInvertedIndexFileStorageFormat());
} else {
throw new DdlException("index column does not exist in table. invalid column: " + col);
Expand Down Expand Up @@ -2881,7 +2861,7 @@ public void replayAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes,
List<Index> alterIndexes, boolean isDropIndex,
long jobId, boolean isReplay)
long jobId, boolean isReplay, Map<String, String> propertyMap)
throws DdlException, AnalysisException {

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2960,7 +2940,7 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
}
try {
buildOrDeleteTableInvertedIndices(db, olapTable, indexSchemaMap,
alterIndexes, invertedIndexOnPartitions, true);
alterIndexes, invertedIndexOnPartitions, true);
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
Expand Down Expand Up @@ -3021,7 +3001,8 @@ public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info)
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, indexes, null, false, jobId, true);
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, indexes, null, false, jobId,
true, new HashMap<>());
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay modify table add or drop or modify columns", e);
Expand Down Expand Up @@ -3161,7 +3142,7 @@ public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndi
olapTable.writeLock();
try {
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, newIndexes,
alterIndexes, isDropIndex, jobId, true);
alterIndexes, isDropIndex, jobId, true, new HashMap<>());
} catch (UserException e) {
// should not happen
LOG.warn("failed to replay modify table add or drop indexes", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
protected boolean hasRowStoreChange = false;

// save all schema change tasks
private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();

protected SchemaChangeJobV2() {
super(JobType.SCHEMA_CHANGE);
Expand Down Expand Up @@ -651,14 +651,13 @@ protected void runRunningJob() throws AlterCancelException {
healthyReplicaNum++;
}
}
if (!FeConstants.runningUnitTest) {
if (healthyReplicaNum < expectReplicationNum / 2 + 1) {
LOG.warn("shadow tablet {} has few healthy replicas: {}, schema change job: {}"
+ " healthyReplicaNum {} expectReplicationNum {}",
shadowTablet.getId(), replicas, jobId, healthyReplicaNum, expectReplicationNum);
throw new AlterCancelException(

if ((healthyReplicaNum < expectReplicationNum / 2 + 1) && !FeConstants.runningUnitTest) {
Copy link

Copilot AI Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that excluding test environments via 'FeConstants.runningUnitTest' does not mask potential issues with healthy replica counts in production.

Suggested change
if ((healthyReplicaNum < expectReplicationNum / 2 + 1) && !FeConstants.runningUnitTest) {
if (healthyReplicaNum < expectReplicationNum / 2 + 1) {

Copilot uses AI. Check for mistakes.
LOG.warn("shadow tablet {} has few healthy replicas: {}, schema change job: {}"
+ " healthyReplicaNum {} expectReplicationNum {}",
shadowTablet.getId(), replicas, jobId, healthyReplicaNum, expectReplicationNum);
throw new AlterCancelException(
"shadow tablet " + shadowTablet.getId() + " has few healthy replicas");
}
}
} // end for tablets
}
Expand Down
Loading
Loading