Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
LOG_INFO("PipelineXFragmentContext::cancel")
.tag("query_id", print_id(_query_id))
.tag("fragment_id", _fragment_id)
.tag("reason", reason)
.tag("reason", PPlanFragmentCancelReason_Name(reason))
.tag("error message", msg);
if (reason == PPlanFragmentCancelReason::TIMEOUT) {
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
Expand Down
10 changes: 5 additions & 5 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
}

BufferControlBlock::~BufferControlBlock() {
cancel();
cancel(Status::Cancelled("Cancelled"));
}

Status BufferControlBlock::init() {
Expand Down Expand Up @@ -266,13 +266,13 @@ Status BufferControlBlock::close(Status exec_status) {
return Status::OK();
}

void BufferControlBlock::cancel() {
void BufferControlBlock::cancel(const Status& reason) {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
_data_removal.notify_all();
_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
ctx->on_failure(reason);
}
_waiting_rpc.clear();
}
Expand Down Expand Up @@ -301,8 +301,8 @@ Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch
return Status::OK();
}

void PipBufferControlBlock::cancel() {
BufferControlBlock::cancel();
void PipBufferControlBlock::cancel(const Status& reason) {
BufferControlBlock::cancel(reason);
_update_dependency();
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ class BufferControlBlock {
// close buffer block, set _status to exec_status and set _is_close to true;
// called because data has been read or error happened.
Status close(Status exec_status);
// this is called by RPC, called from coordinator
virtual void cancel();

virtual void cancel(const Status& reason);

[[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }

Expand Down Expand Up @@ -152,7 +152,7 @@ class PipBufferControlBlock : public BufferControlBlock {

Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) override;

void cancel() override;
void cancel(const Status& reason) override;

void set_dependency(std::shared_ptr<pipeline::Dependency> result_sink_dependency);

Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ void FragmentMgr::cancel_worker() {
clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);

do {
std::vector<TUniqueId> to_cancel;
std::vector<TUniqueId> queries_timeout;
std::vector<TUniqueId> queries_to_cancel;
std::vector<TUniqueId> queries_pipeline_task_leak;
// Fe process uuid -> set<QueryId>
Expand All @@ -1274,7 +1274,7 @@ void FragmentMgr::cancel_worker() {
std::lock_guard<std::mutex> lock(_lock);
for (auto& fragment_instance_itr : _fragment_instance_map) {
if (fragment_instance_itr.second->is_timeout(now)) {
to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
}
}
for (auto& pipeline_itr : _pipeline_map) {
Expand All @@ -1283,7 +1283,7 @@ void FragmentMgr::cancel_worker() {
reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get())
->instance_ids(ins_ids);
for (auto& ins_id : ins_ids) {
to_cancel.push_back(ins_id);
queries_timeout.push_back(ins_id);
}
} else {
pipeline_itr.second->clear_finished_tasks();
Expand Down Expand Up @@ -1393,9 +1393,9 @@ void FragmentMgr::cancel_worker() {

// TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
// designed to count canceled fragment of non-pipeline query.
timeout_canceled_fragment_count->increment(to_cancel.size());
for (auto& id : to_cancel) {
cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT);
timeout_canceled_fragment_count->increment(queries_timeout.size());
for (auto& id : queries_timeout) {
cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT, "Query timeout");
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance "
<< print_id(id);
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
#include "common/status.h"
#include "runtime/buffer_control_block.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
Expand Down Expand Up @@ -150,13 +151,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
return Status::OK();
}

void ResultBufferMgr::cancel(const TUniqueId& query_id) {
void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
iter->second->cancel();
iter->second->cancel(reason);
_buffer_map.erase(iter);
}
}
Expand Down Expand Up @@ -206,7 +207,7 @@ void ResultBufferMgr::cancel_thread() {

// cancel query
for (int i = 0; i < query_to_cancel.size(); ++i) {
cancel(query_to_cancel[i]);
cancel(query_to_cancel[i], Status::TimedOut("Query tiemout"));
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/result_buffer_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class ResultBufferMgr {
std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id);

// cancel
void cancel(const TUniqueId& fragment_id);
void cancel(const TUniqueId& query_id, const Status& reason);

// cancel one query at a future time.
void cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ public Object killQuery(HttpServletRequest request, HttpServletResponse response
}

ExecuteEnv env = ExecuteEnv.getInstance();
env.getScheduler().cancelQuery(queryId);
env.getScheduler().cancelQuery(queryId, "cancel query by rest api");
return ResponseEntityBuilder.ok();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ protected void executeCancelLogic() {
}
isCanceled.getAndSet(true);
if (null != stmtExecutor) {
stmtExecutor.cancel();
stmtExecutor.cancel("insert task cancelled");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public synchronized void onSuccess() throws JobException {
protected synchronized void executeCancelLogic() {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
executor.cancel();
executor.cancel("mtmv task cancelled");
}
after();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void cancel() throws JobException {
}
isCanceled.getAndSet(true);
if (stmtExecutor != null) {
stmtExecutor.cancel();
stmtExecutor.cancel("export task cancelled");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
coordinator.cancel();
coordinator.cancel(failMsg.getMsg());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected final void execImpl(StmtExecutor executor, long jobId) throws Exceptio
}
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
coordinator.cancel("insert timeout");
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy backend. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ public void kill(boolean killConnection) {
closeChannel();
}
// Now, cancel running query.
cancelQuery();
cancelQuery("cancel query by user from " + getRemoteHostPortString());
}

// kill operation with no protect by timeout.
Expand All @@ -956,10 +956,10 @@ private void killByTimeout(boolean killConnection) {
}
}

public void cancelQuery() {
public void cancelQuery(String cancelMessage) {
StmtExecutor executorRef = executor;
if (executorRef != null) {
executorRef.cancel();
executorRef.cancel(cancelMessage);
}
}

Expand Down Expand Up @@ -990,7 +990,7 @@ public void checkTimeout(long now) {
long timeout = getExecTimeout() * 1000L;
if (delta > timeout) {
LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}",
timeoutTag, getRemoteHostPortString(), timeout, queryId);
timeoutTag, getRemoteHostPortString(), timeout, DebugUtil.printId(queryId));
killFlag = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ public ConnectContext getContext(String flightToken) {
return null;
}

public void cancelQuery(String queryId) {
public void cancelQuery(String queryId, String cancelReason) {
for (ConnectContext ctx : connectionMap.values()) {
TUniqueId qid = ctx.queryId();
if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
ctx.cancelQuery();
ctx.cancelQuery(cancelReason);
break;
}
}
Expand Down
28 changes: 12 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ private void waitRpc(List<Triple<BackendExecStates, BackendServiceProxy, Future<
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, errMsg);
switch (code) {
case TIMEOUT:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
Expand Down Expand Up @@ -1259,7 +1259,7 @@ private Map<TNetworkAddress, List<Long>> waitPipelineRpc(List<Pair<Long, Triple
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, errMsg);
switch (code) {
case TIMEOUT:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
Expand Down Expand Up @@ -1385,9 +1385,9 @@ private void updateStatus(Status status) {

queryStatus.updateStatus(status.getErrorCode(), status.getErrorMsg());
if (status.getErrorCode() == TStatusCode.TIMEOUT) {
cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, status.getErrorMsg());
} else {
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, status.getErrorMsg());
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -1426,7 +1426,7 @@ public RowBatch getNext() throws Exception {
throw new RpcException(null, copyStatus.getErrorMsg());
} else {
String errMsg = copyStatus.getErrorMsg();
LOG.warn("query failed: {}", errMsg);
LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), errMsg);
throw new UserException(errMsg);
}
}
Expand All @@ -1441,7 +1441,7 @@ public RowBatch getNext() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("no block query, return num >= limit rows, need cancel");
}
cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH);
cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit");
}
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
numReceivedRows = 0;
Expand Down Expand Up @@ -1528,8 +1528,8 @@ public Status shouldCancel(List<Backend> currentBackends) {
// Cancel execution of query. This includes the execution of the local plan
// fragment,
// if any, as well as all plan fragments on remote nodes.
public void cancel() {
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
public void cancel(String errorMsg) {
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, errorMsg);
if (queueToken != null) {
queueToken.cancel();
}
Expand All @@ -1552,8 +1552,8 @@ public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg
queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg);
}
LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}",
DebugUtil.printId(queryId), cancelReason.toString());
cancelInternal(cancelReason);
DebugUtil.printId(queryId), errorMsg);
cancelInternal(cancelReason, errorMsg);
} finally {
unlock();
}
Expand All @@ -1577,9 +1577,9 @@ private void cancelLatch() {
}
}

