From aaadb409f80ed6f0bfa7d34d27040d97484bb6aa Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 15 Sep 2020 14:42:00 -0400
Subject: [PATCH] ARROW-10013: [FlightRPC][C++] fix setting generic client
options
---
cpp/src/arrow/flight/client.cc | 16 ++++++++++++----
cpp/src/arrow/flight/flight_test.cc | 2 +-
python/pyarrow/_flight.pyx | 3 ++-
python/pyarrow/tests/test_flight.py | 2 ++
4 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index 8181ae757d4..b25b13edf39 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -25,6 +25,7 @@
#include
#include
#include
+#include
#include
#ifdef GRPCPP_PP_INCLUDE
@@ -867,13 +868,17 @@ class FlightClient::FlightClientImpl {
}
grpc::ChannelArguments args;
+ // We can't set the same config value twice, so for values where
+ // we want to set defaults, keep them in a map and update them;
+ // then update them all at once
+ std::unordered_map default_args;
// Try to reconnect quickly at first, in case the server is still starting up
- args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
+ default_args[GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS] = 100;
// Receive messages of any size
- args.SetMaxReceiveMessageSize(-1);
+ default_args[GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] = -1;
// Setting this arg enables each client to open it's own TCP connection to server,
// not sharing one single connection, which becomes bottleneck under high load.
- args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
+ default_args[GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL] = 1;
if (options.override_hostname != "") {
args.SetSslTargetNameOverride(options.override_hostname);
@@ -882,12 +887,15 @@ class FlightClient::FlightClientImpl {
// Allow setting generic gRPC options.
for (const auto& arg : options.generic_options) {
if (util::holds_alternative(arg.second)) {
- args.SetInt(arg.first, util::get(arg.second));
+ default_args[arg.first] = util::get(arg.second);
} else if (util::holds_alternative(arg.second)) {
args.SetString(arg.first, util::get(arg.second));
}
// Otherwise unimplemented
}
+ for (const auto& pair : default_args) {
+ args.SetInt(pair.first, pair.second);
+ }
std::vector>
interceptors;
diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index cb88d857c95..8726a554a1c 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -1417,7 +1417,7 @@ TEST_F(TestFlightClient, GenericOptions) {
std::unique_ptr client;
auto options = FlightClientOptions::Defaults();
// Set a very low limit at the gRPC layer to fail all calls
- options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 32);
+ options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 4);
Location location;
ASSERT_OK(Location::ForGrpcTcp("localhost", server_->port(), &location));
ASSERT_OK(FlightClient::Connect(location, options, &client));
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index b7459d21da1..484fbb2a4e4 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -1435,7 +1435,8 @@ cdef class ServerCallContext(_Weakrefable):
def peer(self):
"""Get the address of the peer."""
- return frombytes(self.context.peer())
+ # Set safe=True as gRPC on Windows sometimes gives garbage bytes
+ return frombytes(self.context.peer(), safe=True)
def get_middleware(self, key):
"""
diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py
index 50e993dce87..5d0e1c05bea 100644
--- a/python/pyarrow/tests/test_flight.py
+++ b/python/pyarrow/tests/test_flight.py
@@ -952,6 +952,8 @@ def test_http_basic_unauth():
list(client.do_action(action))
+@pytest.mark.skipif(os.name == 'nt',
+ reason="ARROW-10013: gRPC on Windows corrupts peer()")
def test_http_basic_auth():
"""Test a Python implementation of HTTP basic authentication."""
with EchoStreamFlightServer(auth_handler=basic_auth_handler) as server: