Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>

#ifdef GRPCPP_PP_INCLUDE
Expand Down Expand Up @@ -867,13 +868,17 @@ class FlightClient::FlightClientImpl {
}

grpc::ChannelArguments args;
// We can't set the same config value twice, so for values where
// we want to set defaults, keep them in a map and update them;
// then update them all at once
std::unordered_map<std::string, int> default_args;
// Try to reconnect quickly at first, in case the server is still starting up
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
default_args[GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS] = 100;
// Receive messages of any size
args.SetMaxReceiveMessageSize(-1);
default_args[GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] = -1;
// Setting this arg enables each client to open it's own TCP connection to server,
// not sharing one single connection, which becomes bottleneck under high load.
args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
default_args[GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL] = 1;

if (options.override_hostname != "") {
args.SetSslTargetNameOverride(options.override_hostname);
Expand All @@ -882,12 +887,15 @@ class FlightClient::FlightClientImpl {
// Allow setting generic gRPC options.
for (const auto& arg : options.generic_options) {
if (util::holds_alternative<int>(arg.second)) {
args.SetInt(arg.first, util::get<int>(arg.second));
default_args[arg.first] = util::get<int>(arg.second);
} else if (util::holds_alternative<std::string>(arg.second)) {
args.SetString(arg.first, util::get<std::string>(arg.second));
}
// Otherwise unimplemented
}
for (const auto& pair : default_args) {
args.SetInt(pair.first, pair.second);
}

std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptors;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/flight_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ TEST_F(TestFlightClient, GenericOptions) {
std::unique_ptr<FlightClient> client;
auto options = FlightClientOptions::Defaults();
// Set a very low limit at the gRPC layer to fail all calls
options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 32);
options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 4);
Location location;
ASSERT_OK(Location::ForGrpcTcp("localhost", server_->port(), &location));
ASSERT_OK(FlightClient::Connect(location, options, &client));
Expand Down
3 changes: 2 additions & 1 deletion python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,8 @@ cdef class ServerCallContext(_Weakrefable):

def peer(self):
"""Get the address of the peer."""
return frombytes(self.context.peer())
# Set safe=True as gRPC on Windows sometimes gives garbage bytes
return frombytes(self.context.peer(), safe=True)

def get_middleware(self, key):
"""
Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/tests/test_flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,8 @@ def test_http_basic_unauth():
list(client.do_action(action))


@pytest.mark.skipif(os.name == 'nt',
reason="ARROW-10013: gRPC on Windows corrupts peer()")
def test_http_basic_auth():
"""Test a Python implementation of HTTP basic authentication."""
with EchoStreamFlightServer(auth_handler=basic_auth_handler) as server:
Expand Down