Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 0 additions & 221 deletions fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.alter;

import org.apache.doris.alter.AlterJob.JobState;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.CancelStmt;
import org.apache.doris.catalog.Catalog;
Expand All @@ -28,7 +27,6 @@
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
Expand All @@ -37,9 +35,7 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.thrift.TTabletInfo;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
Expand All @@ -51,20 +47,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

public abstract class AlterHandler extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(AlterHandler.class);

// tableId -> AlterJob
@Deprecated
protected ConcurrentHashMap<Long, AlterJob> alterJobs = new ConcurrentHashMap<>();
@Deprecated
protected ConcurrentLinkedQueue<AlterJob> finishedOrCancelledAlterJobs = new ConcurrentLinkedQueue<>();

// queue of alter job v2
protected ConcurrentMap<Long, AlterJobV2> alterJobsV2 = Maps.newConcurrentMap();

Expand Down Expand Up @@ -123,22 +111,6 @@ public Map<Long, AlterJobV2> getAlterJobsV2() {
return this.alterJobsV2;
}

// should be removed in version 0.13
@Deprecated
private void clearExpireFinishedOrCancelledAlterJobs() {
long curTime = System.currentTimeMillis();
// clean history job
Iterator<AlterJob> iter = finishedOrCancelledAlterJobs.iterator();
while (iter.hasNext()) {
AlterJob historyJob = iter.next();
if ((curTime - historyJob.getCreateTimeMs()) / 1000 > Config.history_job_keep_max_second) {
iter.remove();
LOG.info("remove history {} job[{}]. finish at {}", historyJob.getType(),
historyJob.getTableId(), TimeUtils.longToTimeString(historyJob.getFinishedTime()));
}
}
}

private void clearExpireFinishedOrCancelledAlterJobsV2() {
Iterator<Map.Entry<Long, AlterJobV2>> iterator = alterJobsV2.entrySet().iterator();
while (iterator.hasNext()) {
Expand All @@ -162,61 +134,6 @@ public void replayRemoveAlterJobV2(RemoveAlterJobV2OperationLog log) {
}
}

@Deprecated
protected void addAlterJob(AlterJob alterJob) {
this.alterJobs.put(alterJob.getTableId(), alterJob);
LOG.info("add {} job[{}]", alterJob.getType(), alterJob.getTableId());
}

@Deprecated
public AlterJob getAlterJob(long tableId) {
return this.alterJobs.get(tableId);
}

@Deprecated
public boolean hasUnfinishedAlterJob(long tableId) {
return this.alterJobs.containsKey(tableId);
}

@Deprecated
public int getAlterJobNum(JobState state, long dbId) {
int jobNum = 0;
if (state == JobState.PENDING || state == JobState.RUNNING || state == JobState.FINISHING) {
for (AlterJob alterJob : alterJobs.values()) {
if (alterJob.getState() == state && alterJob.getDbId() == dbId) {
++jobNum;
}
}
} else if (state == JobState.FINISHED) {
// lock to perform atomically
lock();
try {
for (AlterJob alterJob : alterJobs.values()) {
if (alterJob.getState() == JobState.FINISHED && alterJob.getDbId() == dbId) {
++jobNum;
}
}

for (AlterJob alterJob : finishedOrCancelledAlterJobs) {
if (alterJob.getState() == JobState.FINISHED && alterJob.getDbId() == dbId) {
++jobNum;
}
}
} finally {
unlock();
}

} else if (state == JobState.CANCELLED) {
for (AlterJob alterJob : finishedOrCancelledAlterJobs) {
if (alterJob.getState() == JobState.CANCELLED && alterJob.getDbId() == dbId) {
++jobNum;
}
}
}

return jobNum;
}

public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state, long dbId) {
return alterJobsV2.values().stream().filter(e -> e.getJobState() == state && e.getDbId() == dbId).count();
}
Expand All @@ -225,135 +142,8 @@ public Long getAlterJobV2Num(org.apache.doris.alter.AlterJobV2.JobState state) {
return alterJobsV2.values().stream().filter(e -> e.getJobState() == state).count();
}

@Deprecated
public Map<Long, AlterJob> unprotectedGetAlterJobs() {
return this.alterJobs;
}

@Deprecated
public ConcurrentLinkedQueue<AlterJob> unprotectedGetFinishedOrCancelledAlterJobs() {
return this.finishedOrCancelledAlterJobs;
}