private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, String cancelMessage) {
if (null != receiver) {
receiver.cancel(cancelReason);
receiver.cancel(cancelReason, cancelMessage);
}
if (null != pointExec) {
pointExec.cancel();
Expand Down Expand Up @@ -3307,10 +3307,6 @@ public void onSuccess(InternalService.PCancelPlanFragmentResult result) {
DebugUtil.printId(fragmentInstanceId()), status.toString());
}
}
LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {},"
+ "fragment instance id={}, reason: {}",
DebugUtil.printId(queryId), initiated, done, backend.getId(),
DebugUtil.printId(fragmentInstanceId()), "without status");
}

public void onFailure(Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public RowBatch getNext(Status status) throws TException {
LOG.warn("Query {} get result timeout, get result duration {} ms",
DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000);
setRunStatus(Status.TIMEOUT);
status.updateStatus(TStatusCode.TIMEOUT, "");
updateCancelReason("fetch data timeout");
status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
updateCancelReason("Query timeout");
return null;
} catch (InterruptedException e) {
// continue to get result
Expand Down Expand Up @@ -183,7 +183,7 @@ public RowBatch getNext(Status status) throws TException {
}
} catch (TimeoutException e) {
LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e);
status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
} finally {
synchronized (this) {
currentThread = null;
Expand All @@ -205,13 +205,14 @@ private void updateCancelReason(String reason) {
}
}

public void cancel(Types.PPlanFragmentCancelReason reason) {
public void cancel(Types.PPlanFragmentCancelReason reason, String cancelMessage) {
if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) {
setRunStatus(Status.TIMEOUT);
} else {
setRunStatus(Status.CANCELLED);
}
updateCancelReason(reason.toString());

updateCancelReason(cancelMessage);
synchronized (this) {
if (currentThread != null) {
// TODO(cmy): we cannot interrupt this thread, or we may throw
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1475,15 +1475,15 @@ private void resetAnalyzerAndStmt() {
}

// Because this is called by other thread
public void cancel() {
public void cancel(String message) {
Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand();
if (insertOverwriteTableCommand.isPresent()) {
// If the be scheduling has not been triggered yet, cancel the scheduling first
insertOverwriteTableCommand.get().cancel();
}
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel();
coordRef.cancel(message);
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
Expand Down
Loading