From 22fb9da16a3efc8a0ff259faeb1629c472c51d25 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Tue, 25 Jun 2024 20:49:52 +0800 Subject: [PATCH] 1 --- be/src/pipeline/exec/result_sink_operator.cpp | 3 +-- be/src/runtime/buffer_control_block.cpp | 13 ++++++----- be/src/runtime/buffer_control_block.h | 4 ++++ .../arrowflight/DorisFlightSqlProducer.java | 2 +- .../FlightSqlConnectProcessor.java | 22 +++++++++---------- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 378fea18eea366..0495e48b7dc926 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -80,8 +80,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) { case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr arrow_schema; RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); - state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(), - arrow_schema); + state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema)); break; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index a1a83b22840b2b..845afb9a84b85c 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -151,10 +151,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, int num_rows = result->num_rows(); - if (_is_cancelled) { - return Status::Cancelled("Cancelled"); - } - // TODO: merge RocordBatch, ToStructArray -> Make again _arrow_flight_batch_queue.push_back(std::move(result)); @@ -162,6 +158,7 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, _instance_rows_in_queue.emplace_back(); _instance_rows[state->fragment_instance_id()] += num_rows; _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; + _arrow_data_arrival.notify_one(); _update_dependency(); return Status::OK(); } @@ -212,6 +209,10 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr* return Status::Cancelled("Cancelled"); } + while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { + _arrow_data_arrival.wait_for(l, std::chrono::seconds(1)); + } + if (_is_cancelled) { return Status::Cancelled("Cancelled"); } @@ -234,7 +235,7 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr* _update_dependency(); return Status::OK(); } - return Status::InternalError("Abnormal Ending"); + return Status::InternalError("Get Arrow Batch Abnormal Ending"); } Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { @@ -250,6 +251,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { _is_close = true; _status = exec_status; + _arrow_data_arrival.notify_all(); if (!_waiting_rpc.empty()) { if (_status.ok()) { @@ -269,6 +271,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) { void BufferControlBlock::cancel() { std::unique_lock l(_lock); _is_cancelled = true; + _arrow_data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(Status::Cancelled("Cancelled")); } diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 12cbc72ff52071..d8bb6e0f5061f7 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -124,6 +124,10 @@ class BufferControlBlock { // protects all subsequent data in this block std::mutex _lock; + // get arrow flight result is a sync method, need wait for data ready and return result. + // TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data. + std::condition_variable _arrow_data_arrival; + std::deque _waiting_rpc; // only used for FE using return rows to check limit diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 2c7aaae4f2a475..af6d85c954e4a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -225,7 +225,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con } else { // Now only query stmt will pull results from BE. final ByteString handle = ByteString.copyFromUtf8( - DebugUtil.printId(connectContext.getFinstId()) + ":" + query); + DebugUtil.printId(connectContext.queryId()) + ":" + query); Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); if (schema == null) { throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index a4aa5a88c8f624..f91d63ed90d01b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -102,13 +102,13 @@ public void handleQuery(String query) throws ConnectionException { public Schema fetchArrowFlightSchema(int timeoutMs) { TNetworkAddress address = ctx.getResultInternalServiceAddr(); - TUniqueId tid = ctx.getFinstId(); + TUniqueId tid = ctx.queryId(); ArrayList resultOutputExprs = ctx.getResultOutputExprs(); - Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); + Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); try { InternalService.PFetchArrowFlightSchemaRequest request = InternalService.PFetchArrowFlightSchemaRequest.newBuilder() - .setFinstId(finstId) + .setFinstId(queryId) .build(); Future future @@ -116,12 +116,12 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { InternalService.PFetchArrowFlightSchemaResult pResult; pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); if (pResult == null) { - throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s", + throw new RuntimeException(String.format("fetch arrow flight schema timeout, queryId: %s", DebugUtil.printId(tid))); } Status resultStatus = new Status(pResult.getStatus()); if (resultStatus.getErrorCode() != TStatusCode.OK) { - throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s", + throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", DebugUtil.printId(tid), resultStatus.toString())); } if (pResult.hasBeArrowFlightIp()) { @@ -138,7 +138,7 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { List fieldVectors = root.getFieldVectors(); if (fieldVectors.size() != resultOutputExprs.size()) { throw new RuntimeException(String.format( - "Schema size %s' is not equal to arrow field size %s, finstId: %s.", + "Schema size %s' is not equal to arrow field size %s, queryId: %s.", fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); } return root.getSchema(); @@ -146,24 +146,24 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { throw new RuntimeException("Read Arrow Flight Schema failed.", e); } } else { - throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s", + throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s", DebugUtil.printId(tid))); } } catch (RpcException e) { throw new RuntimeException(String.format( - "arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s", + "arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (InterruptedException e) { throw new RuntimeException(String.format( - "arrow flight schema future get interrupted exception, finstId: %s,backend: %s", + "arrow flight schema future get interrupted exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (ExecutionException e) { throw new RuntimeException(String.format( - "arrow flight schema future get execution exception, finstId: %s,backend: %s", + "arrow flight schema future get execution exception, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } catch (TimeoutException e) { throw new RuntimeException(String.format( - "arrow flight schema fetch timeout, finstId: %s,backend: %s", + "arrow flight schema fetch timeout, queryId: %s,backend: %s", DebugUtil.printId(tid), address), e); } }