Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
arrow_schema);
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
break;
Expand Down
13 changes: 8 additions & 5 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,14 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state,

int num_rows = result->num_rows();

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

// TODO: merge RecordBatch, ToStructArray -> Make again

_arrow_flight_batch_queue.push_back(std::move(result));
_buffer_rows += num_rows;
_instance_rows_in_queue.emplace_back();
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
_arrow_data_arrival.notify_one();
_update_dependency();
return Status::OK();
}
Expand Down Expand Up @@ -212,6 +209,10 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
return Status::Cancelled("Cancelled");
}

while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
_arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
}

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}
Expand All @@ -234,7 +235,7 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
_update_dependency();
return Status::OK();
}
return Status::InternalError("Abnormal Ending");
return Status::InternalError("Get Arrow Batch Abnormal Ending");
}

Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
Expand All @@ -250,6 +251,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {

_is_close = true;
_status = exec_status;
_arrow_data_arrival.notify_all();

if (!_waiting_rpc.empty()) {
if (_status.ok()) {
Expand All @@ -269,6 +271,7 @@ Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
void BufferControlBlock::cancel() {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
_arrow_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ class BufferControlBlock {
// protects all subsequent data in this block
std::mutex _lock;

// Getting an arrow flight result is a synchronous method: it needs to wait until data is ready and then return the result.
// TODO: waiting for data will block the pipeline, so use a request pool to hold the requests that are waiting for data.
std::condition_variable _arrow_data_arrival;

std::deque<GetResultBatchCtx*> _waiting_rpc;

// only used for FE using return rows to check limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
} else {
// Currently, only query statements pull results from BE.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
DebugUtil.printId(connectContext.queryId()) + ":" + query);
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,26 @@ public void handleQuery(String query) throws ConnectionException {

public Schema fetchArrowFlightSchema(int timeoutMs) {
TNetworkAddress address = ctx.getResultInternalServiceAddr();
TUniqueId tid = ctx.getFinstId();
TUniqueId tid = ctx.queryId();
ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
try {
InternalService.PFetchArrowFlightSchemaRequest request =
InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
.setFinstId(finstId)
.setFinstId(queryId)
.build();

Future<InternalService.PFetchArrowFlightSchemaResult> future
= BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request);
InternalService.PFetchArrowFlightSchemaResult pResult;
pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
if (pResult == null) {
throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s",
throw new RuntimeException(String.format("fetch arrow flight schema timeout, queryId: %s",
DebugUtil.printId(tid)));
}
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s",
throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus.toString()));
}
if (pResult.hasBeArrowFlightIp()) {
Expand All @@ -138,32 +138,32 @@ public Schema fetchArrowFlightSchema(int timeoutMs) {
List<FieldVector> fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != resultOutputExprs.size()) {
throw new RuntimeException(String.format(
"Schema size %s' is not equal to arrow field size %s, finstId: %s.",
"Schema size %s' is not equal to arrow field size %s, queryId: %s.",
fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid)));
}
return root.getSchema();
} catch (Exception e) {
throw new RuntimeException("Read Arrow Flight Schema failed.", e);
}
} else {
throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s",
throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s",
DebugUtil.printId(tid)));
}
} catch (RpcException e) {
throw new RuntimeException(String.format(
"arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s",
"arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (InterruptedException e) {
throw new RuntimeException(String.format(
"arrow flight schema future get interrupted exception, finstId: %s,backend: %s",
"arrow flight schema future get interrupted exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
"arrow flight schema future get execution exception, finstId: %s,backend: %s",
"arrow flight schema future get execution exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (TimeoutException e) {
throw new RuntimeException(String.format(
"arrow flight schema fetch timeout, finstId: %s,backend: %s",
"arrow flight schema fetch timeout, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
}
}
Expand Down