Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@

package org.apache.doris.insertoverwrite;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -40,7 +44,9 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class InsertOverwriteManager extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(InsertOverwriteManager.class);
Expand All @@ -62,6 +68,11 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
@SerializedName(value = "partitionPairs")
private Map<Long, Map<Long, Long>> partitionPairs = Maps.newConcurrentMap();

// TableId running insert overwrite
// dbId ==> Set<tableId>
private Map<Long, Set<Long>> runningTables = Maps.newHashMap();
private ReentrantReadWriteLock runningLock = new ReentrantReadWriteLock(true);

public InsertOverwriteManager() {
super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 1000);
}
Expand Down Expand Up @@ -270,6 +281,53 @@ private boolean rollback(long taskId) {
return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames());
}

/**
* If the current table id has a running insert overwrite, throw an exception.
* If not, record it in runningTables
*
* @param db Run the db for insert overwrite
* @param table Run the table for insert overwrite
*/
public void recordRunningTableOrException(DatabaseIf db, TableIf table) {
long dbId = db.getId();
long tableId = table.getId();
runningLock.writeLock().lock();
try {
if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) {
throw new AnalysisException(
String.format("Not allowed running Insert Overwrite on same table: %s.%s", db.getFullName(),
table.getName()));
}
if (runningTables.containsKey(dbId)) {
runningTables.get(dbId).add(tableId);
} else {
runningTables.put(dbId, Sets.newHashSet(tableId));
}
} finally {
runningLock.writeLock().unlock();
}
}

/**
* Remove from running records
*
* @param dbId Run the dbId for insert overwrite
* @param tableId Run the tableId for insert overwrite
*/
public void dropRunningRecord(long dbId, long tableId) {
runningLock.writeLock().lock();
try {
if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) {
runningTables.get(dbId).remove(tableId);
if (runningTables.get(dbId).size() == 0) {
runningTables.remove(dbId);
}
}
} finally {
runningLock.writeLock().unlock();
}
}

/**
* replay logs
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ private void exec(ConnectContext ctx, Set<String> refreshPartitionNames,
ctx.setQueryId(queryId);
ctx.getState().setNereids(true);
command.run(ctx, executor);
if (getStatus() == TaskStatus.CANCELED) {
// Throwing an exception to interrupt subsequent partition update tasks
throw new JobException("task is CANCELED");
}
if (ctx.getState().getStateType() != MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
Expand Down Expand Up @@ -59,11 +60,14 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* insert into select command implementation
Expand All @@ -80,6 +84,8 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
private LogicalPlan logicalQuery;
private Optional<String> labelName;
private final Optional<LogicalPlan> cte;
private AtomicBoolean isCancelled = new AtomicBoolean(false);
private AtomicBoolean isRunning = new AtomicBoolean(false);

/**
* constructor
Expand Down Expand Up @@ -165,35 +171,88 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager();
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase(), targetTable);
isRunning.set(true);
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks made and registered in rpc process.
taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement.
insertInto(ctx, executor, taskId);
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable);
insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable);
} else {
List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
taskId = Env.getCurrentEnv().getInsertOverwriteManager()
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before registerTask, queryId: {}",
ctx.getQueryIdentifier());
return;
}
taskId = insertOverwriteManager
.registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before addTempPartitions, queryId: {}",
ctx.getQueryIdentifier());
// not need deal temp partition
insertOverwriteManager.taskSuccess(taskId);
return;
}
InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before insertInto, queryId: {}", ctx.getQueryIdentifier());
insertOverwriteManager.taskFail(taskId);
return;
}
insertInto(ctx, executor, tempPartitionNames);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before replacePartition, queryId: {}",
ctx.getQueryIdentifier());
insertOverwriteManager.taskFail(taskId);
return;
}
InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames);
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
if (isCancelled.get()) {
LOG.info("insert overwrite is cancelled before taskSuccess, do nothing, queryId: {}",
ctx.getQueryIdentifier());
}
insertOverwriteManager.taskSuccess(taskId);
}
} catch (Exception e) {
LOG.warn("insert into overwrite failed with task(or group) id " + taskId);
if (isAutoDetectOverwrite()) {
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
insertOverwriteManager.taskGroupFail(taskId);
} else {
Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId);
insertOverwriteManager.taskFail(taskId);
}
throw e;
} finally {
ConnectContext.get().setSkipAuth(false);
insertOverwriteManager
.dropRunningRecord(targetTable.getDatabase().getId(), targetTable.getId());
isRunning.set(false);
}
}

/**
* cancel insert overwrite
*/
public void cancel() {
this.isCancelled.set(true);
}

/**
* wait insert overwrite not running
*/
public void waitNotRunning() {
long waitMaxTimeSecond = 10L;
try {
Awaitility.await().atMost(waitMaxTimeSecond, TimeUnit.SECONDS).untilFalse(isRunning);
} catch (Exception e) {
LOG.warn("waiting time exceeds {} second, stop wait, labelName: {}", waitMaxTimeSecond,
labelName.isPresent() ? labelName.get() : "", e);
}
}

Expand Down
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,11 @@ private void resetAnalyzerAndStmt() {

// Because this is called by other thread
public void cancel() {
Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand();
if (insertOverwriteTableCommand.isPresent()) {
// If the be scheduling has not been triggered yet, cancel the scheduling first
insertOverwriteTableCommand.get().cancel();
}
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel();
Expand All @@ -1481,6 +1486,22 @@ public void cancel() {
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
if (insertOverwriteTableCommand.isPresent()) {
// Wait for the command to run or cancel completion
insertOverwriteTableCommand.get().waitNotRunning();
}
}

private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
if (parsedStmt instanceof LogicalPlanAdapter) {
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;
LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan();
if (logicalPlan instanceof InsertOverwriteTableCommand) {
InsertOverwriteTableCommand insertOverwriteTableCommand = (InsertOverwriteTableCommand) logicalPlan;
return Optional.of(insertOverwriteTableCommand);
}
}
return Optional.empty();
}

// Because this is called by other thread
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.insertoverwrite;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;

import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class InsertOverwriteManagerTest {
@Mocked
private DatabaseIf db;

@Mocked
private TableIf table;

@Before
public void setUp()
throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException {

new Expectations() {
{
db.getId();
minTimes = 0;
result = 1L;

db.getFullName();
minTimes = 0;
result = "db1";

table.getId();
minTimes = 0;
result = 2L;

table.getName();
minTimes = 0;
result = "table1";
}
};
}

@Test
public void testParallel() {
InsertOverwriteManager manager = new InsertOverwriteManager();
manager.recordRunningTableOrException(db, table);
try {
manager.recordRunningTableOrException(db, table);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Not allowed"));
}
manager.dropRunningRecord(db.getId(), table.getId());
manager.recordRunningTableOrException(db, table);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,29 @@ class Suite implements GroovyInterceptable {
}
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL'))
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL'))
if (status != "SUCCESS") {
logger.info("status is not success")
}
Assert.assertEquals("SUCCESS", status)
}

void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
Thread.sleep(2000);
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
do {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(4)
}
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL' || status == 'CANCELED'))
if (status != "SUCCESS") {
logger.info("status is not success")
}
Expand Down
Loading