Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ TStatusError(MEM_LIMIT_EXCEEDED);
TStatusError(THRIFT_RPC_ERROR);
TStatusError(TIMEOUT);
TStatusError(TOO_MANY_TASKS);
TStatusError(SERVICE_UNAVAILABLE);
TStatusError(UNINITIALIZED);
TStatusError(ABORTED);
TStatusError(DATA_QUALITY_ERROR);
Expand Down Expand Up @@ -113,6 +112,7 @@ E(NOT_INITIALIZED, -236);
E(ALREADY_CANCELLED, -237);
E(TOO_MANY_SEGMENTS, -238);
E(ALREADY_CLOSED, -239);
E(SERVICE_UNAVAILABLE, -240);
E(CE_CMD_PARAMS_ERROR, -300);
E(CE_BUFFER_TOO_SMALL, -301);
E(CE_CMD_NOT_VALID, -302);
Expand Down Expand Up @@ -421,7 +421,6 @@ class Status {
ERROR_CTOR(RpcError, THRIFT_RPC_ERROR)
ERROR_CTOR(TimedOut, TIMEOUT)
ERROR_CTOR(TooManyTasks, TOO_MANY_TASKS)
ERROR_CTOR(ServiceUnavailable, SERVICE_UNAVAILABLE)
ERROR_CTOR(Uninitialized, UNINITIALIZED)
ERROR_CTOR(Aborted, ABORTED)
ERROR_CTOR(DataQualityError, DATA_QUALITY_ERROR)
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
int64_t duration_ns = 0;
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
if (master_addr.hostname.empty() || master_addr.port == 0) {
status = Status::ServiceUnavailable("Have not get FE Master heartbeat yet");
status = Status::Error<SERVICE_UNAVAILABLE>("Have not get FE Master heartbeat yet");
} else {
SCOPED_RAW_TIMER(&duration_ns);
#ifndef BE_TEST
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/arrow_flight/flight_sql_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Status FlightSqlServer::init(int port) {
LOG(INFO) << "Arrow Flight Service not start";
return Status::OK();
}
_inited = true;
arrow::flight::Location bind_location;
RETURN_DORIS_STATUS_IF_ERROR(
arrow::flight::Location::ForGrpcTcp(BackendOptions::get_service_bind_address(), port)
Expand All @@ -114,6 +115,10 @@ Status FlightSqlServer::init(int port) {
}

Status FlightSqlServer::join() {
if (!_inited) {
// Flight not inited, not need shutdown
return Status::OK();
}
RETURN_DORIS_STATUS_IF_ERROR(Shutdown());
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/service/arrow_flight/flight_sql_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class FlightSqlServer : public arrow::flight::sql::FlightSqlServerBase {
private:
class Impl;
std::shared_ptr<Impl> impl_;
bool _inited = false;

explicit FlightSqlServer(std::shared_ptr<Impl> impl);
};
Expand Down
8 changes: 5 additions & 3 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ void ThreadPool::shutdown() {
// capacity, so clients can't tell them apart. This isn't really a practical
// concern though because shutting down a pool typically requires clients to
// be quiesced first, so there's no danger of a client getting confused.
_pool_status = Status::ServiceUnavailable("The thread pool {} has been shut down.", _name);
// Not print stack trace here
_pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
"The thread pool {} has been shut down.", _name);

// Clear the various queues under the lock, but defer the releasing
// of the tasks outside the lock, in case there are concurrent threads
Expand Down Expand Up @@ -356,14 +358,14 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
}

if (PREDICT_FALSE(!token->may_submit_new_tasks())) {
return Status::ServiceUnavailable("Thread pool({}) token was shut down", _name);
return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
}

// Size limit check.
int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
if (capacity_remaining < 1) {
return Status::ServiceUnavailable(
return Status::Error<SERVICE_UNAVAILABLE>(
"Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
_num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
_max_queue_size);
Expand Down
2 changes: 1 addition & 1 deletion gensrc/thrift/Status.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ enum TStatusCode {
NOT_AUTHORIZED = 38,
ABORTED = 39,
REMOTE_ERROR = 40,
SERVICE_UNAVAILABLE = 41,
//SERVICE_UNAVAILABLE = 41, // Not used any more
UNINITIALIZED = 42,
CONFIGURATION_ERROR = 43,
INCOMPLETE = 44,
Expand Down