diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc index 8181ae757d4..b25b13edf39 100644 --- a/cpp/src/arrow/flight/client.cc +++ b/cpp/src/arrow/flight/client.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #ifdef GRPCPP_PP_INCLUDE @@ -867,13 +868,17 @@ class FlightClient::FlightClientImpl { } grpc::ChannelArguments args; + // We can't set the same config value twice, so for values where + // we want to set defaults, keep them in a map and update them; + // then update them all at once + std::unordered_map default_args; // Try to reconnect quickly at first, in case the server is still starting up - args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100); + default_args[GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS] = 100; // Receive messages of any size - args.SetMaxReceiveMessageSize(-1); + default_args[GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] = -1; // Setting this arg enables each client to open it's own TCP connection to server, // not sharing one single connection, which becomes bottleneck under high load. - args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1); + default_args[GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL] = 1; if (options.override_hostname != "") { args.SetSslTargetNameOverride(options.override_hostname); @@ -882,12 +887,15 @@ class FlightClient::FlightClientImpl { // Allow setting generic gRPC options. for (const auto& arg : options.generic_options) { if (util::holds_alternative(arg.second)) { - args.SetInt(arg.first, util::get(arg.second)); + default_args[arg.first] = util::get(arg.second); } else if (util::holds_alternative(arg.second)) { args.SetString(arg.first, util::get(arg.second)); } // Otherwise unimplemented } + for (const auto& pair : default_args) { + args.SetInt(pair.first, pair.second); + } std::vector> interceptors; diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc index cb88d857c95..8726a554a1c 100644 --- a/cpp/src/arrow/flight/flight_test.cc +++ b/cpp/src/arrow/flight/flight_test.cc @@ -1417,7 +1417,7 @@ TEST_F(TestFlightClient, GenericOptions) { std::unique_ptr client; auto options = FlightClientOptions::Defaults(); // Set a very low limit at the gRPC layer to fail all calls - options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 32); + options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 4); Location location; ASSERT_OK(Location::ForGrpcTcp("localhost", server_->port(), &location)); ASSERT_OK(FlightClient::Connect(location, options, &client)); diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index b7459d21da1..484fbb2a4e4 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -1435,7 +1435,8 @@ cdef class ServerCallContext(_Weakrefable): def peer(self): """Get the address of the peer.""" - return frombytes(self.context.peer()) + # Set safe=True as gRPC on Windows sometimes gives garbage bytes + return frombytes(self.context.peer(), safe=True) def get_middleware(self, key): """ diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py index 50e993dce87..5d0e1c05bea 100644 --- a/python/pyarrow/tests/test_flight.py +++ b/python/pyarrow/tests/test_flight.py @@ -952,6 +952,8 @@ def test_http_basic_unauth(): list(client.do_action(action)) +@pytest.mark.skipif(os.name == 'nt', + reason="ARROW-10013: gRPC on Windows corrupts peer()") def test_http_basic_auth(): """Test a Python implementation of HTTP basic authentication.""" with EchoStreamFlightServer(auth_handler=basic_auth_handler) as server: