Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public boolean isFinalState() {
}

public enum JobType {
ROLLUP, SCHEMA_CHANGE, DECOMMISSION_BACKEND
ROLLUP, SCHEMA_CHANGE
}

@SerializedName(value = "type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class SchemaChangeHandler extends AlterHandler {

public final Map<Long, AlterJobV2> runnableSchemaChangeJobV2 = Maps.newConcurrentMap();

public int cycle_count = 0;
public int cycleCount = 0;

public SchemaChangeHandler() {
super("schema change", Config.default_schema_change_scheduler_interval_millisecond);
Expand Down Expand Up @@ -1391,13 +1391,13 @@ private void createJob(long dbId, OlapTable olapTable, Map<Long, LinkedList<Colu

@Override
protected void runAfterCatalogReady() {
if (cycle_count >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) {
if (cycleCount >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) {
clearFinishedOrCancelledSchemaChangeJobV2();
super.runAfterCatalogReady();
cycle_count = 0;
cycleCount = 0;
}
runAlterJobV2();
cycle_count++;
cycleCount++;
}

private void runAlterJobV2() {
Expand Down Expand Up @@ -1868,12 +1868,11 @@ private boolean processDropIndex(DropIndexClause alterClause, OlapTable olapTabl
}

@Override
protected void addAlterJobV2(AlterJobV2 alterJob) {
public void addAlterJobV2(AlterJobV2 alterJob) {
super.addAlterJobV2(alterJob);
runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
}


private void clearFinishedOrCancelledSchemaChangeJobV2() {
Iterator<Map.Entry<Long, AlterJobV2>> iterator = runnableSchemaChangeJobV2.entrySet().iterator();
while (iterator.hasNext()) {
Expand Down
35 changes: 19 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1815,8 +1815,6 @@ public long loadAlterJob(DataInputStream dis, long checksum) throws IOException
}

public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throws IOException {
Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();

// alter jobs
int size = dis.readInt();
long newChecksum = checksum ^ size;
Expand All @@ -1825,13 +1823,11 @@ public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throw
throw new IOException("There are [" + size + "] old alter jobs. Please downgrade FE to an older version and handle residual jobs");
}

if (Catalog.getCurrentCatalogJournalVersion() >= 2) {
// finished or cancelled jobs
size = dis.readInt();
newChecksum ^= size;
if (size > 0) {
throw new IOException("There are [" + size + "] old finished or cancelled alter jobs. Please downgrade FE to an older version and handle residual jobs");
}
// finished or cancelled jobs
size = dis.readInt();
newChecksum ^= size;
if (size > 0) {
throw new IOException("There are [" + size + "] old finished or cancelled alter jobs. Please downgrade FE to an older version and handle residual jobs");
}

// alter job v2
Expand All @@ -1841,9 +1837,9 @@ public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throw
AlterJobV2 alterJobV2 = AlterJobV2.read(dis);
if (type == JobType.ROLLUP || type == JobType.SCHEMA_CHANGE) {
if (type == JobType.ROLLUP) {
this.getRollupHandler().addAlterJobV2(alterJobV2);
this.getMaterializedViewHandler().addAlterJobV2(alterJobV2);
} else {
alterJobsV2.put(alterJobV2.getJobId(), alterJobV2);
this.getSchemaChangeHandler().addAlterJobV2(alterJobV2);
}
// ATTN : we just want to add tablet into TabletInvertedIndex when only PendingJob is checkpointed
// to prevent TabletInvertedIndex data loss,
Expand All @@ -1853,7 +1849,7 @@ public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throw
LOG.info("replay pending alter job when load alter job {} ", alterJobV2.getJobId());
}
} else {
alterJobsV2.put(alterJobV2.getJobId(), alterJobV2);
throw new IOException("Invalid alter job type: " + type.name());
}
}

Expand Down Expand Up @@ -2112,7 +2108,14 @@ public long saveAlterJob(CountingDataOutputStream dos, long checksum) throws IOE
}

public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType type) throws IOException {
Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();
Map<Long, AlterJobV2> alterJobsV2;
if (type == JobType.ROLLUP) {
alterJobsV2 = this.getMaterializedViewHandler().getAlterJobsV2();
} else if (type == JobType.SCHEMA_CHANGE) {
alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2();
} else {
throw new IOException("Invalid alter job type: " + type.name());
}

// alter jobs == 0
// If the FE version upgrade from old version, if it have alter jobs, the FE will failed during start process
Expand Down Expand Up @@ -3814,7 +3817,7 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep
}
Preconditions.checkNotNull(rollupIndexStorageType);
// set rollup index meta to olap table
List<Column> rollupColumns = getRollupHandler().checkAndPrepareMaterializedView(addRollupClause,
List<Column> rollupColumns = getMaterializedViewHandler().checkAndPrepareMaterializedView(addRollupClause,
olapTable, baseRollupIndex, false);
short rollupShortKeyColumnCount = Catalog.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties());
int rollupSchemaHash = Util.generateSchemaHash();
Expand Down Expand Up @@ -5032,7 +5035,7 @@ public SchemaChangeHandler getSchemaChangeHandler() {
return (SchemaChangeHandler) this.alter.getSchemaChangeHandler();
}

public MaterializedViewHandler getRollupHandler() {
public MaterializedViewHandler getMaterializedViewHandler() {
return (MaterializedViewHandler) this.alter.getMaterializedViewHandler();
}

Expand Down Expand Up @@ -5310,7 +5313,7 @@ public void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlExcept
*/
public void cancelAlter(CancelAlterTableStmt stmt) throws DdlException {
if (stmt.getAlterType() == AlterType.ROLLUP) {
this.getRollupHandler().cancel(stmt);
this.getMaterializedViewHandler().cancel(stmt);
} else if (stmt.getAlterType() == AlterType.COLUMN) {
this.getSchemaChangeHandler().cancel(stmt);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public ProcNodeInterface lookup(String jobTypeName) throws AnalysisException {
} else if (jobTypeName.equals(DELETE)) {
return new DeleteInfoProcDir(catalog.getDeleteHandler(), catalog.getLoadInstance(), db.getId());
} else if (jobTypeName.equals(ROLLUP)) {
return new RollupProcDir(catalog.getRollupHandler(), db);
return new RollupProcDir(catalog.getMaterializedViewHandler(), db);
} else if (jobTypeName.equals(SCHEMA_CHANGE)) {
return new SchemaChangeProcDir(catalog.getSchemaChangeHandler(), db);
} else if (jobTypeName.equals(EXPORT)) {
Expand Down Expand Up @@ -119,7 +119,7 @@ public ProcResult fetchResult() throws AnalysisException {
cancelledNum.toString(), totalNum.toString()));

// rollup
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getRollupHandler();
MaterializedViewHandler materializedViewHandler = Catalog.getCurrentCatalog().getMaterializedViewHandler();
pendingNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.PENDING, dbId);
runningNum = materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.WAITING_TXN, dbId)
+ materializedViewHandler.getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState.RUNNING, dbId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTaskType;

import com.google.common.base.Preconditions;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import com.google.common.base.Preconditions;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -752,7 +752,7 @@ private void finishAlterTask(AgentTask task) {
AlterReplicaTask alterTask = (AlterReplicaTask) task;
try {
if (alterTask.getJobType() == JobType.ROLLUP) {
Catalog.getCurrentCatalog().getRollupHandler().handleFinishAlterTask(alterTask);
Catalog.getCurrentCatalog().getMaterializedViewHandler().handleFinishAlterTask(alterTask);
} else if (alterTask.getJobType() == JobType.SCHEMA_CHANGE) {
Catalog.getCurrentCatalog().getSchemaChangeHandler().handleFinishAlterTask(alterTask);
}
Expand Down
10 changes: 5 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
}
case OperationType.OP_DROP_ROLLUP: {
DropInfo info = (DropInfo) journal.getData();
catalog.getRollupHandler().replayDropRollup(info, catalog);
catalog.getMaterializedViewHandler().replayDropRollup(info, catalog);
break;
}
case OperationType.OP_BATCH_DROP_ROLLUP: {
BatchDropInfo batchDropInfo = (BatchDropInfo) journal.getData();
for (long indexId : batchDropInfo.getIndexIdSet()) {
catalog.getRollupHandler().replayDropRollup(
catalog.getMaterializedViewHandler().replayDropRollup(
new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), indexId, false), catalog);
}
break;
Expand Down Expand Up @@ -701,7 +701,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
AlterJobV2 alterJob = (AlterJobV2) journal.getData();
switch (alterJob.getType()) {
case ROLLUP:
catalog.getRollupHandler().replayAlterJobV2(alterJob);
catalog.getMaterializedViewHandler().replayAlterJobV2(alterJob);
break;
case SCHEMA_CHANGE:
catalog.getSchemaChangeHandler().replayAlterJobV2(alterJob);
Expand All @@ -714,7 +714,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
case OperationType.OP_BATCH_ADD_ROLLUP: {
BatchAlterJobPersistInfo batchAlterJobV2 = (BatchAlterJobPersistInfo) journal.getData();
for (AlterJobV2 alterJobV2 : batchAlterJobV2.getAlterJobV2List()) {
catalog.getRollupHandler().replayAlterJobV2(alterJobV2);
catalog.getMaterializedViewHandler().replayAlterJobV2(alterJobV2);
}
break;
}
Expand Down Expand Up @@ -759,7 +759,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
RemoveAlterJobV2OperationLog log = (RemoveAlterJobV2OperationLog) journal.getData();
switch (log.getType()) {
case ROLLUP:
catalog.getRollupHandler().replayRemoveAlterJobV2(log);
catalog.getMaterializedViewHandler().replayRemoveAlterJobV2(log);
break;
case SCHEMA_CHANGE:
catalog.getSchemaChangeHandler().replayRemoveAlterJobV2(log);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testRollup() throws Exception {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
// 2. check alter job
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
// 3. check show alter table column
String showAlterStmtStr = "show alter table rollup from test;";
Expand All @@ -157,7 +157,7 @@ public void testAlterSegmentV2() throws Exception {
String alterStmtStr = "alter table test.segmentv2 add rollup r1(k2, v1)";
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);

String sql = "select k2, sum(v1) from test.segmentv2 group by k2";
Expand All @@ -168,7 +168,7 @@ public void testAlterSegmentV2() throws Exception {
alterStmtStr = "alter table test.segmentv2 add rollup segmentv2(k2, v1) properties('storage_format' = 'v2')";
alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);

explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
Expand Down Expand Up @@ -219,7 +219,7 @@ public void testDupTableSchemaChange() throws Exception {


alterTable("alter table test.dup_table add rollup r1(v1,v2,k2,k1);");
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
ExceptionChecker.expectThrowsNoException(() -> alterTable("alter table test.dup_table modify column v2 varchar(2);"));
}
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testCreateMVForListPartitionTable() throws Exception {
"city,\n" +
"user_id,\n" +
"date;");
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ public void testDynamicPartitionDropAndAdd() throws Exception {
private void waitSchemaChangeJobDone(boolean rollupJob) throws Exception {
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
if (rollupJob) {
alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
}
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static void setup() throws Exception {

@Before
public void before() throws Exception {
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
alterJobs.clear();

// create database db1
Expand All @@ -77,7 +77,7 @@ public void testBatchRollup() throws Exception {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);

Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
Assert.assertEquals(3, alterJobs.size());

Database db = Catalog.getCurrentCatalog().getDbNullable("default_cluster:db1");
Expand Down Expand Up @@ -127,7 +127,7 @@ public void testCancelBatchRollup() throws Exception {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, ctx);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);

Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
Assert.assertEquals(3, alterJobs.size());
List<Long> jobIds = Lists.newArrayList(alterJobs.keySet());

Expand Down
Loading