Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int
}

BufferControlBlock::~BufferControlBlock() {
cancel();
cancel(Status::Cancelled("Cancelled"));
}

Status BufferControlBlock::init() {
Expand Down Expand Up @@ -275,12 +275,12 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
return Status::OK();
}

void BufferControlBlock::cancel() {
void BufferControlBlock::cancel(const Status& reason) {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
_arrow_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
ctx->on_failure(reason);
}
_waiting_rpc.clear();
_update_dependency();
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ class BufferControlBlock {
// close buffer block, set _status to exec_status and set _is_close to true;
// called because data has been read or error happened.
Status close(const TUniqueId& id, Status exec_status);
// this is called by RPC, called from coordinator
void cancel();

void cancel(const Status& reason);

[[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }

Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
#include "common/status.h"
#include "runtime/buffer_control_block.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
Expand Down Expand Up @@ -144,13 +145,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
return Status::OK();
}

void ResultBufferMgr::cancel(const TUniqueId& query_id) {
void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
auto iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
iter->second->cancel();
iter->second->cancel(reason);
_buffer_map.erase(iter);
}
}
Expand Down Expand Up @@ -200,7 +201,7 @@ void ResultBufferMgr::cancel_thread() {

// cancel query
for (const auto& id : query_to_cancel) {
cancel(id);
cancel(id, Status::TimedOut("Query tiemout"));
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/result_buffer_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class ResultBufferMgr {
std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id);

// cancel
void cancel(const TUniqueId& fragment_id);
void cancel(const TUniqueId& query_id, const Status& reason);

// cancel one query at a future time.
void cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ public void kill(boolean killConnection) {
closeChannel();
}
// Now, cancel running query.
cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user"));
cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user from " + getRemoteHostPortString()));
}

// kill operation with no protect by timeout.
Expand Down Expand Up @@ -1015,7 +1015,7 @@ public void checkTimeout(long now) {
long timeout = getExecTimeout() * 1000L;
if (delta > timeout) {
LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}",
timeoutTag, getRemoteHostPortString(), timeout, queryId);
timeoutTag, getRemoteHostPortString(), timeout, DebugUtil.printId(queryId));
killFlag = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ public RowBatch getNext() throws Exception {
throw new RpcException(null, copyStatus.getErrorMsg());
} else {
String errMsg = copyStatus.getErrorMsg();
LOG.warn("query failed: {}", errMsg);
LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), errMsg);
throw new UserException(errMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public RowBatch getNext(Status status) throws TException {
}
} catch (TimeoutException e) {
LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e);
status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
} finally {
synchronized (this) {
currentThread = null;
Expand Down