From b402384d776e46677fcef9a7bb977aaa6a369f53 Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Sat, 10 Aug 2024 23:31:08 +0800 Subject: [PATCH 1/3] [fix](schema-change) Fix potential data race when a schema change jobs is set to cancelled but the table state is still SCHEMA_CHANGE (#39164) Set job cancel state after table state changed to normal. --- .../apache/doris/alter/SchemaChangeJobV2.java | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 86f0e24a62771e..ab0aee510bc021 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -198,8 +198,9 @@ private void pruneMeta() { /** * runPendingJob(): * 1. Create all replicas of all shadow indexes and wait them finished. - * 2. After creating done, add the shadow indexes to catalog, user can not see this - * shadow index, but internal load process will generate data for these indexes. + * 2. After creating done, add the shadow indexes to catalog, user can not see + * this + * shadow index, but internal load process will generate data for these indexes. * 3. Get a new transaction id, then set job's state to WAITING_TXN */ @Override @@ -482,7 +483,8 @@ protected void runWaitingTxnJob() throws AlterCancelException { this.jobState = JobState.RUNNING; - // DO NOT write edit log here, tasks will be send again if FE restart or master changed. + // DO NOT write edit log here, tasks will be send again if FE restart or master + // changed. LOG.info("transfer schema change job {} state to {}", jobId, this.jobState); } @@ -490,7 +492,8 @@ protected void runWaitingTxnJob() throws AlterCancelException { * runRunningJob() * 1. Wait all schema change tasks to be finished. * 2. Check the integrity of the newly created shadow indexes. - * 3. Replace the origin index with shadow index, and set shadow index's state as NORMAL to be visible to user. + * 3. Replace the origin index with shadow index, and set shadow index's state + * as NORMAL to be visible to user. * 4. Set job'state as FINISHED. */ @Override @@ -584,7 +587,7 @@ protected void runRunningJob() throws AlterCancelException { } // end for tablets } } // end for partitions - // all partitions are good + // all partitions are good onFinished(tbl); } finally { tbl.writeUnlock(); @@ -609,8 +612,10 @@ private void onFinished(OlapTable tbl) { long shadowIdxId = entry.getKey(); long originIdxId = entry.getValue(); // get index from catalog, not from 'partitionIdToRollupIndex'. - // because if this alter job is recovered from edit log, index in 'partitionIndexMap' - // is not the same object in catalog. So modification on that index can not reflect to the index + // because if this alter job is recovered from edit log, index in + // 'partitionIndexMap' + // is not the same object in catalog. So modification on that index can not + // reflect to the index // in catalog. MaterializedIndex shadowIdx = partition.getIndex(shadowIdxId); Preconditions.checkNotNull(shadowIdx, shadowIdxId); @@ -657,7 +662,7 @@ private void onFinished(OlapTable tbl) { tbl.getIndexMetaByIndexId(shadowIdxId).setMaxColUniqueId(maxColUniqueId); if (LOG.isDebugEnabled()) { LOG.debug("originIdxId:{}, shadowIdxId:{}, maxColUniqueId:{}, indexSchema:{}", - originIdxId, shadowIdxId, maxColUniqueId, indexSchemaMap.get(shadowIdxId)); + originIdxId, shadowIdxId, maxColUniqueId, indexSchemaMap.get(shadowIdxId)); } tbl.deleteIndexInfo(originIdxName); @@ -704,12 +709,12 @@ protected synchronized boolean cancelImpl(String errMsg) { pruneMeta(); this.errMsg = errMsg; this.finishedTimeMs = System.currentTimeMillis(); - LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg); - Env.getCurrentEnv().getEditLog().logAlterJob(this); - changeTableState(dbId, tableId, OlapTableState.NORMAL); LOG.info("set table's state to NORMAL when cancel, table id: {}, job id: {}", tableId, jobId); - + jobState = JobState.CANCELLED; + Env.getCurrentEnv().getEditLog().logAlterJob(this); + LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg); + postProcessShadowIndex(); return true; } @@ -745,11 +750,10 @@ private void cancelInternal() { } } } - - jobState = JobState.CANCELLED; } - // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished. + // Check whether transactions of the given database which txnId is less than + // 'watershedTxnId' are finished. protected boolean isPreviousLoadFinished() throws AnalysisException { return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( watershedTxnId, dbId, Lists.newArrayList(tableId)); @@ -808,7 +812,8 @@ private void replayPendingJob(SchemaChangeJobV2 replayedJob) throws MetaNotFound olapTable.writeUnlock(); } - // should still be in WAITING_TXN state, so that the alter tasks will be resend again + // should still be in WAITING_TXN state, so that the alter tasks will be resend + // again this.jobState = JobState.WAITING_TXN; this.watershedTxnId = replayedJob.watershedTxnId; LOG.info("replay waiting txn schema change job: {} table id: {}", jobId, tableId); From 8033e543e583327593621979440639125b25bdf3 Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Wed, 14 Aug 2024 14:37:08 +0800 Subject: [PATCH 2/3] rm useless format --- .../apache/doris/alter/SchemaChangeJobV2.java | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index ab0aee510bc021..617f1375b759b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -198,9 +198,8 @@ private void pruneMeta() { /** * runPendingJob(): * 1. Create all replicas of all shadow indexes and wait them finished. - * 2. After creating done, add the shadow indexes to catalog, user can not see - * this - * shadow index, but internal load process will generate data for these indexes. + * 2. After creating done, add the shadow indexes to catalog, user can not see this + * shadow index, but internal load process will generate data for these indexes. * 3. Get a new transaction id, then set job's state to WAITING_TXN */ @Override @@ -483,8 +482,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { this.jobState = JobState.RUNNING; - // DO NOT write edit log here, tasks will be send again if FE restart or master - // changed. + // DO NOT write edit log here, tasks will be send again if FE restart or master changed. LOG.info("transfer schema change job {} state to {}", jobId, this.jobState); } @@ -492,8 +490,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { * runRunningJob() * 1. Wait all schema change tasks to be finished. * 2. Check the integrity of the newly created shadow indexes. - * 3. Replace the origin index with shadow index, and set shadow index's state - * as NORMAL to be visible to user. + * 3. Replace the origin index with shadow index, and set shadow index's state as NORMAL to be visible to user. * 4. Set job'state as FINISHED. */ @Override @@ -587,7 +584,7 @@ protected void runRunningJob() throws AlterCancelException { } // end for tablets } } // end for partitions - // all partitions are good + // all partitions are good onFinished(tbl); } finally { tbl.writeUnlock(); @@ -612,10 +609,8 @@ private void onFinished(OlapTable tbl) { long shadowIdxId = entry.getKey(); long originIdxId = entry.getValue(); // get index from catalog, not from 'partitionIdToRollupIndex'. - // because if this alter job is recovered from edit log, index in - // 'partitionIndexMap' - // is not the same object in catalog. So modification on that index can not - // reflect to the index + // because if this alter job is recovered from edit log, index in 'partitionIndexMap' + // is not the same object in catalog. So modification on that index can not reflect to the index // in catalog. MaterializedIndex shadowIdx = partition.getIndex(shadowIdxId); Preconditions.checkNotNull(shadowIdx, shadowIdxId); @@ -752,8 +747,7 @@ private void cancelInternal() { } } - // Check whether transactions of the given database which txnId is less than - // 'watershedTxnId' are finished. + // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished. protected boolean isPreviousLoadFinished() throws AnalysisException { return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( watershedTxnId, dbId, Lists.newArrayList(tableId)); @@ -812,8 +806,7 @@ private void replayPendingJob(SchemaChangeJobV2 replayedJob) throws MetaNotFound olapTable.writeUnlock(); } - // should still be in WAITING_TXN state, so that the alter tasks will be resend - // again + // should still be in WAITING_TXN state, so that the alter tasks will be resend again this.jobState = JobState.WAITING_TXN; this.watershedTxnId = replayedJob.watershedTxnId; LOG.info("replay waiting txn schema change job: {} table id: {}", jobId, tableId); From d5da0b6eadc0f95089b6af860b523ba0466c2e0a Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Thu, 15 Aug 2024 10:22:52 +0800 Subject: [PATCH 3/3] Update SchemaChangeJobV2.java --- .../src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 617f1375b759b1..e74754de7557bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -709,7 +709,6 @@ protected synchronized boolean cancelImpl(String errMsg) { jobState = JobState.CANCELLED; Env.getCurrentEnv().getEditLog().logAlterJob(this); LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg); - postProcessShadowIndex(); return true; }