From a91b7f1ee9820c9e4080836dfd90afd6e50c7a69 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 15 Jul 2024 11:24:00 +0800 Subject: [PATCH] 1 --- be/src/pipeline/exec/result_sink_operator.cpp | 7 ++++++- be/src/service/arrow_flight/arrow_flight_batch_reader.cpp | 2 +- be/src/service/internal_service.cpp | 4 ++-- .../service/arrowflight/FlightSqlConnectProcessor.java | 8 +++++++- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 0495e48b7dc926..4ca4d3d421cbac 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -80,7 +80,12 @@ Status ResultSinkLocalState::open(RuntimeState* state) { case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr arrow_schema; RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); - state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); + if (state->query_options().enable_parallel_result_sink) { + state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); + } else { + state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), + arrow_schema); + } _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); break; diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index d2b45a0b77c6ef..a07e479d759be7 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -43,7 +43,7 @@ arrow::Result> ArrowFlightBatchReader::C auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id); if (schema == nullptr) { ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format( - "not found arrow flight schema, maybe query has been canceled, queryid: {}", + "Client not found arrow flight schema, maybe query has been canceled, queryid: {}", print_id(statement_->query_id)))); } std::shared_ptr result(new ArrowFlightBatchReader(statement_, schema)); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 8bf04ead03551a..6f7aaf5229492d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -833,9 +833,9 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( UniqueId(request->finst_id()).to_thrift()); if (schema == nullptr) { - LOG(INFO) << "not found arrow flight schema, maybe query has been canceled"; + LOG(INFO) << "FE not found arrow flight schema, maybe query has been canceled"; auto st = Status::NotFound( - "not found arrow flight schema, maybe query has been canceled"); + "FE not found arrow flight schema, maybe query has been canceled"); st.to_protobuf(result->mutable_status()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index f91d63ed90d01b..f83c67d2daf17c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -102,7 +102,13 @@ public void handleQuery(String query) throws ConnectionException { public Schema fetchArrowFlightSchema(int timeoutMs) { TNetworkAddress address = ctx.getResultInternalServiceAddr(); - TUniqueId tid = ctx.queryId(); + TUniqueId tid; + if (ctx.getSessionVariable().enableParallelResultSink()) { + tid = ctx.queryId(); + } else { + // only one instance + tid = ctx.getFinstId(); + } ArrayList resultOutputExprs = ctx.getResultOutputExprs(); Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); try {