diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 16305a8e91519e..8df1fc84f86184 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -579,6 +579,8 @@ std::shared_ptr FragmentMgr::get_or_erase_query_ctx_with_lock( template Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, std::shared_ptr& query_ctx) { + DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", + { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); }); if (params.is_simplified_param) { // Get common components from _query_ctx_map std::lock_guard lock(_lock); diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 0384b502e0af5d..2383f25afc82b3 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -38,12 +38,10 @@ namespace doris { Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptr block, bool write_wal, UniqueId& load_id) { + DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed", + { return Status::InternalError("LoadBlockQueue.add_block.failed"); }); std::unique_lock l(mutex); RETURN_IF_ERROR(status); - auto start = std::chrono::steady_clock::now(); - DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", { - start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000); - }); if (UNLIKELY(runtime_state->is_cancelled())) { return runtime_state->cancel_reason(); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index d8cd7356ddb4c3..2492d2a846b105 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -2030,7 +2030,10 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont TUniqueId load_id; load_id.__set_hi(request->load_id().hi()); load_id.__set_lo(request->load_id().lo()); - bool ret = _light_work_pool.try_offer([this, request, response, done, load_id]() { + std::shared_ptr lock = std::make_shared(); + std::shared_ptr is_done = std::make_shared(false); + bool ret = _light_work_pool.try_offer([this, request, response, done, load_id, lock, + is_done]() { brpc::ClosureGuard closure_guard(done); std::shared_ptr ctx = std::make_shared(_exec_env); auto pipe = std::make_shared( @@ -2044,7 +2047,13 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont request->exec_plan_fragment_request().request(), request->exec_plan_fragment_request().version(), request->exec_plan_fragment_request().compact(), - [&, response, done, load_id](RuntimeState* state, Status* status) { + [&, response, done, load_id, lock, is_done](RuntimeState* state, + Status* status) { + std::lock_guard lock1(*lock); + if (*is_done) { + return; + } + *is_done = true; brpc::ClosureGuard cb_closure_guard(done); response->set_label(state->import_label()); response->set_txn_id(state->wal_id()); @@ -2063,11 +2072,19 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont st = Status::Error(ErrorCode::INTERNAL_ERROR, "_exec_plan_fragment_impl meet unknown error"); } - closure_guard.release(); if (!st.ok()) { LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id) << ", errmsg=" << st; + std::lock_guard lock1(*lock); + if (*is_done) { + closure_guard.release(); + } else { + *is_done = true; + st.to_protobuf(response->mutable_status()); + _exec_env->new_load_stream_mgr()->remove(load_id); + } } else { + closure_guard.release(); for (int i = 0; i < request->data().size(); ++i) { std::unique_ptr row(new PDataRow()); row->CopyFrom(request->data(i)); diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy index 1416a86e5e9c8b..4589b38cafce76 100644 --- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy @@ -52,4 +52,28 @@ suite("test_group_commit_error", "nonConcurrent") { } finally { GetDebugPoint().clearDebugPointsForAllBEs() } + + try { + GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr._get_query_ctx.failed") + sql """ set group_commit = async_mode """ + sql """ set enable_nereids_planner = false """ + sql """ insert into ${tableName} values (3, 3) """ + assertTrue(false) + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.failed") + sql """ set group_commit = async_mode """ + sql """ set enable_nereids_planner = false """ + sql """ insert into ${tableName} values (4, 4) """ + assertTrue(false) + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } } \ No newline at end of file