From f787bef37b4db26511f6d79dfe05e46e449a6cef Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 29 Apr 2022 23:47:28 +0800 Subject: [PATCH 1/3] [fix](alter-job) Missing alter job when doing checkpoint image --- .../org/apache/doris/alter/AlterJobV2.java | 2 +- .../doris/alter/SchemaChangeHandler.java | 11 +++---- .../org/apache/doris/catalog/Catalog.java | 31 ++++++++++--------- .../apache/doris/common/proc/JobsProcDir.java | 4 +-- .../org/apache/doris/master/MasterImpl.java | 6 ++-- .../org/apache/doris/persist/EditLog.java | 10 +++--- .../apache/doris/alter/AlterJobV2Test.java | 10 +++--- .../org/apache/doris/alter/AlterTest.java | 2 +- .../doris/alter/BatchRollupJobTest.java | 6 ++-- .../apache/doris/alter/RollupJobV2Test.java | 11 +++---- .../doris/catalog/CatalogOperationTest.java | 2 +- .../doris/catalog/TempPartitionTest.java | 6 ++-- .../org/apache/doris/utframe/DorisAssert.java | 2 +- 13 files changed, 51 insertions(+), 52 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index 06fc79efc6eebe..764a0b0ae853bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -58,7 +58,7 @@ public boolean isFinalState() { } public enum JobType { - ROLLUP, SCHEMA_CHANGE, DECOMMISSION_BACKEND + ROLLUP, SCHEMA_CHANGE } @SerializedName(value = "type") diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 989135b3d7e402..d12382f926d875 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -118,7 +118,7 @@ public class SchemaChangeHandler extends AlterHandler { public final Map runnableSchemaChangeJobV2 = Maps.newConcurrentMap(); - public int cycle_count = 0; + public int cycleCount = 0; public SchemaChangeHandler() { super("schema change", Config.default_schema_change_scheduler_interval_millisecond); @@ -1391,13 +1391,13 @@ private void createJob(long dbId, OlapTable olapTable, Map= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) { + if (cycleCount >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) { clearFinishedOrCancelledSchemaChangeJobV2(); super.runAfterCatalogReady(); - cycle_count = 0; + cycleCount = 0; } runAlterJobV2(); - cycle_count++; + cycleCount++; } private void runAlterJobV2() { @@ -1868,12 +1868,11 @@ private boolean processDropIndex(DropIndexClause alterClause, OlapTable olapTabl } @Override - protected void addAlterJobV2(AlterJobV2 alterJob) { + public void addAlterJobV2(AlterJobV2 alterJob) { super.addAlterJobV2(alterJob); runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob); } - private void clearFinishedOrCancelledSchemaChangeJobV2() { Iterator> iterator = runnableSchemaChangeJobV2.entrySet().iterator(); while (iterator.hasNext()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index d0cab71cad6bed..3bc6c0ff53e9be 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1815,8 +1815,6 @@ public long loadAlterJob(DataInputStream dis, long checksum) throws IOException } public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throws IOException { - Map alterJobsV2 = Maps.newHashMap(); - // alter jobs int size = dis.readInt(); long newChecksum = checksum ^ size; @@ -1825,13 +1823,11 @@ public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throw throw new IOException("There are [" + size + "] old alter jobs. Please downgrade FE to an older version and handle residual jobs"); } - if (Catalog.getCurrentCatalogJournalVersion() >= 2) { - // finished or cancelled jobs - size = dis.readInt(); - newChecksum ^= size; - if (size > 0) { - throw new IOException("There are [" + size + "] old finished or cancelled alter jobs. Please downgrade FE to an older version and handle residual jobs"); - } + // finished or cancelled jobs + size = dis.readInt(); + newChecksum ^= size; + if (size > 0) { + throw new IOException("There are [" + size + "] old finished or cancelled alter jobs. Please downgrade FE to an older version and handle residual jobs"); } // alter job v2 @@ -1841,9 +1837,9 @@ public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throw AlterJobV2 alterJobV2 = AlterJobV2.read(dis); if (type == JobType.ROLLUP || type == JobType.SCHEMA_CHANGE) { if (type == JobType.ROLLUP) { - this.getRollupHandler().addAlterJobV2(alterJobV2); + this.getMaterializedViewHandler().addAlterJobV2(alterJobV2); } else { - alterJobsV2.put(alterJobV2.getJobId(), alterJobV2); + this.getSchemaChangeHandler().addAlterJobV2(alterJobV2); } // ATTN : we just want to add tablet into TabletInvertedIndex when only PendingJob is checkpointed // to prevent TabletInvertedIndex data loss, @@ -1853,7 +1849,7 @@ public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throw LOG.info("replay pending alter job when load alter job {} ", alterJobV2.getJobId()); } } else { - alterJobsV2.put(alterJobV2.getJobId(), alterJobV2); + throw new IOException("Invalid alter job type: " + type.name()); } } @@ -2113,6 +2109,11 @@ public long saveAlterJob(CountingDataOutputStream dos, long checksum) throws IOE public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType type) throws IOException { Map alterJobsV2 = Maps.newHashMap(); + if (type == JobType.ROLLUP) { + alterJobsV2 = this.getMaterializedViewHandler().getAlterJobsV2(); + } else if (type == JobType.SCHEMA_CHANGE) { + alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2(); + } // alter jobs == 0 // If the FE version upgrade from old version, if it have alter jobs, the FE will failed during start process @@ -3814,7 +3815,7 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep } Preconditions.checkNotNull(rollupIndexStorageType); // set rollup index meta to olap table - List rollupColumns = getRollupHandler().checkAndPrepareMaterializedView(addRollupClause, + List rollupColumns = getMaterializedViewHandler().checkAndPrepareMaterializedView(addRollupClause, olapTable, baseRollupIndex, false); short rollupShortKeyColumnCount = Catalog.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties()); int rollupSchemaHash = Util.generateSchemaHash(); @@ -5032,7 +5033,7 @@ public SchemaChangeHandler getSchemaChangeHandler() { return (SchemaChangeHandler) this.alter.getSchemaChangeHandler(); } - public MaterializedViewHandler getRollupHandler() { + public MaterializedViewHandler getMaterializedViewHandler() { return (MaterializedViewHandler) this.alter.getMaterializedViewHandler(); } @@ -5310,7 +5311,7 @@ public void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlExcept */ public void cancelAlter(CancelAlterTableStmt stmt) throws DdlException { if (stmt.getAlterType() == AlterType.ROLLUP) { - this.getRollupHandler().cancel(stmt); + this.getMaterializedViewHandler().cancel(stmt); } else if (stmt.getAlterType() == AlterType.COLUMN) { this.getSchemaChangeHandler().cancel(stmt); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index 85e3b089c279b3..dfe272ff10b4bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -72,7 +72,7 @@ public ProcNodeInterface lookup(String jobTypeName) throws AnalysisException { } else if (jobTypeName.equals(DELETE)) { return new DeleteInfoProcDir(catalog.getDeleteHandler(), catalog.getLoadInstance(), db.getId()); } else if (jobTypeName.equals(ROLLUP)) { - return new RollupProcDir(catalog.getRollupHandler(), db); + return new RollupProcDir(catalog.getMaterializedViewHandler(), db); } else if (jobTypeName.equals(SCHEMA_CHANGE)) { return new SchemaChangeProcDir(catalog.getSchemaChangeHandler(), db); } else if (jobTypeName.equals(EXPORT)) { @@ -119,7 +119,7 @@ public ProcResult fetchResult() throws AnalysisException { cancelledNum.toString(), totalNum.toString())); // rollup - MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler(); + MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler(); pendingNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING, dbId); runningNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN, dbId) + materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING, dbId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index b9d1202c5b32b4..9d6107ed762583 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -61,12 +61,12 @@ import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TTaskType; +import com.google.common.base.Preconditions; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; -import com.google.common.base.Preconditions; - import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -752,7 +752,7 @@ private void finishAlterTask(AgentTask task) { AlterReplicaTask alterTask = (AlterReplicaTask) task; try { if (alterTask.getJobType() == JobType.ROLLUP) { - Catalog.getCurrentCatalog().getRollupHandler().handleFinishAlterTask(alterTask); + Catalog.getCurrentCatalog().getMaterializedViewHandler().handleFinishAlterTask(alterTask); } else if (alterTask.getJobType() == JobType.SCHEMA_CHANGE) { Catalog.getCurrentCatalog().getSchemaChangeHandler().handleFinishAlterTask(alterTask); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index f68b5fc33a844e..9b8c174d42fdb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -275,13 +275,13 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { } case OperationType.OP_DROP_ROLLUP: { DropInfo info = (DropInfo) journal.getData(); - catalog.getRollupHandler().replayDropRollup(info, catalog); + catalog.getMaterializedViewHandler().replayDropRollup(info, catalog); break; } case OperationType.OP_BATCH_DROP_ROLLUP: { BatchDropInfo batchDropInfo = (BatchDropInfo) journal.getData(); for (long indexId : batchDropInfo.getIndexIdSet()) { - catalog.getRollupHandler().replayDropRollup( + catalog.getMaterializedViewHandler().replayDropRollup( new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), indexId, false), catalog); } break; @@ -701,7 +701,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { AlterJobV2 alterJob = (AlterJobV2) journal.getData(); switch (alterJob.getType()) { case ROLLUP: - catalog.getRollupHandler().replayAlterJobV2(alterJob); + catalog.getMaterializedViewHandler().replayAlterJobV2(alterJob); break; case SCHEMA_CHANGE: catalog.getSchemaChangeHandler().replayAlterJobV2(alterJob); @@ -714,7 +714,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { case OperationType.OP_BATCH_ADD_ROLLUP: { BatchAlterJobPersistInfo batchAlterJobV2 = (BatchAlterJobPersistInfo) journal.getData(); for (AlterJobV2 alterJobV2 : batchAlterJobV2.getAlterJobV2List()) { - catalog.getRollupHandler().replayAlterJobV2(alterJobV2); + catalog.getMaterializedViewHandler().replayAlterJobV2(alterJobV2); } break; } @@ -759,7 +759,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { RemoveAlterJobV2OperationLog log = (RemoveAlterJobV2OperationLog) journal.getData(); switch (log.getType()) { case ROLLUP: - catalog.getRollupHandler().replayRemoveAlterJobV2(log); + catalog.getMaterializedViewHandler().replayRemoveAlterJobV2(log); break; case SCHEMA_CHANGE: catalog.getSchemaChangeHandler().replayRemoveAlterJobV2(log); diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterJobV2Test.java index 2bb1842b36f070..20f44dc60a23d7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterJobV2Test.java @@ -134,7 +134,7 @@ public void testRollup() throws Exception { AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); // 2. check alter job - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); waitAlterJobDone(alterJobs); // 3. check show alter table column String showAlterStmtStr = "show alter table rollup from test;"; @@ -157,7 +157,7 @@ public void testAlterSegmentV2() throws Exception { String alterStmtStr = "alter table test.segmentv2 add rollup r1(k2, v1)"; AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); waitAlterJobDone(alterJobs); String sql = "select k2, sum(v1) from test.segmentv2 group by k2"; @@ -168,7 +168,7 @@ public void testAlterSegmentV2() throws Exception { alterStmtStr = "alter table test.segmentv2 add rollup segmentv2(k2, v1) properties('storage_format' = 'v2')"; alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); - alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); waitAlterJobDone(alterJobs); explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql); @@ -219,7 +219,7 @@ public void testDupTableSchemaChange() throws Exception { alterTable("alter table test.dup_table add rollup r1(v1,v2,k2,k1);"); - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); waitAlterJobDone(alterJobs); ExceptionChecker.expectThrowsNoException(() -> alterTable("alter table test.dup_table modify column v2 varchar(2);")); } @@ -253,7 +253,7 @@ public void testCreateMVForListPartitionTable() throws Exception { "city,\n" + "user_id,\n" + "date;"); - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); waitAlterJobDone(alterJobs); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java index 74a18f7007a248..dc4c5c9f89c946 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java @@ -578,7 +578,7 @@ public void testDynamicPartitionDropAndAdd() throws Exception { private void waitSchemaChangeJobDone(boolean rollupJob) throws Exception { Map alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2(); if (rollupJob) { - alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); } for (AlterJobV2 alterJobV2 : alterJobs.values()) { while (!alterJobV2.getJobState().isFinalState()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java index b12077855a1ed7..61384768ea11ff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/BatchRollupJobTest.java @@ -55,7 +55,7 @@ public static void setup() throws Exception { @Before public void before() throws Exception { - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); alterJobs.clear(); // create database db1 @@ -77,7 +77,7 @@ public void testBatchRollup() throws Exception { AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx); Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); Assert.assertEquals(3, alterJobs.size()); Database db = Catalog.getCurrentCatalog().getDbNullable("default_cluster:db1"); @@ -127,7 +127,7 @@ public void testCancelBatchRollup() throws Exception { AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx); Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); Assert.assertEquals(3, alterJobs.size()); List jobIds = Lists.newArrayList(alterJobs.keySet()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 2e00bdc54ac7a5..6e6b18536b16e0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -63,7 +63,6 @@ import org.apache.doris.transaction.GlobalTransactionMgr; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.junit.After; import org.junit.Assert; @@ -150,7 +149,7 @@ public void testRunRollupJobConcurrentLimit() throws UserException { fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); FakeCatalog.setCatalog(masterCatalog); - MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler(); + MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler(); ArrayList alterClauses = new ArrayList<>(); alterClauses.add(clause); alterClauses.add(clause2); @@ -171,7 +170,7 @@ public void testAddSchemaChange() throws UserException { fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); FakeCatalog.setCatalog(masterCatalog); - MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler(); + MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler(); ArrayList alterClauses = new ArrayList<>(); alterClauses.add(clause); Database db = masterCatalog.getDbOrDdlException(CatalogTestUtil.testDbId1); @@ -188,7 +187,7 @@ public void testSchemaChange1() throws Exception { fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); FakeCatalog.setCatalog(masterCatalog); - MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler(); + MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler(); // add a rollup job ArrayList alterClauses = new ArrayList<>(); @@ -240,7 +239,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception { fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); FakeCatalog.setCatalog(masterCatalog); - MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler(); + MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler(); // add a rollup job ArrayList alterClauses = new ArrayList<>(); @@ -378,7 +377,7 @@ public void testAddRollupForDupTable() throws UserException { fakeCatalog = new FakeCatalog(); fakeEditLog = new FakeEditLog(); FakeCatalog.setCatalog(masterCatalog); - MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler(); + MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler(); Database db = masterCatalog.getDbOrDdlException(CatalogTestUtil.testDbId1); OlapTable olapTable = (OlapTable) db.getTableOrDdlException(CatalogTestUtil.testTableId2); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogOperationTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogOperationTest.java index cf3135614a9ddd..ad215513777619 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogOperationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogOperationTest.java @@ -122,7 +122,7 @@ public void testRenameTable() throws Exception { String alterStmtStr = "alter table test.newNewTest add rollup r1(k1)"; alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext); Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); Assert.assertEquals(1, alterJobs.size()); for (AlterJobV2 alterJobV2 : alterJobs.values()) { while (!alterJobV2.getJobState().isFinalState()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TempPartitionTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TempPartitionTest.java index ffd3528a56f346..b4c80e4dee2394 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TempPartitionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TempPartitionTest.java @@ -432,7 +432,7 @@ public void testForMultiPartitionTable() throws Exception { alterTable(stmtStr, true); // wait rollup finish - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); for (AlterJobV2 alterJobV2 : alterJobs.values()) { while (!alterJobV2.getJobState().isFinalState()) { System.out.println( @@ -794,7 +794,7 @@ public void testForListPartitionTable() throws Exception { alterTable(stmtStr, true); // wait rollup finish - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); for (AlterJobV2 alterJobV2 : alterJobs.values()) { while (!alterJobV2.getJobState().isFinalState()) { System.out.println( @@ -1132,7 +1132,7 @@ public void testForMultiListPartitionTable() throws Exception { alterTable(stmtStr, true); // wait rollup finish - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); for (AlterJobV2 alterJobV2 : alterJobs.values()) { while (!alterJobV2.getJobState().isFinalState()) { System.out.println( diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java index eabaab95c8b57d..520352d654e20d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java @@ -148,7 +148,7 @@ public SessionVariable getSessionVariable() { private void checkAlterJob() throws InterruptedException { // check alter job - Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + Map alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2(); for (AlterJobV2 alterJobV2 : alterJobs.values()) { while (!alterJobV2.getJobState().isFinalState()) { System.out.println("alter job " + alterJobV2.getDbId() From 91efb6200ef4a8f518f9d1a05d5adde671da5d6f Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 30 Apr 2022 00:19:58 +0800 Subject: [PATCH 2/3] fix --- fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java | 2 ++ .../src/test/java/org/apache/doris/http/DorisHttpTestCase.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 3bc6c0ff53e9be..57f7e4978ab793 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -2113,6 +2113,8 @@ public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType ty alterJobsV2 = this.getMaterializedViewHandler().getAlterJobsV2(); } else if (type == JobType.SCHEMA_CHANGE) { alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2(); + } else { + throw new IOException("Invalid alter job type: " + type.name()); } // alter jobs == 0 diff --git a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index be5596250d5a35..07aba68c12fb4c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -340,7 +340,7 @@ SchemaChangeHandler getSchemaChangeHandler() { return new SchemaChangeHandler(); } @Mock - MaterializedViewHandler getRollupHandler() { + MaterializedViewHandler getMaterializedViewHandler() { return new MaterializedViewHandler(); } @Mock From 37f0c4c3834675826b84b81aa43511c46260b243 Mon Sep 17 00:00:00 2001 From: morningman Date: Sun, 1 May 2022 18:27:34 +0800 Subject: [PATCH 3/3] fix --- fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 57f7e4978ab793..c82c3bf36bfad2 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -2108,7 +2108,7 @@ public long saveAlterJob(CountingDataOutputStream dos, long checksum) throws IOE } public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType type) throws IOException { - Map alterJobsV2 = Maps.newHashMap(); + Map alterJobsV2; if (type == JobType.ROLLUP) { alterJobsV2 = this.getMaterializedViewHandler().getAlterJobsV2(); } else if (type == JobType.SCHEMA_CHANGE) {