Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions cpp/src/arrow/array/array_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,29 +222,31 @@ bool Array::ApproxEquals(const std::shared_ptr<Array>& arr,
}

bool Array::RangeEquals(const Array& other, int64_t start_idx, int64_t end_idx,
int64_t other_start_idx) const {
return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx);
int64_t other_start_idx, const EqualOptions& opts) const {
return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx, opts);
}

bool Array::RangeEquals(const std::shared_ptr<Array>& other, int64_t start_idx,
int64_t end_idx, int64_t other_start_idx) const {
int64_t end_idx, int64_t other_start_idx,
const EqualOptions& opts) const {
if (!other) {
return false;
}
return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx);
return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx, opts);
}

bool Array::RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
const Array& other) const {
return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx);
const Array& other, const EqualOptions& opts) const {
return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx, opts);
}

bool Array::RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
const std::shared_ptr<Array>& other) const {
const std::shared_ptr<Array>& other,
const EqualOptions& opts) const {
if (!other) {
return false;
}
return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx);
return ArrayRangeEquals(*this, *other, start_idx, end_idx, other_start_idx, opts);
}

std::shared_ptr<Array> Array::Slice(int64_t offset, int64_t length) const {
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/array/array_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,17 @@ class ARROW_EXPORT Array {
/// Compare if the range of slots specified are equal for the given array and
/// this array. end_idx exclusive. This methods does not bounds check.
bool RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
const Array& other) const;
const Array& other,
const EqualOptions& = EqualOptions::Defaults()) const;
bool RangeEquals(int64_t start_idx, int64_t end_idx, int64_t other_start_idx,
const std::shared_ptr<Array>& other) const;
const std::shared_ptr<Array>& other,
const EqualOptions& = EqualOptions::Defaults()) const;
bool RangeEquals(const Array& other, int64_t start_idx, int64_t end_idx,
int64_t other_start_idx) const;
int64_t other_start_idx,
const EqualOptions& = EqualOptions::Defaults()) const;
bool RangeEquals(const std::shared_ptr<Array>& other, int64_t start_idx,
int64_t end_idx, int64_t other_start_idx) const;
int64_t end_idx, int64_t other_start_idx,
const EqualOptions& = EqualOptions::Defaults()) const;

Status Accept(ArrayVisitor* visitor) const;

Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -664,12 +664,14 @@ class GrpcStreamWriter : public FlightStreamWriter {
}
return Status::OK();
}

Status WriteWithMetadata(const RecordBatch& batch,
std::shared_ptr<Buffer> app_metadata) override {
RETURN_NOT_OK(CheckStarted());
app_metadata_ = app_metadata;
return batch_writer_->WriteRecordBatch(batch);
}

Status DoneWriting() override {
// Do not CheckStarted - DoneWriting applies to data and metadata
if (batch_writer_) {
Expand All @@ -683,6 +685,7 @@ class GrpcStreamWriter : public FlightStreamWriter {
}
return writer_->DoneWriting();
}

Status Close() override {
// Do not CheckStarted - Close applies to data and metadata
if (batch_writer_ && !writer_closed_) {
Expand All @@ -698,6 +701,11 @@ class GrpcStreamWriter : public FlightStreamWriter {
return writer_->Finish(Status::OK());
}

ipc::WriteStats stats() const override {
ARROW_CHECK_NE(batch_writer_, nullptr);
return batch_writer_->stats();
}

private:
friend class DoPutPayloadWriter<ProtoReadT, FlightReadT>;
std::shared_ptr<Buffer> app_metadata_;
Expand Down
9 changes: 8 additions & 1 deletion cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,20 +314,25 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
payload.app_metadata = app_metadata;
}
RETURN_NOT_OK(ipc::GetRecordBatchPayload(batch, ipc_options_, &payload.ipc_message));
return WritePayload(payload);
RETURN_NOT_OK(WritePayload(payload));
++stats_.num_record_batches;
return Status::OK();
}

Status Close() override {
// It's fine to Close() without writing data
return Status::OK();
}

ipc::WriteStats stats() const override { return stats_; }

private:
Status WritePayload(const FlightPayload& payload) {
if (!internal::WritePayload(payload, stream_)) {
// gRPC doesn't give us any way to find what the error was (if any).
return Status::IOError("Could not write payload to stream");
}
++stats_.num_messages;
return Status::OK();
}

Expand All @@ -350,13 +355,15 @@ class DoExchangeMessageWriter : public FlightMessageWriter {
RETURN_NOT_OK(ipc::GetDictionaryPayload(pair.first, pair.second, ipc_options_,
&payload.ipc_message));
RETURN_NOT_OK(WritePayload(payload));
++stats_.num_dictionary_batches;
}
return Status::OK();
}

grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream_;
::arrow::ipc::IpcWriteOptions ipc_options_;
ipc::DictionaryFieldMapper mapper_;
ipc::WriteStats stats_;
bool started_ = false;
bool dictionaries_written_ = false;
};
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ struct ARROW_EXPORT IpcWriteOptions {
/// like compression
bool use_threads = true;

/// \brief Whether to emit dictionary deltas
///
/// If false, a changed dictionary for a given field will emit a full
/// dictionary replacement.
/// If true, a changed dictionary will be compared against the previous
/// version. If possible, a dictionary delta will be omitted, otherwise
/// a full dictionary replacement.
///
/// Default is false to maximize stream compatibility.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might help to be explicit to say that otherwise the order of dictionaries may be altered by a passage from sender to receiver (I assume that's what you mean here by "stream compatibility" but others might be scratching their heads)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I simply meant that it's off for compatibility with implementations that don't support delta dictionaries.

///
/// Also, note that if a changed dictionary is a nested dictionary,
/// then a delta is never emitted, for compatibility with the read path.
bool emit_dictionary_deltas = false;

/// \brief Format version to use for IPC messages and their metadata.
///
/// Presently using V5 version (readable by 1.0.0 and later).
Expand Down
Loading