Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9fd3f58
Extract integration test json functions to the lib
carols10cents Nov 5, 2020
dd142d1
ARROW-8853: [Rust] [Integration Testing] Enable Flight tests
carols10cents Oct 29, 2020
b078a65
Don't send ACK for initial DoPut request from server
carols10cents Nov 18, 2020
cb54a90
Don't send or check for app_metadata in the DoPut request
carols10cents Nov 19, 2020
bfdfc78
Remove tokio spawn; a new thread isn't actually needed here
carols10cents Nov 19, 2020
355008e
Preload first batch in channel before starting DoPut request
carols10cents Nov 19, 2020
7c182f9
rando debugging
shepmaster Nov 20, 2020
908c0a5
tracing output
shepmaster Nov 20, 2020
12d3138
[upstream anyway] more generic
shepmaster Nov 20, 2020
aeb8539
[supahax] it gets to the next failure
shepmaster Nov 20, 2020
0fb62e5
Redo sending batches in client, needs cleaned up
carols10cents Nov 30, 2020
b5b0414
cargo fmt
carols10cents Nov 30, 2020
6112e8a
Progress on understanding flight
carols10cents Nov 30, 2020
4e79769
more progress
carols10cents Dec 2, 2020
a3894c0
[Rust] Only return dict_id/dict_is_ordered values for Dictionary types
carols10cents Nov 6, 2020
83e2dcd
Actually really really read dictionaries
carols10cents Dec 2, 2020
e8bcb79
set initial len of dictionaries
carols10cents Dec 2, 2020
1fde810
Extract a function for Jake's supahax
carols10cents Dec 2, 2020
2f97515
welp nope
carols10cents Dec 2, 2020
3af0c24
i know nothing
carols10cents Dec 2, 2020
3cff161
Extract generating schema EncodedData to a new struct's method
carols10cents Dec 3, 2020
462d330
Move record_batch_to_bytes to the new object
carols10cents Dec 3, 2020
7366dd1
Extract dictionary_batch_to_bytes to the new struct
carols10cents Dec 3, 2020
3893361
Move EncodedData generation out of write_message
carols10cents Dec 3, 2020
51be846
Remove the now-unused intermediate Message enum
carols10cents Dec 3, 2020
71b9d8c
Extract a DictionaryTracker for logic of whether replacement is an error
carols10cents Dec 3, 2020
ab7ccc3
Extract shared dictionary code to the new struct
carols10cents Dec 3, 2020
dca9d09
Always create FlightData for dictionaries when given a RecordBatch
carols10cents Dec 3, 2020
6eb0614
cleaner
shepmaster Dec 4, 2020
5d3b333
empty fix
shepmaster Dec 4, 2020
0b7b241
kindawork
shepmaster Dec 4, 2020
d436206
[upstream] print both values
shepmaster Dec 4, 2020
5fed5fd
Merge branch 'pr/8826' into flight-integration-tests
shepmaster Dec 4, 2020
87b3678
reduce logging
shepmaster Dec 4, 2020
dd91f4f
preload the authentication handshake
shepmaster Dec 4, 2020
987fd60
Actually use the dictionaries instead of throwing them away :P
carols10cents Dec 4, 2020
2574ce5
Clear up what's being returned from converting a record batch to flig…
carols10cents Dec 4, 2020
46f76d7
send_all
shepmaster Dec 4, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -498,25 +498,38 @@ class GrpcStreamReader : public FlightStreamReader {
app_metadata_(nullptr) {}

Status EnsureDataStarted() {
std::cout << "Here i am in GrpcStreamReader EnsureDataStarted" << std::endl;

if (!batch_reader_) {
std::cout << "yes batch_reader_" << std::endl;

bool skipped_to_data = false;
{
auto guard = TakeGuard();
skipped_to_data = peekable_reader_->SkipToData();
std::cout << "TakeGuard, SkipToData" << std::endl;
}
// peek() until we find the first data message; discard metadata
if (!skipped_to_data) {
std::cout << "!skipped_to_data" << std::endl;

return OverrideWithServerError(MakeFlightError(
FlightStatusCode::Internal, "Server never sent a data message"));
}

auto message_reader =
std::unique_ptr<ipc::MessageReader>(new GrpcIpcMessageReader<Reader>(
rpc_, read_mutex_, stream_, peekable_reader_, &app_metadata_));
std::cout << "yes message_reader" << std::endl;

auto result =
ipc::RecordBatchStreamReader::Open(std::move(message_reader), options_);
std::cout << "yes result" << std::endl;

RETURN_NOT_OK(OverrideWithServerError(std::move(result).Value(&batch_reader_)));
}
std::cout << "the end" << std::endl;

return Status::OK();
}
arrow::Result<std::shared_ptr<Schema>> GetSchema() override {
Expand Down Expand Up @@ -1141,6 +1154,8 @@ class FlightClient::FlightClientImpl {
*out = std::unique_ptr<StreamReader>(
new StreamReader(rpc, nullptr, options.read_options, finishable_stream));
// Eagerly read the schema
std::cout << "Here i am in DoGet" << std::endl;

return static_cast<StreamReader*>(out->get())->EnsureDataStarted();
}

Expand All @@ -1151,6 +1166,8 @@ class FlightClient::FlightClientImpl {
using GrpcStream = grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>;
using StreamWriter = GrpcStreamWriter<pb::PutResult, pb::PutResult>;

std::cerr << "DoPut called" << std::endl;

auto rpc = std::make_shared<ClientRpc>(options);
RETURN_NOT_OK(rpc->SetToken(auth_handler_.get()));
std::shared_ptr<GrpcStream> stream = stub_->DoPut(&rpc->context);
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ class FlightServiceImpl : public FlightService::Service {

grpc::Status DoGet(ServerContext* context, const pb::Ticket* request,
ServerWriter<pb::FlightData>* writer) {
std::cout << "in base DoGet" << std::endl;
GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(FlightMethod::DoGet, context, flight_context));

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/flight/test_integration_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,15 @@ Status ConsumeFlightLocation(
std::unique_ptr<FlightStreamReader> stream;
RETURN_NOT_OK(read_client->DoGet(ticket, &stream));

std::cout << "Here i am in ConsumeFlightLocation" << std::endl;

int counter = 0;
const int expected = static_cast<int>(retrieved_data.size());
for (const auto& original_batch : retrieved_data) {
FlightStreamChunk chunk;
RETURN_NOT_OK(stream->Next(&chunk));
std::cout << "The counter is " << counter << std::endl;

if (chunk.data == nullptr) {
return Status::Invalid("Got fewer batches than expected, received so far: ",
counter, " expected ", expected);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/flight/test_integration_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ class FlightIntegrationTestServer : public FlightServerBase {

Status DoGet(const ServerCallContext& context, const Ticket& request,
std::unique_ptr<FlightDataStream>* data_stream) override {
std::cout << "In Server DoGet" << std::endl;
auto data = uploaded_chunks.find(request.ticket);
if (data == uploaded_chunks.end()) {
std::cout << "Could not find flight" << std::endl;

return Status::KeyError("Could not find flight.", request.ticket);
}
auto flight = data->second;
Expand All @@ -121,6 +124,8 @@ class FlightIntegrationTestServer : public FlightServerBase {
new NumberingStream(std::unique_ptr<FlightDataStream>(new RecordBatchStream(
std::shared_ptr<RecordBatchReader>(new RecordBatchListReader(flight))))));

std::cout << "Returning OK" << std::endl;

return Status::OK();
}

Expand Down
17 changes: 16 additions & 1 deletion cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,14 +460,29 @@ Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor,
NumberingStream::NumberingStream(std::unique_ptr<FlightDataStream> stream)
: counter_(0), stream_(std::move(stream)) {}

std::shared_ptr<Schema> NumberingStream::schema() { return stream_->schema(); }
std::shared_ptr<Schema> NumberingStream::schema() {
std::cout << "In NumberingStream::schema" << std::endl;

return stream_->schema();
}

Status NumberingStream::GetSchemaPayload(FlightPayload* payload) {
std::cout << "In NumberingStream::GetSchemaPayload" << std::endl;

return stream_->GetSchemaPayload(payload);
}

Status NumberingStream::Next(FlightPayload* payload) {
std::cout << "In NumberingStream::Next " << counter_ << std::endl;

RETURN_NOT_OK(stream_->Next(payload));
if (payload) {
std::cout << "yes payload" << std::endl;
if (payload->ipc_message.type != ipc::MessageType::RECORD_BATCH) {
std::cout << "no record batch :(" << std::endl;

}
}
if (payload && payload->ipc_message.type == ipc::MessageType::RECORD_BATCH) {
payload->app_metadata = Buffer::FromString(std::to_string(counter_));
counter_++;
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <utility>
#include <vector>
#include <iostream>

#include "arrow/buffer.h"
#include "arrow/device.h"
Expand Down Expand Up @@ -469,6 +470,8 @@ class MessageDecoder::MessageDecoderImpl {
metadata_(nullptr) {}

Status ConsumeData(const uint8_t* data, int64_t size) {
// std::cerr << "ConsumeData / next_required_size_ " << next_required_size_ << std::endl;

if (buffered_size_ == 0) {
while (size > 0 && size >= next_required_size_) {
auto used_size = next_required_size_;
Expand Down Expand Up @@ -505,6 +508,7 @@ class MessageDecoder::MessageDecoderImpl {
}

Status ConsumeBuffer(std::shared_ptr<Buffer> buffer) {
// std::cerr << "ConsumeBuffer / next_required_size_ " << next_required_size_ << std::endl;
if (buffered_size_ == 0) {
while (buffer->size() >= next_required_size_) {
auto used_size = next_required_size_;
Expand Down Expand Up @@ -598,22 +602,26 @@ class MessageDecoder::MessageDecoderImpl {
}

Status ConsumeInitial(int32_t continuation) {
// std::cerr << "ConsumeInitial / continuation " << continuation << std::endl;
if (continuation == internal::kIpcContinuationToken) {
state_ = State::METADATA_LENGTH;
next_required_size_ = kMessageDecoderNextRequiredSizeMetadataLength;
// std::cerr << "ConsumeInitial / A / next_required_size_ = " << next_required_size_ << std::endl;
RETURN_NOT_OK(listener_->OnMetadataLength());
// Valid IPC message, read the message length now
return Status::OK();
} else if (continuation == 0) {
state_ = State::EOS;
next_required_size_ = 0;
// std::cerr << "ConsumeInitial / B / next_required_size_ = " << next_required_size_ << std::endl;
RETURN_NOT_OK(listener_->OnEOS());
return Status::OK();
} else if (continuation > 0) {
state_ = State::METADATA;
// ARROW-6314: Backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
next_required_size_ = continuation;
// std::cerr << "ConsumeInitial / C / next_required_size_ = " << next_required_size_ << std::endl;
RETURN_NOT_OK(listener_->OnMetadata());
return Status::OK();
} else {
Expand Down Expand Up @@ -641,11 +649,13 @@ class MessageDecoder::MessageDecoderImpl {
if (metadata_length == 0) {
state_ = State::EOS;
next_required_size_ = 0;
// std::cerr << "ConsumeMetadataLength / A /next_required_size_ = " << next_required_size_ << std::endl;
RETURN_NOT_OK(listener_->OnEOS());
return Status::OK();
} else if (metadata_length > 0) {
state_ = State::METADATA;
next_required_size_ = metadata_length;
// std::cerr << "ConsumeMetadataLength / B / next_required_size_ = " << next_required_size_ << std::endl;
RETURN_NOT_OK(listener_->OnMetadata());
return Status::OK();
} else {
Expand All @@ -664,6 +674,8 @@ class MessageDecoder::MessageDecoderImpl {
}

Status ConsumeMetadataChunks() {
// std::cerr << "ConsumeMetadataChunks / next_required_size_ " << next_required_size_ << std::endl;

if (chunks_[0]->size() >= next_required_size_) {
if (chunks_[0]->size() == next_required_size_) {
if (chunks_[0]->is_cpu()) {
Expand Down Expand Up @@ -698,6 +710,7 @@ class MessageDecoder::MessageDecoderImpl {

state_ = State::BODY;
next_required_size_ = body_length;
// std::cerr << "ConsumeMetadata / next_required_size_ = " << next_required_size_ << std::endl;
RETURN_NOT_OK(listener_->OnBody());
if (next_required_size_ == 0) {
ARROW_ASSIGN_OR_RAISE(auto body, AllocateBuffer(0, pool_));
Expand All @@ -713,6 +726,8 @@ class MessageDecoder::MessageDecoderImpl {
}

Status ConsumeBodyChunks() {
// std::cerr << "ConsumeBodyChunks / next_required_size_ " << next_required_size_ << std::endl;

if (chunks_[0]->size() >= next_required_size_) {
auto used_size = next_required_size_;
if (chunks_[0]->size() == next_required_size_) {
Expand Down Expand Up @@ -740,6 +755,7 @@ class MessageDecoder::MessageDecoderImpl {
RETURN_NOT_OK(listener_->OnMessageDecoded(std::move(message)));
state_ = State::INITIAL;
next_required_size_ = kMessageDecoderNextRequiredSizeInitial;
// std::cerr << "ConsumeBody / next_required_size_ = " << next_required_size_ << std::endl;
RETURN_NOT_OK(listener_->OnInitial());
return Status::OK();
}
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -860,8 +860,7 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
}

if (message->type() != MessageType::DICTIONARY_BATCH) {
return Status::Invalid("IPC stream did not have the expected number (", num_dicts,
") of dictionaries at the start of the stream");
return Status::Invalid("IPC stream had (", i, ") dictionaries at the start of the stream, but (", num_dicts, ") were expected");
}
RETURN_NOT_OK(ReadDictionary(*message));
}
Expand Down
86 changes: 48 additions & 38 deletions dev/archery/archery/integration/tester_rust.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.

import contextlib
import os
import subprocess

from .tester import Tester
from .util import run_cmd, ARROW_ROOT_DEFAULT, log
Expand All @@ -24,8 +26,8 @@
class RustTester(Tester):
PRODUCER = True
CONSUMER = True
# FLIGHT_SERVER = True
# FLIGHT_CLIENT = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True

EXE_PATH = os.path.join(ARROW_ROOT_DEFAULT, 'rust/target/debug')

Expand All @@ -34,11 +36,11 @@ class RustTester(Tester):
STREAM_TO_FILE = os.path.join(EXE_PATH, 'arrow-stream-to-file')
FILE_TO_STREAM = os.path.join(EXE_PATH, 'arrow-file-to-stream')

# FLIGHT_SERVER_CMD = [
# os.path.join(EXE_PATH, 'flight-test-integration-server')]
# FLIGHT_CLIENT_CMD = [
# os.path.join(EXE_PATH, 'flight-test-integration-client'),
# "-host", "localhost"]
FLIGHT_SERVER_CMD = [
os.path.join(EXE_PATH, 'flight-test-integration-server')]
FLIGHT_CLIENT_CMD = [
os.path.join(EXE_PATH, 'flight-test-integration-client'),
"--host", "localhost"]

name = 'Rust'

Expand Down Expand Up @@ -72,34 +74,42 @@ def file_to_stream(self, file_path, stream_path):
cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
self.run_shell_command(cmd)

# @contextlib.contextmanager
# def flight_server(self):
# cmd = self.FLIGHT_SERVER_CMD + ['-port=0']
# if self.debug:
# log(' '.join(cmd))
# server = subprocess.Popen(cmd,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE)
# try:
# output = server.stdout.readline().decode()
# if not output.startswith("Server listening on localhost:"):
# server.kill()
# out, err = server.communicate()
# raise RuntimeError(
# "Flight-C++ server did not start properly, "
# "stdout:\n{}\n\nstderr:\n{}\n"
# .format(output + out.decode(), err.decode()))
# port = int(output.split(":")[1])
# yield port
# finally:
# server.kill()
# server.wait(5)

# def flight_request(self, port, json_path):
# cmd = self.FLIGHT_CLIENT_CMD + [
# '-port=' + str(port),
# '-path=' + json_path,
# ]
# if self.debug:
# log(' '.join(cmd))
# run_cmd(cmd)
@contextlib.contextmanager
def flight_server(self, scenario_name=None):
cmd = self.FLIGHT_SERVER_CMD + ['--port=0']
if scenario_name:
cmd = cmd + ["--scenario", scenario_name]
if self.debug:
log(' '.join(cmd))
server = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
output = server.stdout.readline().decode()
if not output.startswith("Server listening on localhost:"):
server.kill()
out, err = server.communicate()
raise RuntimeError(
"Flight-Rust server did not start properly, "
"stdout:\n{}\n\nstderr:\n{}\n"
.format(output + out.decode(), err.decode()))
port = int(output.split(":")[1])
yield port
finally:
server.kill()
server.wait(5)

def flight_request(self, port, json_path=None, scenario_name=None):
cmd = self.FLIGHT_CLIENT_CMD + [
'--port=' + str(port),
]
if json_path:
cmd.extend(('--path', json_path))
elif scenario_name:
cmd.extend(('--scenario', scenario_name))
else:
raise TypeError("Must provide one of json_path or scenario_name")

if self.debug:
log(' '.join(cmd))
run_cmd(cmd)
Loading