From 8cdb3f28827d03d2bf9ee00034980f0284f665b0 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 18 Nov 2024 11:08:36 +0800 Subject: [PATCH 1/3] [fix](arrow-flight-sql) Fix FE not found arrow flight schema (#43960) Problem Summary: After the first query phase, `exec_plan_fragment`, FE fetches the arrow schema from BE, but BE only generates the arrow schema in the second query phase, `ResultSinkLocalState::open`. Therefore, this PR generates the arrow schema in the first phase, `ResultSinkLocalState::init`. Fix: ``` errmsg: Status [errorCode=NOT_FOUND, errorMsg=(172.16.212.191)[NOT_FOUND]FE not found arrow flight schema, maybe query has been canceled], error code: null, error msg: java.lang.RuntimeException: fetch arrow flight schema failed, finstId: 3573efbeb10c44a7-956531d8e15d1630, errmsg: Status [errorCode=NOT_FOUND, errorMsg=(172.16.212.191)[NOT_FOUND]FE not found arrow flight schema, maybe query has been canceled] at org.apache.doris.service.arrowflight.FlightSqlConnectProcessor.fetchArrowFlightSchema(FlightSqlConnectProcessor.java:126) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.service.arrowflight.DorisFlightSqlProducer.executeQueryStatement(DorisFlightSqlProducer.java:229) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.service.arrowflight.DorisFlightSqlProducer.getFlightInfoStatement(DorisFlightSqlProducer.java:260) ~[doris-fe.jar:1.2-SNAPSHOT] ``` --- be/src/pipeline/exec/result_sink_operator.cpp | 19 +++++++++++-------- be/src/service/internal_service.cpp | 1 + 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index d2dfa89cdd62f6..0ef1164b2e5459 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -71,6 +71,17 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(),
p._result_sink_buffer_size_rows, &_sender, true, state)); ((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this()); + + _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); + for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); + } + if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + std::shared_ptr arrow_schema; + RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); + _sender->register_arrow_schema(arrow_schema); + } return Status::OK(); } @@ -79,10 +90,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast(); - _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); - for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); - } // create writer based on sink type switch (p._sink_type) { case TResultSinkType::MYSQL_PROTOCAL: { @@ -96,10 +103,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) { break; } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { - std::shared_ptr arrow_schema; - RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, - state->timezone())); - _sender->register_arrow_schema(arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( _sender.get(), _output_vexpr_ctxs, _profile)); break; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index a82ab9988b1993..eab0832d565e72 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -923,6 +923,7 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( UniqueId(request->finst_id()).to_thrift(), &schema); if (!st.ok()) { + 
LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st; st.to_protobuf(result->mutable_status()); return; } From 49ce106d80cb649f1a8f7d49721735a34c4ca371 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Tue, 19 Nov 2024 10:54:42 +0800 Subject: [PATCH 2/3] [fix](arrow-flight-sql) Fix Doris NULL column conversion to arrow batch (#43929) ### What problem does this PR solve? Problem Summary: The representation of NULL columns in Doris is special, which is `DataTypeNull`. `Uint8` uses `arrow::BooleanBuilder` when serializing into arrow batch, which does not match the expected `arrow::NullBuilder`. Fix: ``` *** Query id: fd32741526804c1e-bc016473fd8f3aa3 *** *** is nereids: 1 *** *** tablet id: 0 *** *** Aborted at 1731327262 (unix time) try "date -d @1731327262" if you are using GNU date *** *** Current BE git commitID: 653e315ba5 *** *** SIGSEGV address not mapped to object (@0x100000024) received by PID 1442863 (TID 1443456 OR 0x7f8b8cdea700) from PID 36; stack trace: *** 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /mnt/disk2/liyifan/doris/doris_2.1/doris/be/src/common/signal_handler.h:421 1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /mnt/disk2/liyifan/doris/jdk-17.0.2/lib/server/libjvm.so 2# JVM_handle_linux_signal in /mnt/disk2/liyifan/doris/jdk-17.0.2/lib/server/libjvm.so 3# 0x00007F8CA1F38B50 in /lib64/libc.so.6 4# 0x000055FC45E5B2D3 in /mnt/disk2/liyifan/doris/doris_2.1/doris/output_run/be/lib/doris_be 5# arrow::BooleanBuilder::AppendValues(unsigned char const*, long, unsigned char const*) in /mnt/disk2/liyifan/doris/doris_2.1/doris/output_run/be/lib/doris_be 6# doris::vectorized::DataTypeNumberSerDe::write_column_to_arrow(doris::vectorized::IColumn const&, doris::vectorized::PODArray, 15ul, 16ul> const*, arrow::ArrayBuilder*, int, int, cctz::time_zone const&) const at /mnt/disk2/liyifan/doris/doris_2.1/doris/be/src/vec/data_types/serde/data_type_number_serde.cpp:86 7# 
doris::FromBlockConverter::convert(std::shared_ptr*) at /mnt/disk2/liyifan/doris/doris_2.1/doris/be/src/util/arrow/block_convertor.cpp:390 8# doris::convert_to_arrow_batch(doris::vectorized::Block const&, std::shared_ptr const&, arrow::MemoryPool*, std::shared_ptr*, cctz::time_zone const&) in /mnt/disk2/liyifan/doris/doris_2.1/doris/output_run/be/lib/doris_be 9# doris::vectorized::VArrowFlightResultWriter::write(doris::vectorized::Block&) at /mnt/disk2/liyifan/doris/doris_2.1/doris/be/src/vec/sink/varrow_flight_result_writer.cpp:76 10# doris::vectorized::VResultSink::send(doris::RuntimeState*, doris::vectorized::Block*, bool) at /mnt/disk2/liyifan/doris/doris_2.1/doris/be/src/vec/sink/vresult_sink.cpp:149 11# doris::PlanFragmentExecutor::open_vectorized_internal() at /mnt/disk2/liyifan/doris/doris_2.1/doris/be/src/runtime/plan_fragment_executor.cpp:341 12# doris::PlanFragmentExecutor::open() at /mnt/disk2/liyifan/doris/doris_2.1/doris/be/src/runtime/plan_fragment_executor.cpp:273 ``` --- .../serde/data_type_number_serde.cpp | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index efa41e346bfa6e..f4fb6bbbb1f9cf 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -78,12 +78,21 @@ void DataTypeNumberSerDe::write_column_to_arrow(const IColumn& column, const auto arrow_null_map = revert_null_map(null_map, start, end); auto arrow_null_map_data = arrow_null_map.empty() ? 
nullptr : arrow_null_map.data(); if constexpr (std::is_same_v) { - ARROW_BUILDER_TYPE& builder = assert_cast(*array_builder); - checkArrowStatus( - builder.AppendValues(reinterpret_cast(col_data.data() + start), - end - start, - reinterpret_cast(arrow_null_map_data)), - column.get_name(), array_builder->type()->name()); + auto* null_builder = dynamic_cast(array_builder); + if (null_builder) { + for (size_t i = start; i < end; ++i) { + checkArrowStatus(null_builder->AppendNull(), column.get_name(), + null_builder->type()->name()); + } + } else { + ARROW_BUILDER_TYPE& builder = assert_cast(*array_builder); + checkArrowStatus( + builder.AppendValues(reinterpret_cast(col_data.data() + start), + end - start, + reinterpret_cast(arrow_null_map_data)), + column.get_name(), array_builder->type()->name()); + } + } else if constexpr (std::is_same_v) { auto& string_builder = assert_cast(*array_builder); for (size_t i = start; i < end; ++i) { From 0e5c2fd766e4f6729afb852dfe1b6641242fb294 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Tue, 19 Nov 2024 11:50:59 +0800 Subject: [PATCH 3/3] [fix](arrow-flight-sql) Fix conf `public_host` and `arrow_flight_sql_proxy_port` (#44177) ### What problem does this PR solve? Problem Summary: Rename `public_access_ip` to `public_host` and `public_access_port` to `arrow_flight_sql_proxy_port`. The two settings no longer have to be used together.
--- be/src/common/config.cpp | 25 +++++++++++++++-- be/src/common/config.h | 28 +++++++++++++++---- be/src/service/internal_service.cpp | 8 ++++-- .../arrowflight/DorisFlightSqlProducer.java | 11 ++++++-- .../FlightSqlConnectProcessor.java | 8 ++++-- 5 files changed, 65 insertions(+), 15 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d5b67c2c128639..c41f19a3e27ea6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -63,8 +63,29 @@ DEFINE_Int32(brpc_port, "8060"); DEFINE_Int32(arrow_flight_sql_port, "-1"); -DEFINE_mString(public_access_ip, ""); -DEFINE_Int32(public_access_port, "-1"); +// If the external client cannot directly access priority_networks, set public_host to be accessible +// to external client. +// There are usually two usage scenarios: +// 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network. +// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be +// randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx. +// 2. if priority_networks is an internal network IP, and BE node has its own independent external IP, +// but Doris currently does not support modifying priority_networks, setting public_host to the real external IP. +DEFINE_mString(public_host, ""); + +// If the BE node is connected to the external network through a reverse proxy like Nginx +// and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy +// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. 
For example: +// upstream arrowflight { +// server 10.16.10.8:8069; +// server 10.16.10.8:8068; +//} +// server { +// listen 8167 http2; +// listen [::]:8167 http2; +// server_name doris.arrowflight.com; +// } +DEFINE_Int32(arrow_flight_sql_proxy_port, "-1"); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/common/config.h b/be/src/common/config.h index aca5b6b829af2c..7693af0f7aede3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -100,11 +100,29 @@ DECLARE_Int32(brpc_port); // Default -1, do not start arrow flight sql server. DECLARE_Int32(arrow_flight_sql_port); -// If priority_networks is incorrect but cannot be modified, set public_access_ip as BE’s real IP. -// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result. -// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip. -DECLARE_mString(public_access_ip); -DECLARE_Int32(public_access_port); +// If the external client cannot directly access priority_networks, set public_host to be accessible +// to external client. +// There are usually two usage scenarios: +// 1. in production environment, it is often inconvenient to expose Doris BE nodes to the external network. +// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be +// randomly routed to a Doris BE node when connecting to Nginx. set public_host to the host of Nginx. +// 2. if priority_networks is an internal network IP, and BE node has its own independent external IP, +// but Doris currently does not support modifying priority_networks, setting public_host to the real external IP. 
+DECLARE_mString(public_host); + +// If the BE node is connected to the external network through a reverse proxy like Nginx +// and need to use Arrow Flight SQL, should add a server in Nginx to reverse proxy +// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example: +// upstream arrowflight { +// server 10.16.10.8:8069; +// server 10.16.10.8:8068; +//} +// server { +// listen 8167 http2; +// listen [::]:8167 http2; +// server_name doris.arrowflight.com; +// } +DECLARE_Int32(arrow_flight_sql_proxy_port); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index eab0832d565e72..701fc6c018d70b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -932,9 +932,11 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro st = serialize_arrow_schema(&schema, &schema_str); if (st.ok()) { result->set_schema(std::move(schema_str)); - if (!config::public_access_ip.empty() && config::public_access_port != -1) { - result->set_be_arrow_flight_ip(config::public_access_ip); - result->set_be_arrow_flight_port(config::public_access_port); + if (!config::public_host.empty()) { + result->set_be_arrow_flight_ip(config::public_host); + } + if (config::arrow_flight_sql_proxy_port != -1) { + result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port); } } st.to_protobuf(result->mutable_status()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 6f45f3faac7b41..758f30469bf008 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java 
@@ -249,8 +249,15 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. // If it is different from the Doris BE node randomly routed by nginx, // data forwarding needs to be done inside the Doris BE node. - location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname, - flightSQLConnectProcessor.getPublicAccessAddr().port); + if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) { + location = Location.forGrpcInsecure( + flightSQLConnectProcessor.getPublicAccessAddr().hostname, + flightSQLConnectProcessor.getPublicAccessAddr().port); + } else { + location = Location.forGrpcInsecure( + flightSQLConnectProcessor.getPublicAccessAddr().hostname, + connectContext.getResultFlightServerAddr().port); + } } else { location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, connectContext.getResultFlightServerAddr().port); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index 6724065f99a883..20b377eb5c370d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -131,9 +131,11 @@ public Schema fetchArrowFlightSchema(int timeoutMs) { throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", DebugUtil.printId(tid), resultStatus)); } - if (pResult.hasBeArrowFlightIp() && pResult.hasBeArrowFlightPort()) { - publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8(); - publicAccessAddr.port = pResult.getBeArrowFlightPort(); + if (pResult.hasBeArrowFlightIp()) { + publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8()); + } + 
if (pResult.hasBeArrowFlightPort()) { + publicAccessAddr.setPort(pResult.getBeArrowFlightPort()); } if (pResult.hasSchema() && pResult.getSchema().size() > 0) { RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);