From 4d08fc023cb3e8925283582db4c6cf49dc41b4dc Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 15 Jun 2020 20:48:56 +0800 Subject: [PATCH 1/3] remerge --- be/src/exec/exec_node.cpp | 2 +- be/src/exec/tablet_sink.cpp | 2 +- be/src/runtime/buffered_block_mgr2.cc | 2 +- be/src/runtime/data_spliter.cpp | 2 +- be/src/runtime/data_stream_sender.cpp | 2 +- be/src/runtime/dpp_sink.cpp | 4 +- be/src/runtime/export_sink.cpp | 2 +- be/src/runtime/memory_scratch_sink.cpp | 2 +- be/src/runtime/mysql_table_sink.cpp | 2 +- be/src/runtime/plan_fragment_executor.cpp | 17 +++++-- be/src/runtime/result_sink.cpp | 2 +- be/src/runtime/runtime_state.cpp | 9 ++-- be/src/runtime/runtime_state.h | 6 ++- be/src/util/dummy_runtime_profile.h | 2 +- be/src/util/runtime_profile.cpp | 46 ++----------------- be/src/util/runtime_profile.h | 17 +------ .../runtime/buffered_tuple_stream_test.cpp | 2 +- be/test/runtime/data_stream_test.cpp | 2 +- be/test/runtime/sorter_test.cpp | 7 +-- 19 files changed, 42 insertions(+), 88 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 226e42a9667daa..697dcc6a24d78e 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -552,7 +552,7 @@ void ExecNode::try_do_aggregate_serde_improve() { void ExecNode::init_runtime_profile(const std::string& name) { std::stringstream ss; ss << name << " (id=" << _id << ")"; - _runtime_profile.reset(new RuntimeProfile(_pool, ss.str())); + _runtime_profile.reset(new RuntimeProfile(ss.str())); _runtime_profile->set_metadata(_id); } diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index a84acbe68390a0..3521ad9d651cc4 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -457,7 +457,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { _num_senders = state->num_per_fragment_instances(); // profile must add to state's object pool - _profile = state->obj_pool()->add(new RuntimeProfile(_pool, "OlapTableSink")); + _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink")); _mem_tracker = _pool->add(new MemTracker(-1, "OlapTableSink", state->instance_mem_tracker())); SCOPED_TIMER(_profile->total_time_counter()); diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index 01db890cc31b20..18f5b6383d07bf 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -1259,7 +1259,7 @@ void BufferedBlockMgr2::init( io_mgr->register_context(&_io_request_context); - _profile.reset(new RuntimeProfile(&_obj_pool, "BlockMgr")); + _profile.reset(new RuntimeProfile("BlockMgr")); parent_profile->add_child(_profile.get(), true, NULL); _block_size_counter = ADD_COUNTER(_profile.get(), "MaxBlockSize", TUnit::BYTES); diff --git a/be/src/runtime/data_spliter.cpp b/be/src/runtime/data_spliter.cpp index 7f48182fb41425..d06e7cbbdf3073 100644 --- a/be/src/runtime/data_spliter.cpp +++ b/be/src/runtime/data_spliter.cpp @@ -94,7 +94,7 @@ Status DataSpliter::prepare(RuntimeState* state) { for (auto& iter : _rollup_map) { RETURN_IF_ERROR(iter.second->prepare(state, _row_desc, _expr_mem_tracker.get())); } - _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str())); + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); for (auto iter : _partition_infos) { RETURN_IF_ERROR(iter->prepare(state, _row_desc, _expr_mem_tracker.get())); } diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 65c1e664d845b7..a642e00948b78e 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -381,7 +381,7 @@ Status DataStreamSender::prepare(RuntimeState* state) { _state = state; std::stringstream title; title << "DataStreamSender (dst_id=" << _dest_node_id << ")"; - _profile = _pool->add(new RuntimeProfile(_pool, title.str())); + _profile = _pool->add(new RuntimeProfile(title.str())); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker.reset( new MemTracker(_profile, -1, "DataStreamSender", state->instance_mem_tracker())); diff --git a/be/src/runtime/dpp_sink.cpp b/be/src/runtime/dpp_sink.cpp index bea1c96f95cc2c..b4aa6c63d67c69 100644 --- a/be/src/runtime/dpp_sink.cpp +++ b/be/src/runtime/dpp_sink.cpp @@ -569,7 +569,7 @@ Status Translator::create_profile(RuntimeState* state) { std::stringstream ss; ss << "Dpp translator(" << _tablet_desc.partition_id << "_" << _tablet_desc.bucket_id << "_" << _rollup_name << ")"; - _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), ss.str())); + _profile = state->obj_pool()->add(new RuntimeProfile(ss.str())); _add_batch_timer = ADD_TIMER(_profile, "add batch time"); _sort_timer = ADD_TIMER(_profile, "sort time"); @@ -875,7 +875,7 @@ Status Translator::close(RuntimeState* state) { } Status DppSink::init(RuntimeState* state) { - _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), "Dpp sink")); + _profile = state->obj_pool()->add(new RuntimeProfile("Dpp sink")); return Status::OK(); } diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp index cf5c4ffb8417e8..3aea0348cb46a6 100644 --- a/be/src/runtime/export_sink.cpp +++ b/be/src/runtime/export_sink.cpp @@ -64,7 +64,7 @@ Status ExportSink::prepare(RuntimeState* state) { std::stringstream title; title << "ExportSink (frag_id=" << state->fragment_instance_id() << ")"; // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str())); + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker.reset(new MemTracker(-1, "ExportSink", state->instance_mem_tracker())); diff --git a/be/src/runtime/memory_scratch_sink.cpp b/be/src/runtime/memory_scratch_sink.cpp index 647f8e7e13ddbf..8093850d2c9853 100644 --- a/be/src/runtime/memory_scratch_sink.cpp +++ b/be/src/runtime/memory_scratch_sink.cpp @@ -67,7 +67,7 @@ Status MemoryScratchSink::prepare(RuntimeState* state) { std::stringstream title; title << "MemoryScratchSink (frag_id=" << fragment_instance_id << ")"; // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str())); + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); return Status::OK(); } diff --git a/be/src/runtime/mysql_table_sink.cpp b/be/src/runtime/mysql_table_sink.cpp index 2e0d5868a60568..ec267e93e7fb5f 100644 --- a/be/src/runtime/mysql_table_sink.cpp +++ b/be/src/runtime/mysql_table_sink.cpp @@ -61,7 +61,7 @@ Status MysqlTableSink::prepare(RuntimeState* state) { std::stringstream title; title << "MysqlTableSink (frag_id=" << state->fragment_instance_id() << ")"; // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str())); + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); return Status::OK(); } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index b5cb4c92a59f75..9d06282e534974 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -323,7 +323,12 @@ Status PlanFragmentExecutor::open_internal() { { SCOPED_TIMER(profile()->total_time_counter()); collect_query_statistics(); - Status status = _sink->close(runtime_state(), _status); + Status status; + { + boost::lock_guard l(_status_lock); + status = _status; + } + status = _sink->close(runtime_state(), status); RETURN_IF_ERROR(status); } @@ -489,8 +494,7 @@ void PlanFragmentExecutor::update_status(const Status& status) { { boost::lock_guard l(_status_lock); - - if (_status.ok()) { + if (!_status.ok()) { if (status.is_mem_limit_exceeded()) { _runtime_state->set_mem_limit_exceeded(status.get_error_msg()); } @@ -547,7 +551,12 @@ void PlanFragmentExecutor::close() { if (_sink.get() != NULL) { if (_prepared) { - _sink->close(runtime_state(), _status); + Status status; + { + boost::lock_guard l(_status_lock); + status = _status; + } + _sink->close(runtime_state(), status); } else { _sink->close(runtime_state(), Status::InternalError("prepare failed")); } diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index be51a5b991254a..af85fb8256d1c0 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -67,7 +67,7 @@ Status ResultSink::prepare(RuntimeState* state) { std::stringstream title; title << "DataBufferSender (dst_fragment_instance_id=" << print_id(state->fragment_instance_id()) << ")"; // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str())); + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); // prepare output_expr RETURN_IF_ERROR(prepare_exprs(state)); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index de0f3effc8f97d..37c63a208d5d6a 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -51,10 +51,10 @@ RuntimeState::RuntimeState( const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env) : + _profile("Fragment " + print_id(fragment_instance_id)), _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), - _profile(_obj_pool.get(), "Fragment " + print_id(fragment_instance_id)), _fragment_mem_tracker(NULL), _is_cancelled(false), _per_fragment_instance_idx(0), @@ -76,12 +76,11 @@ RuntimeState::RuntimeState( const TExecPlanFragmentParams& fragment_params, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env) : + _profile("Fragment " + print_id(fragment_params.params.fragment_instance_id)), _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), _query_id(fragment_params.params.query_id), - _profile(_obj_pool.get(), - "Fragment " + print_id(fragment_params.params.fragment_instance_id)), _fragment_mem_tracker(NULL), _is_cancelled(false), _per_fragment_instance_idx(0), @@ -100,10 +99,10 @@ RuntimeState::RuntimeState( } RuntimeState::RuntimeState(const TQueryGlobals& query_globals) - : _obj_pool(new ObjectPool()), + : _profile(""), + _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), - _profile(_obj_pool.get(), ""), _is_cancelled(false), _per_fragment_instance_idx(0) { _query_options.batch_size = DEFAULT_BATCH_SIZE; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index cc88e5b9da599d..d53b3e8fb730ba 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -504,6 +504,10 @@ class RuntimeState { static const int DEFAULT_BATCH_SIZE = 2048; + // put runtime state before _obj_pool, so that it will be deconstructed after + // _obj_pool. And some of object in _obj_pool will use profile when deconstructing. + RuntimeProfile _profile; + DescriptorTbl* _desc_tbl; std::shared_ptr _obj_pool; @@ -542,8 +546,6 @@ class RuntimeState { // state is responsible for returning this pool to the thread mgr. ThreadResourceMgr::ResourcePool* _resource_pool; - RuntimeProfile _profile; - // all mem limits that apply to this query std::vector _mem_trackers; diff --git a/be/src/util/dummy_runtime_profile.h b/be/src/util/dummy_runtime_profile.h index 536452c6f22d3c..933eddda679f07 100755 --- a/be/src/util/dummy_runtime_profile.h +++ b/be/src/util/dummy_runtime_profile.h @@ -24,7 +24,7 @@ namespace doris { class DummyProfile { public: - DummyProfile() : _pool(), _profile(new RuntimeProfile(&_pool, "dummy", false)) {} + DummyProfile() : _pool(), _profile(new RuntimeProfile("dummy", false)) {} RuntimeProfile* profile() { return _profile; } virtual ~DummyProfile() { delete _profile; diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 07f4658aa6dfd7..53c5b3431d3113 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -47,8 +47,7 @@ static const std::string ROOT_COUNTER = ""; RuntimeProfile::PeriodicCounterUpdateState RuntimeProfile::_s_periodic_counter_update_state; -// TODO: we do not use the param pool should we del the param ObjectPool -RuntimeProfile::RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile) +RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile) : _pool(new ObjectPool()), _own_pool(false), _name(name), @@ -77,43 +76,6 @@ RuntimeProfile::~RuntimeProfile() { } } -RuntimeProfile* RuntimeProfile::create_from_thrift(ObjectPool* pool, - const TRuntimeProfileTree& profiles) { - if (profiles.nodes.size() == 0) { - return NULL; - } - - int idx = 0; - return RuntimeProfile::create_from_thrift(pool, profiles.nodes, &idx); -} - -RuntimeProfile* RuntimeProfile::create_from_thrift(ObjectPool* pool, - const std::vector& nodes, - int* idx) { - DCHECK_LT(*idx, nodes.size()); - - const TRuntimeProfileNode& node = nodes[*idx]; - RuntimeProfile* profile = pool->add(new RuntimeProfile(pool, node.name)); - profile->_metadata = node.metadata; - - for (int i = 0; i < node.counters.size(); ++i) { - const TCounter& counter = node.counters[i]; - profile->_counter_map[counter.name] = pool->add(new Counter(counter.type, counter.value)); - } - - profile->_child_counter_map = node.child_counters_map; - profile->_info_strings = node.info_strings; - profile->_info_strings_display_order = node.info_strings_display_order; - - ++*idx; - - for (int i = 0; i < node.num_children; ++i) { - profile->add_child(RuntimeProfile::create_from_thrift(pool, nodes, idx), false, NULL); - } - - return profile; -} - void RuntimeProfile::merge(RuntimeProfile* other) { DCHECK(other != NULL); @@ -168,7 +130,7 @@ void RuntimeProfile::merge(RuntimeProfile* other) { if (j != _child_map.end()) { child = j->second; } else { - child = _pool->add(new RuntimeProfile(_pool.get(), other_child->_name)); + child = _pool->add(new RuntimeProfile(other_child->_name)); child->_local_time_percent = other_child->_local_time_percent; child->_metadata = other_child->_metadata; bool indent_other_child = other->_children[i].second; @@ -257,7 +219,7 @@ void RuntimeProfile::update(const std::vector& nodes, int* if (j != _child_map.end()) { child = j->second; } else { - child = _pool->add(new RuntimeProfile(_pool.get(), tchild.name)); + child = _pool->add(new RuntimeProfile(tchild.name)); child->_metadata = tchild.metadata; _child_map[tchild.name] = child; _children.push_back(std::make_pair(child, tchild.indent)); @@ -325,7 +287,7 @@ void RuntimeProfile::compute_time_in_profile(int64_t total) { RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool indent, bool prepend) { boost::lock_guard l(_children_lock); DCHECK(_child_map.find(name) == _child_map.end()); - RuntimeProfile* child = _pool->add(new RuntimeProfile(_pool.get(), name)); + RuntimeProfile* child = _pool->add(new RuntimeProfile(name)); if (_children.empty()) { add_child_unlock(child, indent, NULL); } else { diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 1b57776ec8dfa6..d39483e72a9afb 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -275,17 +275,11 @@ class RuntimeProfile { MonotonicStopWatch _sw; }; - // Create a runtime profile object with 'name'. Counters and merged profile are - // allocated from pool. - RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile = false); + // Create a runtime profile object with 'name'. + RuntimeProfile(const std::string& name, bool is_averaged_profile = false); ~RuntimeProfile(); - // Deserialize from thrift. Runtime profiles are allocated from the pool. - static RuntimeProfile* create_from_thrift( - ObjectPool* pool, - const TRuntimeProfileTree& profiles); - // Adds a child profile. This is thread safe. // 'indent' indicates whether the child will be printed w/ extra indentation // relative to the parent. @@ -601,13 +595,6 @@ class RuntimeProfile { // for updating them. static PeriodicCounterUpdateState _s_periodic_counter_update_state; - // Create a subtree of runtime profiles from nodes, starting at *node_idx. - // On return, *node_idx is the index one past the end of this subtree - static RuntimeProfile* create_from_thrift( - ObjectPool* pool, - const std::vector& nodes, - int* node_idx); - // update a subtree of profiles from nodes, rooted at *idx. // On return, *idx points to the node immediately following this subtree. void update(const std::vector& nodes, int* idx); diff --git a/be/test/runtime/buffered_tuple_stream_test.cpp b/be/test/runtime/buffered_tuple_stream_test.cpp index 9d3e0514afa45f..7cee3f26a59866 100644 --- a/be/test/runtime/buffered_tuple_stream_test.cpp +++ b/be/test/runtime/buffered_tuple_stream_test.cpp @@ -45,7 +45,7 @@ class BufferedTupleStreamTest : public testing::Test { RowBatch* create_row_batch(int num_rows); BufferedTupleStreamTest() { _object_pool = new ObjectPool(); - _profile = new RuntimeProfile(_object_pool, "bufferedStream"); + _profile = new RuntimeProfile("bufferedStream"); _runtime_state = new RuntimeState("BufferedTupleStreamTest"); _runtime_state->exec_env_ = &_exec_env; _runtime_state->create_block_mgr(); diff --git a/be/test/runtime/data_stream_test.cpp b/be/test/runtime/data_stream_test.cpp index 7723dd0bd80399..e2558f82380928 100644 --- a/be/test/runtime/data_stream_test.cpp +++ b/be/test/runtime/data_stream_test.cpp @@ -377,7 +377,7 @@ class DataStreamTest : public testing::Test { void start_receiver(TPartitionType::type stream_type, int num_senders, int receiver_num, int buffer_size, bool is_merging, TUniqueId* out_id = NULL) { VLOG_QUERY << "start receiver"; - RuntimeProfile* profile = _obj_pool.add(new RuntimeProfile(&_obj_pool, "TestReceiver")); + RuntimeProfile* profile = _obj_pool.add(new RuntimeProfile("TestReceiver")); TUniqueId instance_id; get_next_instance_id(&instance_id); _receiver_info.push_back(ReceiverInfo(stream_type, num_senders, receiver_num)); diff --git a/be/test/runtime/sorter_test.cpp b/be/test/runtime/sorter_test.cpp index 0a95fd5b103f6d..48ce5fcde58fd3 100644 --- a/be/test/runtime/sorter_test.cpp +++ b/be/test/runtime/sorter_test.cpp @@ -144,9 +144,7 @@ class SorterTest : public testing::Test { row_tuples.push_back(1); _output_row_desc = new RowDescriptor(*_desc_tbl, row_tuples, nullable_tuples); } - _runtime_profile = new RuntimeProfile(get_object_pool(), "sorter"); - - + _runtime_profile = new RuntimeProfile("sorter"); } virtual ~SorterTest() { delete _child_row_desc; @@ -240,7 +238,6 @@ TEST_F(SorterTest, sorter_run_asc) { less_than, exec_exprs.sort_tuple_slot_expr_ctxs(), _child_row_desc, _runtime_profile, _runtime_state); - // new RuntimeProfile(get_object_pool(), "sorter"), _runtime_state); int num_rows = 5; RowBatch* batch = CreateRowBatch(num_rows); @@ -290,7 +287,6 @@ TEST_F(SorterTest, sorter_run_desc_with_quick_sort) { less_than, exec_exprs.sort_tuple_slot_expr_ctxs(), _child_row_desc, _runtime_profile, _runtime_state); - // new RuntimeProfile(get_object_pool(), "sorter"), _runtime_state); int num_rows = 5; RowBatch* batch = CreateRowBatch(num_rows); @@ -332,7 +328,6 @@ TEST_F(SorterTest, sorter_run_desc) { less_than, exec_exprs.sort_tuple_slot_expr_ctxs(), _child_row_desc, _runtime_profile, _runtime_state); - // new RuntimeProfile(get_object_pool(), "sorter"), _runtime_state); int num_rows = 5; RowBatch* batch = CreateRowBatch(num_rows); From 6870491338a46ce2ce5d15cfab356f3e6d02f56e Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 16 Jun 2020 14:36:54 +0800 Subject: [PATCH 2/3] add missing code --- be/src/runtime/runtime_state.h | 2 +- .../java/org/apache/doris/qe/Coordinator.java | 39 ++++++++++++++++++- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index d53b3e8fb730ba..9f3a5c1f75ee91 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -505,7 +505,7 @@ class RuntimeState { static const int DEFAULT_BATCH_SIZE = 2048; // put runtime state before _obj_pool, so that it will be deconstructed after - // _obj_pool. And some of object in _obj_pool will use profile when deconstructing. + // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. RuntimeProfile _profile; DescriptorTbl* _desc_tbl; diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index c0b2e85809f8d3..01da5c6dd71449 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -52,6 +52,7 @@ import org.apache.doris.planner.UnionNode; import org.apache.doris.proto.PExecPlanFragmentResult; import org.apache.doris.proto.PPlanFragmentCancelReason; +import org.apache.doris.proto.PStatus; import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; @@ -1469,8 +1470,42 @@ public Future execRemoteFragmentAsync() throws TExcepti try { return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams); } catch (RpcException e) { - SimpleScheduler.addToBlacklist(backend.getId()); - throw e; + // DO NOT throw exception here, return a complete future with error code, + // so that the following logic will cancel the fragment. + Future future = new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public PExecPlanFragmentResult get() throws InterruptedException, ExecutionException { + PExecPlanFragmentResult result = new PExecPlanFragmentResult(); + PStatus pStatus = new PStatus(); + pStatus.error_msgs.add(e.getMessage()); + // use THRIFT_RPC_ERROR so that this BE will be added to the blacklist later. + pStatus.status_code = TStatusCode.THRIFT_RPC_ERROR.getValue(); + result.status = pStatus; + return result; + } + + @Override + public PExecPlanFragmentResult get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return get(); + } + }; + return future; } } From a9201abf538f2a86520ccd4a244a324cb7dc251c Mon Sep 17 00:00:00 2001 From: chenmingyu Date: Thu, 18 Jun 2020 21:11:01 +0800 Subject: [PATCH 3/3] fix ut --- be/test/exec/plain_text_line_reader_bzip_test.cpp | 3 +-- be/test/exec/plain_text_line_reader_gzip_test.cpp | 3 +-- be/test/exec/plain_text_line_reader_lz4frame_test.cpp | 3 +-- be/test/exec/plain_text_line_reader_lzop_test.cpp | 3 +-- be/test/exec/plain_text_line_reader_uncompressed_test.cpp | 3 +-- 5 files changed, 5 insertions(+), 10 deletions(-) diff --git a/be/test/exec/plain_text_line_reader_bzip_test.cpp b/be/test/exec/plain_text_line_reader_bzip_test.cpp index 3b3a073736b3a7..1999d08d011255 100644 --- a/be/test/exec/plain_text_line_reader_bzip_test.cpp +++ b/be/test/exec/plain_text_line_reader_bzip_test.cpp @@ -27,7 +27,7 @@ namespace doris { class PlainTextLineReaderTest : public testing::Test { public: - PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") { + PlainTextLineReaderTest() : _profile("TestProfile") { } protected: @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test { virtual void TearDown() { } private: - ObjectPool _obj_pool; RuntimeProfile _profile; }; diff --git a/be/test/exec/plain_text_line_reader_gzip_test.cpp b/be/test/exec/plain_text_line_reader_gzip_test.cpp index 2c2f3f60c2d088..d3e22ec1dc4339 100644 --- a/be/test/exec/plain_text_line_reader_gzip_test.cpp +++ b/be/test/exec/plain_text_line_reader_gzip_test.cpp @@ -27,7 +27,7 @@ namespace doris { class PlainTextLineReaderTest : public testing::Test { public: - PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") { + PlainTextLineReaderTest() : _profile("TestProfile") { } protected: @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test { virtual void TearDown() { } private: - ObjectPool _obj_pool; RuntimeProfile _profile; }; diff --git a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp index 1c7a544a78d43a..24ba1ac43e724f 100644 --- a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp +++ b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp @@ -27,7 +27,7 @@ namespace doris { class PlainTextLineReaderTest : public testing::Test { public: - PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") { + PlainTextLineReaderTest() : _profile("TestProfile") { } protected: @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test { virtual void TearDown() { } private: - ObjectPool _obj_pool; RuntimeProfile _profile; }; diff --git a/be/test/exec/plain_text_line_reader_lzop_test.cpp b/be/test/exec/plain_text_line_reader_lzop_test.cpp index 627808cb85133c..32df02f99ac0c0 100644 --- a/be/test/exec/plain_text_line_reader_lzop_test.cpp +++ b/be/test/exec/plain_text_line_reader_lzop_test.cpp @@ -27,7 +27,7 @@ namespace doris { class PlainTextLineReaderTest : public testing::Test { public: - PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") { + PlainTextLineReaderTest() : _profile("TestProfile") { } protected: @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test { virtual void TearDown() { } private: - ObjectPool _obj_pool; RuntimeProfile _profile; }; diff --git a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp index 7641a3675c2ecd..46ee71b86137fa 100644 --- a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp +++ b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp @@ -27,7 +27,7 @@ namespace doris { class PlainTextLineReaderTest : public testing::Test { public: - PlainTextLineReaderTest() : _profile(&_obj_pool, "TestProfile") { + PlainTextLineReaderTest() : _profile("TestProfile") { } protected: @@ -36,7 +36,6 @@ class PlainTextLineReaderTest : public testing::Test { virtual void TearDown() { } private: - ObjectPool _obj_pool; RuntimeProfile _profile; };