Merged
@@ -17,17 +17,21 @@

package org.apache.doris.insertoverwrite;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -40,7 +44,9 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class InsertOverwriteManager extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(InsertOverwriteManager.class);
@@ -62,6 +68,11 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
@SerializedName(value = "partitionPairs")
private Map<Long, Map<Long, Long>> partitionPairs = Maps.newConcurrentMap();

// Tables that currently have a running insert overwrite
// dbId ==> Set<tableId>
private Map<Long, Set<Long>> runningTables = Maps.newHashMap();
private ReentrantReadWriteLock runningLock = new ReentrantReadWriteLock(true);

public InsertOverwriteManager() {
super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 1000);
}
@@ -270,6 +281,53 @@ private boolean rollback(long taskId) {
return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames());
}

/**
* If the given table already has a running insert overwrite, throw an exception.
* Otherwise, record it in runningTables.
*
* @param db the database in which the insert overwrite runs
* @param table the table on which the insert overwrite runs
*/
public void recordRunningTableOrException(DatabaseIf db, TableIf table) {
long dbId = db.getId();
long tableId = table.getId();
runningLock.writeLock().lock();
try {
if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) {
throw new AnalysisException(
String.format("Not allowed running Insert Overwrite on same table: %s.%s", db.getFullName(),
table.getName()));
}
if (runningTables.containsKey(dbId)) {
runningTables.get(dbId).add(tableId);
} else {
runningTables.put(dbId, Sets.newHashSet(tableId));
}
} finally {
runningLock.writeLock().unlock();
}
}

/**
* Remove a table from the running records.
*
* @param dbId the dbId of the table whose insert overwrite finished
* @param tableId the tableId of the table whose insert overwrite finished
*/
public void dropRunningRecord(long dbId, long tableId) {
runningLock.writeLock().lock();
try {
if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) {
runningTables.get(dbId).remove(tableId);
if (runningTables.get(dbId).size() == 0) {
runningTables.remove(dbId);
}
}
} finally {
runningLock.writeLock().unlock();
}
}

/**
* replay logs
*
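Taken together, recordRunningTableOrException and dropRunningRecord act as a per-table mutex for insert overwrite. A minimal sketch of how a caller is expected to pair them, mirroring what InsertOverwriteTableCommand.run() does later in this PR (the helper method name is illustrative only):

    // Illustrative sketch, not part of this PR: pairing the record/drop calls around one overwrite run.
    void runOneInsertOverwrite(DatabaseIf db, TableIf table) throws Exception {
        InsertOverwriteManager manager = Env.getCurrentEnv().getInsertOverwriteManager();
        // Throws AnalysisException if another insert overwrite is already running on this table.
        manager.recordRunningTableOrException(db, table);
        try {
            // ... register the task, add temp partitions, run the insert, replace partitions ...
        } finally {
            // Always drop the record so later insert overwrites on this table are not blocked.
            manager.dropRunningRecord(db.getId(), table.getId());
        }
    }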
@@ -229,6 +229,10 @@ private void exec(ConnectContext ctx, Set<String> refreshPartitionNames,
ctx.setQueryId(queryId);
ctx.getState().setNereids(true);
command.run(ctx, executor);
if (getStatus() == TaskStatus.CANCELED) {
// Throw an exception to interrupt subsequent partition update tasks
throw new JobException("task is CANCELED");
}
if (ctx.getState().getStateType() != MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
}
@@ -28,6 +28,7 @@
import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -60,11 +61,14 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* insert into select command implementation
@@ -81,6 +85,8 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
private LogicalPlan logicalQuery;
private Optional<String> labelName;
private final Optional<LogicalPlan> cte;
private AtomicBoolean isCancelled = new AtomicBoolean(false);
private AtomicBoolean isRunning = new AtomicBoolean(false);

/**
* constructor
@@ -157,35 +163,88 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager();
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase(), targetTable);
isRunning.set(true);
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. It contains all replace tasks made and registered in the RPC process.
taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call FE through FrontendService to replace partitions. FE will register new temp
// partitions and return. For transactional inserts, the replacement really happens once the insert succeeds,
// i.e. `insertInto` has finished. Then we call taskGroupSuccess to perform the replacement.
insertInto(ctx, executor, taskId);
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable);
insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable);
} else {
List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
taskId = Env.getCurrentEnv().getInsertOverwriteManager()
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before registerTask, queryId: {}",
ctx.getQueryIdentifier());
return;
}
taskId = insertOverwriteManager
.registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before addTempPartitions, queryId: {}",
ctx.getQueryIdentifier());
// no need to handle temp partitions
insertOverwriteManager.taskSuccess(taskId);
return;
}
InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before insertInto, queryId: {}", ctx.getQueryIdentifier());
insertOverwriteManager.taskFail(taskId);
return;
}
insertInto(ctx, executor, tempPartitionNames);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before replacePartition, queryId: {}",
ctx.getQueryIdentifier());
insertOverwriteManager.taskFail(taskId);
return;
}
InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames);
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before taskSuccess, do nothing, queryId: {}",
ctx.getQueryIdentifier());
}
insertOverwriteManager.taskSuccess(taskId);
}
} catch (Exception e) {
LOG.warn("insert into overwrite failed with task(or group) id " + taskId);
if (isAutoDetectOverwrite()) {
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
insertOverwriteManager.taskGroupFail(taskId);
} else {
Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId);
insertOverwriteManager.taskFail(taskId);
}
throw e;
} finally {
ConnectContext.get().setSkipAuth(false);
insertOverwriteManager
.dropRunningRecord(targetTable.getDatabase().getId(), targetTable.getId());
isRunning.set(false);
}
}

/**
* cancel insert overwrite
*/
public void cancel() {
this.isCancelled.set(true);
}

/**
* wait until the insert overwrite is no longer running
*/
public void waitNotRunning() {
long waitMaxTimeSecond = 10L;
try {
Awaitility.await().atMost(waitMaxTimeSecond, TimeUnit.SECONDS).untilFalse(isRunning);
} catch (Exception e) {
LOG.warn("waiting time exceeds {} second, stop wait, labelName: {}", waitMaxTimeSecond,
labelName.isPresent() ? labelName.get() : "", e);
}
}

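waitNotRunning() relies on Awaitility's AtomicBoolean support to block the cancelling thread for at most 10 seconds. For readers unfamiliar with that API, a hedged plain-Java equivalent of the same bounded wait looks roughly like this (the 100 ms sleep matches Awaitility's default poll interval; the method name is illustrative only):

    // Illustrative sketch, not part of this PR: a manual polling version of waitNotRunning().
    private void waitNotRunningByPolling() {
        long deadlineMs = System.currentTimeMillis() + 10_000L; // same 10-second bound as waitNotRunning()
        while (isRunning.get() && System.currentTimeMillis() < deadlineMs) {
            try {
                Thread.sleep(100L); // Awaitility polls roughly every 100 ms by default
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }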
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1531,6 +1531,11 @@ public void cancel() {
}
return;
}
Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand();
if (insertOverwriteTableCommand.isPresent()) {
// If BE scheduling has not been triggered yet, cancel the scheduling first
insertOverwriteTableCommand.get().cancel();
}
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel();
@@ -1541,6 +1546,22 @@
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
if (insertOverwriteTableCommand.isPresent()) {
// Wait for the command to either finish running or complete its cancellation
insertOverwriteTableCommand.get().waitNotRunning();
}
}

private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
if (parsedStmt instanceof LogicalPlanAdapter) {
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;
LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan();
if (logicalPlan instanceof InsertOverwriteTableCommand) {
InsertOverwriteTableCommand insertOverwriteTableCommand = (InsertOverwriteTableCommand) logicalPlan;
return Optional.of(insertOverwriteTableCommand);
}
}
return Optional.empty();
}

// Because this is called by other thread
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.insertoverwrite;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;

import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class InsertOverwriteManagerTest {
@Mocked
private DatabaseIf db;

@Mocked
private TableIf table;

@Before
public void setUp()
throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException {

new Expectations() {
{
db.getId();
minTimes = 0;
result = 1L;

db.getFullName();
minTimes = 0;
result = "db1";

table.getId();
minTimes = 0;
result = 2L;

table.getName();
minTimes = 0;
result = "table1";
}
};
}

@Test
public void testParallel() {
InsertOverwriteManager manager = new InsertOverwriteManager();
manager.recordRunningTableOrException(db, table);
try {
manager.recordRunningTableOrException(db, table);
Assert.fail("the second recordRunningTableOrException on the same table should have thrown");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Not allowed"));
}
manager.dropRunningRecord(db.getId(), table.getId());
manager.recordRunningTableOrException(db, table);
}

}
@@ -1275,7 +1275,29 @@ class Suite implements GroovyInterceptable {
}
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL'))
if (status != "SUCCESS") {
logger.info("status is not success")
}
Assert.assertEquals("SUCCESS", status)
}

void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
Thread.sleep(2000);
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
[Review comment — Contributor] Use Awaitility
[Reply — Author] There are also some similar methods that I can modify uniformly in other PRs

do {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(4)
}
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL' || status == 'CANCELED'))
if (status != "SUCCESS") {
logger.info("status is not success")
}