Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ci/scripts/integration_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ arrow_dir=${1}
source_dir=${1}/cpp
build_dir=${2}/cpp
gold_dir_0_14_1=$arrow_dir/testing/data/arrow-ipc-stream/integration/0.14.1
gold_dir_0_17_1=$arrow_dir/testing/data/arrow-ipc-stream/integration/0.17.1

pip install -e $arrow_dir/dev/archery

archery integration --with-all --run-flight --gold-dirs=$gold_dir_0_14_1
archery integration --with-all --run-flight \
--gold-dirs=$gold_dir_0_14_1 \
--gold-dirs=$gold_dir_0_17_1 \
3 changes: 2 additions & 1 deletion cpp/src/arrow/ipc/file_to_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ Status ConvertToStream(const char* path) {

ARROW_ASSIGN_OR_RAISE(auto in_file, io::ReadableFile::Open(path));
ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(in_file.get()));
ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewStreamWriter(&sink, reader->schema()));
ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewStreamWriter(&sink, reader->schema(),
IpcWriteOptions::Defaults()));
for (int i = 0; i < reader->num_record_batches(); ++i) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> chunk, reader->ReadRecordBatch(i));
RETURN_NOT_OK(writer->WriteRecordBatch(*chunk));
Expand Down
57 changes: 43 additions & 14 deletions cpp/src/arrow/ipc/metadata_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,42 @@ MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version) {
// Arrow 0.3 to 0.7.1
return MetadataVersion::V4;
case flatbuf::MetadataVersion::V4:
// Arrow >= 0.8
// Arrow 0.8 to 0.17
return MetadataVersion::V4;
case flatbuf::MetadataVersion::V5:
// Arrow >= 1.0
return MetadataVersion::V5;
// Add cases as other versions become available
default:
return MetadataVersion::V4;
return MetadataVersion::V5;
}
}

// Map the library-level MetadataVersion enum onto the corresponding
// Flatbuffers-generated enum value.
//
// V1 through V4 map to their same-named Flatbuffers values. V5, and any
// value without an explicit case below, maps to flatbuf::MetadataVersion::V5.
flatbuf::MetadataVersion MetadataVersionToFlatbuffer(MetadataVersion version) {
  // Start from the fallback result; only the older versions override it.
  flatbuf::MetadataVersion fb_version = flatbuf::MetadataVersion::V5;
  switch (version) {
    case MetadataVersion::V1:
      fb_version = flatbuf::MetadataVersion::V1;
      break;
    case MetadataVersion::V2:
      fb_version = flatbuf::MetadataVersion::V2;
      break;
    case MetadataVersion::V3:
      fb_version = flatbuf::MetadataVersion::V3;
      break;
    case MetadataVersion::V4:
      fb_version = flatbuf::MetadataVersion::V4;
      break;
    // Add cases as other versions become available
    case MetadataVersion::V5:
    default:
      break;
  }
  return fb_version;
}

// Report whether arrays of `type_id` carry a validity bitmap when serialized
// with the given IPC metadata `version`.
//
// Before V5 only the null type lacks a validity bitmap. From V5 onwards the
// decision is delegated to ::arrow::internal::HasValidityBitmap, which (per
// the format notes for V5) also excludes union types.
bool HasValidityBitmap(Type::type type_id, MetadataVersion version) {
  if (version >= MetadataVersion::V5) {
    // V5 and later: defer to the type-level rule
    return ::arrow::internal::HasValidityBitmap(type_id);
  }
  // Pre-V5: every type except null has a validity bitmap
  return type_id != Type::NA;
}

namespace {

Status IntFromFlatbuffer(const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
Expand Down Expand Up @@ -835,11 +863,11 @@ Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictio

Result<std::shared_ptr<Buffer>> WriteFBMessage(
FBB& fbb, flatbuf::MessageHeader header_type, flatbuffers::Offset<void> header,
int64_t body_length,
int64_t body_length, MetadataVersion version,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata = nullptr) {
auto message =
flatbuf::CreateMessage(fbb, kCurrentMetadataVersion, header_type, header,
body_length, SerializeCustomMetadata(fbb, custom_metadata));
auto message = flatbuf::CreateMessage(fbb, MetadataVersionToFlatbuffer(version),
header_type, header, body_length,
SerializeCustomMetadata(fbb, custom_metadata));
fbb.Finish(message);
return WriteFlatbufferBuilder(fbb);
}
Expand Down Expand Up @@ -1142,12 +1170,12 @@ Status GetKeyValueMetadata(const KVVector* fb_metadata,
}

Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo,
std::shared_ptr<Buffer>* out) {
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
FBB fbb;
flatbuffers::Offset<flatbuf::Schema> fb_schema;
RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
return WriteFBMessage(fbb, flatbuf::MessageHeader::Schema, fb_schema.Union(),
/*body_length=*/0)
/*body_length=*/0, options.metadata_version)
.Value(out);
}

