diff --git a/be/src/common/status.h b/be/src/common/status.h index 14b91dd7d002ce..7713d3c4bd2a3d 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -51,7 +51,6 @@ TStatusError(MEM_LIMIT_EXCEEDED); TStatusError(THRIFT_RPC_ERROR); TStatusError(TIMEOUT); TStatusError(TOO_MANY_TASKS); -TStatusError(SERVICE_UNAVAILABLE); TStatusError(UNINITIALIZED); TStatusError(ABORTED); TStatusError(DATA_QUALITY_ERROR); @@ -113,6 +112,7 @@ E(NOT_INITIALIZED, -236); E(ALREADY_CANCELLED, -237); E(TOO_MANY_SEGMENTS, -238); E(ALREADY_CLOSED, -239); +E(SERVICE_UNAVAILABLE, -240); E(CE_CMD_PARAMS_ERROR, -300); E(CE_BUFFER_TOO_SMALL, -301); E(CE_CMD_NOT_VALID, -302); @@ -421,7 +421,6 @@ class Status { ERROR_CTOR(RpcError, THRIFT_RPC_ERROR) ERROR_CTOR(TimedOut, TIMEOUT) ERROR_CTOR(TooManyTasks, TOO_MANY_TASKS) - ERROR_CTOR(ServiceUnavailable, SERVICE_UNAVAILABLE) ERROR_CTOR(Uninitialized, UNINITIALIZED) ERROR_CTOR(Aborted, ABORTED) ERROR_CTOR(DataQualityError, DATA_QUALITY_ERROR) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index e1e8e2ff25d90f..1ecba95748aeec 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -248,7 +248,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { int64_t duration_ns = 0; TNetworkAddress master_addr = _exec_env->master_info()->network_address; if (master_addr.hostname.empty() || master_addr.port == 0) { - status = Status::ServiceUnavailable("Have not get FE Master heartbeat yet"); + status = Status::Error("Have not get FE Master heartbeat yet"); } else { SCOPED_RAW_TIMER(&duration_ns); #ifndef BE_TEST diff --git a/be/src/service/arrow_flight/flight_sql_service.cpp b/be/src/service/arrow_flight/flight_sql_service.cpp index 1bddedc4aa72eb..60add8698ad28e 100644 --- a/be/src/service/arrow_flight/flight_sql_service.cpp +++ b/be/src/service/arrow_flight/flight_sql_service.cpp @@ -102,6 +102,7 @@ Status FlightSqlServer::init(int port) { LOG(INFO) << "Arrow Flight Service not start"; return Status::OK(); } + _inited = true; arrow::flight::Location bind_location; RETURN_DORIS_STATUS_IF_ERROR( arrow::flight::Location::ForGrpcTcp(BackendOptions::get_service_bind_address(), port) @@ -114,6 +115,10 @@ Status FlightSqlServer::init(int port) { } Status FlightSqlServer::join() { + if (!_inited) { + // Flight not inited, not need shutdown + return Status::OK(); + } RETURN_DORIS_STATUS_IF_ERROR(Shutdown()); return Status::OK(); } diff --git a/be/src/service/arrow_flight/flight_sql_service.h b/be/src/service/arrow_flight/flight_sql_service.h index aa170acd1fc9b5..4772e98d81d114 100644 --- a/be/src/service/arrow_flight/flight_sql_service.h +++ b/be/src/service/arrow_flight/flight_sql_service.h @@ -41,6 +41,7 @@ class FlightSqlServer : public arrow::flight::sql::FlightSqlServerBase { private: class Impl; std::shared_ptr impl_; + bool _inited = false; explicit FlightSqlServer(std::shared_ptr impl); }; diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index 93c14f4d61b8da..6ac02e5cbd74bd 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -274,7 +274,9 @@ void ThreadPool::shutdown() { // capacity, so clients can't tell them apart. This isn't really a practical // concern though because shutting down a pool typically requires clients to // be quiesced first, so there's no danger of a client getting confused. - _pool_status = Status::ServiceUnavailable("The thread pool {} has been shut down.", _name); + // Not print stack trace here + _pool_status = Status::Error( + "The thread pool {} has been shut down.", _name); // Clear the various queues under the lock, but defer the releasing // of the tasks outside the lock, in case there are concurrent threads @@ -356,14 +358,14 @@ Status ThreadPool::do_submit(std::shared_ptr r, ThreadPoolToken* token } if (PREDICT_FALSE(!token->may_submit_new_tasks())) { - return Status::ServiceUnavailable("Thread pool({}) token was shut down", _name); + return Status::Error("Thread pool({}) token was shut down", _name); } // Size limit check. int64_t capacity_remaining = static_cast(_max_threads) - _active_threads + static_cast(_max_queue_size) - _total_queued_tasks; if (capacity_remaining < 1) { - return Status::ServiceUnavailable( + return Status::Error( "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name, _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks, _max_queue_size); diff --git a/gensrc/thrift/Status.thrift b/gensrc/thrift/Status.thrift index 7b12d3b0603d23..06083b9a93ccc8 100644 --- a/gensrc/thrift/Status.thrift +++ b/gensrc/thrift/Status.thrift @@ -69,7 +69,7 @@ enum TStatusCode { NOT_AUTHORIZED = 38, ABORTED = 39, REMOTE_ERROR = 40, - SERVICE_UNAVAILABLE = 41, + //SERVICE_UNAVAILABLE = 41, // Not used any more UNINITIALIZED = 42, CONFIGURATION_ERROR = 43, INCOMPLETE = 44,