@Deprecated
public void addFinishedOrCancelledAlterJob(AlterJob alterJob) {
alterJob.clear();
LOG.info("add {} job[{}] to finished or cancel list", alterJob.getType(), alterJob.getTableId());
this.finishedOrCancelledAlterJobs.add(alterJob);
}

@Deprecated
protected AlterJob removeAlterJob(long tableId) {
return this.alterJobs.remove(tableId);
}

@Deprecated
public void removeDbAlterJob(long dbId) {
Iterator<Map.Entry<Long, AlterJob>> iterator = alterJobs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, AlterJob> entry = iterator.next();
AlterJob alterJob = entry.getValue();
if (alterJob.getDbId() == dbId) {
iterator.remove();
}
}
}

/*
* handle task report
* reportVersion is used in schema change job.
*/
@Deprecated
public void handleFinishedReplica(AgentTask task, TTabletInfo finishTabletInfo, long reportVersion)
throws MetaNotFoundException {
long tableId = task.getTableId();

AlterJob alterJob = getAlterJob(tableId);
if (alterJob == null) {
throw new MetaNotFoundException("Cannot find " + task.getTaskType().name() + " job[" + tableId + "]");
}
alterJob.handleFinishedReplica(task, finishTabletInfo, reportVersion);
}

protected void cancelInternal(AlterJob alterJob, OlapTable olapTable, String msg) {
// cancel
if (olapTable != null) {
olapTable.writeLock();
}
try {
alterJob.cancel(olapTable, msg);
} finally {
if (olapTable != null) {
olapTable.writeUnlock();
}
}
jobDone(alterJob);
}

protected void jobDone(AlterJob alterJob) {
lock();
try {
// remove job
AlterJob alterJobRemoved = removeAlterJob(alterJob.getTableId());
// add to finishedOrCancelledAlterJobs
if (alterJobRemoved != null) {
// add alterJob not alterJobRemoved, because the alterJob maybe a new object
// deserialized from journal, and the finished state is set to the new object
addFinishedOrCancelledAlterJob(alterJob);
}
} finally {
unlock();
}
}

public void replayInitJob(AlterJob alterJob, Catalog catalog) throws MetaNotFoundException {
try {
Database db = catalog.getDbOrMetaException(alterJob.getDbId());
alterJob.replayInitJob(db);
} finally {
// add rollup job
addAlterJob(alterJob);
}
}

public void replayFinishing(AlterJob alterJob, Catalog catalog) throws MetaNotFoundException {
try {
Database db = catalog.getDbOrMetaException(alterJob.getDbId());
alterJob.replayFinishing(db);
} finally {
alterJob.setState(JobState.FINISHING);
// !!! the alter job should add to the cache again, because the alter job is deserialized from journal
// it is a different object compared to the cache
addAlterJob(alterJob);
}
}

public void replayFinish(AlterJob alterJob, Catalog catalog) throws MetaNotFoundException {
try {
Database db = catalog.getDbOrMetaException(alterJob.getDbId());
alterJob.replayFinish(db);
} finally {
alterJob.setState(JobState.FINISHED);
jobDone(alterJob);
}
}

public void replayCancel(AlterJob alterJob, Catalog catalog) throws MetaNotFoundException {
removeAlterJob(alterJob.getTableId());
alterJob.setState(JobState.CANCELLED);
try {
// we log rollup job cancelled even if db is dropped.
// so check db != null here
Database db = catalog.getDbOrMetaException(alterJob.getDbId());
alterJob.replayCancel(db);
} finally {
addFinishedOrCancelledAlterJob(alterJob);
}
}

@Override
protected void runAfterCatalogReady() {
clearExpireFinishedOrCancelledAlterJobs();
clearExpireFinishedOrCancelledAlterJobsV2();
}

Expand Down Expand Up @@ -387,17 +177,6 @@ public void processExternalTable(List<AlterClause> alterClauses, Database db, Ta
*/
public abstract void cancel(CancelStmt stmt) throws DdlException;

@Deprecated
public Integer getAlterJobNumByState(JobState state) {
int jobNum = 0;
for (AlterJob alterJob : alterJobs.values()) {
if (alterJob.getState() == state) {
++jobNum;
}
}
return jobNum;
}

/*
* Handle the finish report of alter task.
* If task is success, which means the history data before specified version has been transformed successfully.
Expand Down
Loading