Expand All @@ -1161,12 +1189,13 @@ Status WriteRecordBatchMessage(
RETURN_NOT_OK(
MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(),
body_length, custom_metadata)
body_length, options.metadata_version, custom_metadata)
.Value(out);
}

Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
int64_t buffer_start_offset) {
int64_t buffer_start_offset,
const IpcWriteOptions& options) {
using TensorDimOffset = flatbuffers::Offset<flatbuf::TensorDim>;
using TensorOffset = flatbuffers::Offset<flatbuf::Tensor>;

Expand Down Expand Up @@ -1194,18 +1223,18 @@ Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer);

return WriteFBMessage(fbb, flatbuf::MessageHeader::Tensor, fb_tensor.Union(),
body_length);
body_length, options.metadata_version);
}

Result<std::shared_ptr<Buffer>> WriteSparseTensorMessage(
const SparseTensor& sparse_tensor, int64_t body_length,
const std::vector<BufferMetadata>& buffers) {
const std::vector<BufferMetadata>& buffers, const IpcWriteOptions& options) {
FBB fbb;
SparseTensorOffset fb_sparse_tensor;
RETURN_NOT_OK(
MakeSparseTensor(fbb, sparse_tensor, body_length, buffers, &fb_sparse_tensor));
return WriteFBMessage(fbb, flatbuf::MessageHeader::SparseTensor,
fb_sparse_tensor.Union(), body_length);
fb_sparse_tensor.Union(), body_length, options.metadata_version);
}

Status WriteDictionaryMessage(
Expand All @@ -1220,7 +1249,7 @@ Status WriteDictionaryMessage(
auto dictionary_batch =
flatbuf::CreateDictionaryBatch(fbb, id, record_batch, is_delta).Union();
return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch,
body_length, custom_metadata)
body_length, options.metadata_version, custom_metadata)
.Value(out);
}

Expand Down
30 changes: 16 additions & 14 deletions cpp/src/arrow/ipc/metadata_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <flatbuffers/flatbuffers.h>

#include "arrow/buffer.h"
#include "arrow/io/type_fwd.h"
#include "arrow/ipc/message.h"
#include "arrow/result.h"
#include "arrow/sparse_tensor.h"
Expand All @@ -45,16 +46,6 @@ namespace arrow {

namespace flatbuf = org::apache::arrow::flatbuf;

class DataType;
class KeyValueMetadata;
class Schema;

namespace io {

class OutputStream;

} // namespace io

namespace ipc {

class DictionaryMemo;
Expand All @@ -68,13 +59,23 @@ using KVVector = flatbuffers::Vector<KeyValueOffset>;
constexpr int32_t kIpcContinuationToken = -1;

static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion =
flatbuf::MetadataVersion::V4;
flatbuf::MetadataVersion::V5;

static constexpr flatbuf::MetadataVersion kLatestMetadataVersion =
flatbuf::MetadataVersion::V5;

static constexpr flatbuf::MetadataVersion kMinMetadataVersion =
flatbuf::MetadataVersion::V4;

MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version);

// This function is used in a unit test
ARROW_EXPORT
flatbuf::MetadataVersion MetadataVersionToFlatbuffer(MetadataVersion version);

// Whether the type has a validity bitmap in the given IPC version
bool HasValidityBitmap(Type::type type_id, MetadataVersion version);

static constexpr const char* kArrowMagicBytes = "ARROW1";

struct FieldMetadata {
Expand Down Expand Up @@ -172,7 +173,7 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size,
// \param[out] out the serialized arrow::Buffer
// \return Status outcome
Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo,
std::shared_ptr<Buffer>* out);
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);

// This function is used in a unit test
ARROW_EXPORT
Expand All @@ -183,11 +184,12 @@ Status WriteRecordBatchMessage(
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);

Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
const int64_t buffer_start_offset);
const int64_t buffer_start_offset,
const IpcWriteOptions& options);

Result<std::shared_ptr<Buffer>> WriteSparseTensorMessage(
const SparseTensor& sparse_tensor, int64_t body_length,
const std::vector<BufferMetadata>& buffers);
const std::vector<BufferMetadata>& buffers, const IpcWriteOptions& options);

Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
const std::vector<FileBlock>& record_batches,
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ struct ARROW_EXPORT IpcWriteOptions {
/// like compression
bool use_threads = true;

/// \brief Format version to use for IPC messages and their
/// metadata. Presently using V4 version (readable by v0.8.0 and later).
MetadataVersion metadata_version = MetadataVersion::V4;
/// \brief Format version to use for IPC messages and their metadata.
///
/// Presently using V5 version (readable by 1.0.0 and later).
/// V4 is also available (readable by 0.8.0 and later).
MetadataVersion metadata_version = MetadataVersion::V5;

static IpcWriteOptions Defaults();
};
Expand Down
Loading