diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp index 4b5a394d5ed..11b82e03b08 100644 --- a/c_glib/arrow-glib/reader.cpp +++ b/c_glib/arrow-glib/reader.cpp @@ -507,13 +507,11 @@ garrow_record_batch_file_reader_read_record_batch(GArrowRecordBatchFileReader *r GError **error) { auto arrow_reader = garrow_record_batch_file_reader_get_raw(reader); - std::shared_ptr arrow_record_batch; - auto status = arrow_reader->ReadRecordBatch(i, &arrow_record_batch); + auto arrow_record_batch = arrow_reader->ReadRecordBatch(i); - if (garrow_error_check(error, - status, - "[record-batch-file-reader][read-record-batch]")) { - return garrow_record_batch_new_raw(&arrow_record_batch); + if (garrow::check(error, arrow_record_batch, + "[record-batch-file-reader][read-record-batch]")) { + return garrow_record_batch_new_raw(&(*arrow_record_batch)); } else { return NULL; } diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 632db2b4a91..b31a264ad58 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -75,8 +75,8 @@ class IpcScanTask : public ScanTask { return nullptr; } - std::shared_ptr batch; - RETURN_NOT_OK(reader_->ReadRecordBatch(i_++, &batch)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr batch, + reader_->ReadRecordBatch(i_++)); return projector_.Project(*batch, pool_); } diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc index 0346294b375..1ecddc79fe1 100644 --- a/cpp/src/arrow/flight/client.cc +++ b/cpp/src/arrow/flight/client.cc @@ -38,6 +38,7 @@ #include "arrow/ipc/writer.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/result.h" #include "arrow/status.h" #include "arrow/type.h" #include "arrow/util/logging.h" @@ -279,7 +280,15 @@ class GrpcIpcMessageReader : public ipc::MessageReader { stream_(std::move(stream)), stream_finished_(false) {} - Status ReadNextMessage(std::unique_ptr* out) override { + ::arrow::Result> ReadNextMessage() override { + std::unique_ptr out; + RETURN_NOT_OK(GetNextMessage(&out)); + return out; + } + + protected: + Status GetNextMessage(std::unique_ptr* out) { + // TODO: Use Result APIs if (stream_finished_) { *out = nullptr; flight_reader_->last_app_metadata_ = nullptr; @@ -303,7 +312,6 @@ class GrpcIpcMessageReader : public ipc::MessageReader { return Status::OK(); } - protected: Status OverrideWithServerError(Status&& st) { // Get the gRPC status if not OK, to propagate any server error message RETURN_NOT_OK(internal::FromGrpcStatus(stream_->Finish(), &rpc_->context)); @@ -484,8 +492,8 @@ Status GrpcStreamWriter::Open( std::unique_ptr result(new GrpcStreamWriter(writer)); std::unique_ptr payload_writer(new DoPutPayloadWriter( descriptor, std::move(rpc), std::move(response), read_mutex, writer, result.get())); - RETURN_NOT_OK(ipc::internal::OpenRecordBatchWriter(std::move(payload_writer), schema, - &result->batch_writer_)); + ARROW_ASSIGN_OR_RAISE(result->batch_writer_, ipc::internal::OpenRecordBatchWriter( + std::move(payload_writer), schema)); *out = std::move(result); return Status::OK(); } diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc index 3cea1c93a47..7958b421974 100644 --- a/cpp/src/arrow/flight/internal.cc +++ b/cpp/src/arrow/flight/internal.cc @@ -381,7 +381,7 @@ Status FromProto(const pb::FlightData& pb_data, FlightDescriptor* descriptor, if (header_buf == nullptr || body_buf == nullptr) { return Status::UnknownError("Could not create buffers from protobuf"); } - return ipc::Message::Open(header_buf, body_buf, message); + return ipc::Message::Open(header_buf, body_buf).Value(message); } // FlightEndpoint @@ -466,10 +466,10 @@ Status FromProto(const pb::SchemaResult& pb_result, std::string* result) { Status SchemaToString(const Schema& schema, std::string* out) { // TODO(wesm): Do we care about better memory efficiency here? - std::shared_ptr serialized_schema; ipc::DictionaryMemo unused_dict_memo; - RETURN_NOT_OK(ipc::SerializeSchema(schema, &unused_dict_memo, default_memory_pool(), - &serialized_schema)); + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr serialized_schema, + ipc::SerializeSchema(schema, &unused_dict_memo, default_memory_pool())); *out = std::string(reinterpret_cast(serialized_schema->data()), static_cast(serialized_schema->size())); return Status::OK(); diff --git a/cpp/src/arrow/flight/serialization_internal.cc b/cpp/src/arrow/flight/serialization_internal.cc index 8f0a2efffbb..28ff5663f45 100644 --- a/cpp/src/arrow/flight/serialization_internal.cc +++ b/cpp/src/arrow/flight/serialization_internal.cc @@ -344,7 +344,7 @@ grpc::Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) { } Status FlightData::OpenMessage(std::unique_ptr* message) { - return ipc::Message::Open(metadata, body, message); + return ipc::Message::Open(metadata, body).Value(message); } // The pointer bitcast hack below causes legitimate warnings, silence them. diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc index 014921c8518..b76ddc2f107 100644 --- a/cpp/src/arrow/flight/server.cc +++ b/cpp/src/arrow/flight/server.cc @@ -99,7 +99,17 @@ class FlightIpcMessageReader : public ipc::MessageReader { std::shared_ptr* last_metadata) : reader_(reader), app_metadata_(last_metadata) {} - Status ReadNextMessage(std::unique_ptr* out) override { + const FlightDescriptor& descriptor() const { return descriptor_; } + + ::arrow::Result> ReadNextMessage() override { + std::unique_ptr out; + RETURN_NOT_OK(GetNextMessage(&out)); + return out; + } + + protected: + Status GetNextMessage(std::unique_ptr* out) { + // TODO: Migrate to Result APIs if (stream_finished_) { *out = nullptr; *app_metadata_ = nullptr; @@ -131,9 +141,6 @@ class FlightIpcMessageReader : public ipc::MessageReader { return Status::OK(); } - const FlightDescriptor& descriptor() const { return descriptor_; } - - protected: grpc::ServerReaderWriter* reader_; bool stream_finished_ = false; bool first_message_ = true; diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index bf408a15adc..7aa82bf7faf 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -718,7 +718,7 @@ class ReaderV2 : public Reader { ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(source_, options)); std::vector> batches(reader->num_record_batches()); for (int i = 0; i < reader->num_record_batches(); ++i) { - RETURN_NOT_OK(reader->ReadRecordBatch(i, &batches[i])); + ARROW_ASSIGN_OR_RAISE(batches[i], reader->ReadRecordBatch(i)); } // XXX: Handle included_fields in RecordBatchFileReader::schema diff --git a/cpp/src/arrow/ipc/file_to_stream.cc b/cpp/src/arrow/ipc/file_to_stream.cc index 292c193021c..788275cafe3 100644 --- a/cpp/src/arrow/ipc/file_to_stream.cc +++ b/cpp/src/arrow/ipc/file_to_stream.cc @@ -40,8 +40,7 @@ Status ConvertToStream(const char* path) { ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(in_file.get())); ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewStreamWriter(&sink, reader->schema())); for (int i = 0; i < reader->num_record_batches(); ++i) { - std::shared_ptr chunk; - RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk, reader->ReadRecordBatch(i)); RETURN_NOT_OK(writer->WriteRecordBatch(*chunk)); } return writer->Close(); diff --git a/cpp/src/arrow/ipc/json_integration_test.cc b/cpp/src/arrow/ipc/json_integration_test.cc index fde90fddd3a..f2eea294951 100644 --- a/cpp/src/arrow/ipc/json_integration_test.cc +++ b/cpp/src/arrow/ipc/json_integration_test.cc @@ -97,8 +97,7 @@ static Status ConvertArrowToJson(const std::string& arrow_path, RETURN_NOT_OK(internal::json::JsonWriter::Open(reader->schema(), &writer)); for (int i = 0; i < reader->num_record_batches(); ++i) { - std::shared_ptr batch; - RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr batch, reader->ReadRecordBatch(i)); RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } @@ -152,7 +151,7 @@ static Status ValidateArrowVsJson(const std::string& arrow_path, std::shared_ptr json_batch; for (int i = 0; i < json_nbatches; ++i) { RETURN_NOT_OK(json_reader->ReadRecordBatch(i, &json_batch)); - RETURN_NOT_OK(arrow_reader->ReadRecordBatch(i, &arrow_batch)); + ARROW_ASSIGN_OR_RAISE(arrow_batch, arrow_reader->ReadRecordBatch(i)); Status valid_st = json_batch->ValidateFull(); if (!valid_st.ok()) { return Status::Invalid("JSON record batch ", i, " did not validate:\n", diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 7280781af24..b089ab25e94 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -45,9 +45,8 @@ namespace ipc { class Message::MessageImpl { public: - explicit MessageImpl(const std::shared_ptr& metadata, - const std::shared_ptr& body) - : metadata_(metadata), message_(nullptr), body_(body) {} + explicit MessageImpl(std::shared_ptr metadata, std::shared_ptr body) + : metadata_(std::move(metadata)), message_(nullptr), body_(std::move(body)) {} Status Open() { RETURN_NOT_OK( @@ -112,15 +111,15 @@ class Message::MessageImpl { std::shared_ptr body_; }; -Message::Message(const std::shared_ptr& metadata, - const std::shared_ptr& body) { - impl_.reset(new MessageImpl(metadata, body)); +Message::Message(std::shared_ptr metadata, std::shared_ptr body) { + impl_.reset(new MessageImpl(std::move(metadata), std::move(body))); } -Status Message::Open(const std::shared_ptr& metadata, - const std::shared_ptr& body, std::unique_ptr* out) { - out->reset(new Message(metadata, body)); - return (*out)->impl_->Open(); +Result> Message::Open(std::shared_ptr metadata, + std::shared_ptr body) { + std::unique_ptr result(new Message(std::move(metadata), std::move(body))); + RETURN_NOT_OK(result->impl_->Open()); + return std::move(result); } Message::~Message() {} @@ -182,8 +181,8 @@ Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_lengt return Status::OK(); } -Status Message::ReadFrom(std::shared_ptr metadata, io::InputStream* stream, - std::unique_ptr* out) { +Result> Message::ReadFrom(std::shared_ptr metadata, + io::InputStream* stream) { RETURN_NOT_OK(MaybeAlignMetadata(&metadata)); int64_t body_length = -1; RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length)); @@ -194,11 +193,12 @@ Status Message::ReadFrom(std::shared_ptr metadata, io::InputStream* stre " bytes for message body, got ", body->size()); } - return Message::Open(metadata, body, out); + return Message::Open(metadata, body); } -Status Message::ReadFrom(const int64_t offset, std::shared_ptr metadata, - io::RandomAccessFile* file, std::unique_ptr* out) { +Result> Message::ReadFrom(const int64_t offset, + std::shared_ptr metadata, + io::RandomAccessFile* file) { RETURN_NOT_OK(MaybeAlignMetadata(&metadata)); int64_t body_length = -1; RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length)); @@ -209,7 +209,7 @@ Status Message::ReadFrom(const int64_t offset, std::shared_ptr metadata, " bytes for message body, got ", body->size()); } - return Message::Open(metadata, body, out); + return Message::Open(metadata, body); } Status WritePadding(io::OutputStream* stream, int64_t nbytes) { @@ -261,8 +261,8 @@ std::string FormatMessageType(Message::Type type) { return "unknown"; } -Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, - std::unique_ptr* message) { +Result> ReadMessage(int64_t offset, int32_t metadata_length, + io::RandomAccessFile* file) { if (static_cast(metadata_length) < sizeof(int32_t)) { return Status::Invalid("metadata_length should be at least 4"); } @@ -309,7 +309,7 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile std::shared_ptr metadata = SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size); - return Message::ReadFrom(offset + metadata_length, metadata, file, message); + return Message::ReadFrom(offset + metadata_length, metadata, file); } Status AlignStream(io::InputStream* stream, int32_t alignment) { @@ -336,9 +336,7 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) { } } -namespace { - -Result> DoReadMessage(io::InputStream* file, MemoryPool* pool) { +Result> ReadMessage(io::InputStream* file, MemoryPool* pool) { int32_t continuation = 0; ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, file->Read(sizeof(int32_t), &continuation)); @@ -374,19 +372,7 @@ Result> DoReadMessage(io::InputStream* file, MemoryPool ARROW_ASSIGN_OR_RAISE(metadata, Buffer::ViewOrCopy(metadata, CPUDevice::memory_manager(pool))); - std::unique_ptr message; - RETURN_NOT_OK(Message::ReadFrom(metadata, file, &message)); - return std::move(message); -} - -} // namespace - -Status ReadMessage(io::InputStream* file, std::unique_ptr* out) { - return DoReadMessage(file, default_memory_pool()).Value(out); -} - -Result> ReadMessage(io::InputStream* file, MemoryPool* pool) { - return DoReadMessage(file, pool); + return Message::ReadFrom(metadata, file); } Status WriteMessage(const Buffer& message, const IpcWriteOptions& options, @@ -436,9 +422,7 @@ class InputStreamMessageReader : public MessageReader { ~InputStreamMessageReader() {} - Status ReadNextMessage(std::unique_ptr* message) { - return ReadMessage(stream_, message); - } + Result> ReadNextMessage() { return ReadMessage(stream_); } private: io::InputStream* stream_; @@ -454,5 +438,17 @@ std::unique_ptr MessageReader::Open( return std::unique_ptr(new InputStreamMessageReader(owned_stream)); } +// ---------------------------------------------------------------------- +// Deprecated functions + +Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, + std::unique_ptr* message) { + return ReadMessage(offset, metadata_length, file).Value(message); +} + +Status ReadMessage(io::InputStream* file, std::unique_ptr* out) { + return ReadMessage(file, default_memory_pool()).Value(out); +} + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 81cd58bfe91..9698806fd21 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "arrow/result.h" #include "arrow/status.h" @@ -71,7 +72,7 @@ class ARROW_EXPORT Message { /// \brief Construct message, but do not validate /// /// Use at your own risk; Message::Open has more metadata validation - Message(const std::shared_ptr& metadata, const std::shared_ptr& body); + Message(std::shared_ptr metadata, std::shared_ptr body); ~Message(); @@ -79,32 +80,48 @@ class ARROW_EXPORT Message { /// /// \param[in] metadata a buffer containing the Flatbuffer metadata /// \param[in] body a buffer containing the message body, which may be null - /// \param[out] out the created message - /// \return Status + /// \return the created message + static Result> Open(std::shared_ptr metadata, + std::shared_ptr body); + + ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") static Status Open(const std::shared_ptr& metadata, - const std::shared_ptr& body, std::unique_ptr* out); + const std::shared_ptr& body, std::unique_ptr* out) { + return Open(metadata, body).Value(out); + } /// \brief Read message body and create Message given Flatbuffer metadata /// \param[in] metadata containing a serialized Message flatbuffer /// \param[in] stream an InputStream - /// \param[out] out the created Message - /// \return Status + /// \return the created Message /// /// \note If stream supports zero-copy, this is zero-copy + static Result> ReadFrom(std::shared_ptr metadata, + io::InputStream* stream); + + ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") static Status ReadFrom(std::shared_ptr metadata, io::InputStream* stream, - std::unique_ptr* out); + std::unique_ptr* out) { + return ReadFrom(std::move(metadata), stream).Value(out); + } /// \brief Read message body from position in file, and create Message given /// the Flatbuffer metadata /// \param[in] offset the position in the file where the message body starts. /// \param[in] metadata containing a serialized Message flatbuffer /// \param[in] file the seekable file interface to read from - /// \param[out] out the created Message - /// \return Status + /// \return the created Message /// /// \note If file supports zero-copy, this is zero-copy + static Result> ReadFrom(const int64_t offset, + std::shared_ptr metadata, + io::RandomAccessFile* file); + + ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") static Status ReadFrom(const int64_t offset, std::shared_ptr metadata, - io::RandomAccessFile* file, std::unique_ptr* out); + io::RandomAccessFile* file, std::unique_ptr* out) { + return ReadFrom(offset, std::move(metadata), file).Value(out); + } /// \brief Return true if message type and contents are equal /// @@ -178,9 +195,13 @@ class ARROW_EXPORT MessageReader { /// \brief Read next Message from the interface /// - /// \param[out] message an arrow::ipc::Message instance - /// \return Status - virtual Status ReadNextMessage(std::unique_ptr* message) = 0; + /// \return an arrow::ipc::Message instance + virtual Result> ReadNextMessage() = 0; + + ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") + Status ReadNextMessage(std::unique_ptr* message) { + return ReadNextMessage().Value(message); + } }; /// \brief Read encapsulated RPC message from position in file @@ -195,11 +216,11 @@ class ARROW_EXPORT MessageReader { /// first 4 bytes after the offset are the message length /// \param[in] metadata_length the total number of bytes to read from file /// \param[in] file the seekable file interface to read from -/// \param[out] message the message read -/// \return Status success or failure +/// \return the message read ARROW_EXPORT -Status ReadMessage(const int64_t offset, const int32_t metadata_length, - io::RandomAccessFile* file, std::unique_ptr* message); +Result> ReadMessage(const int64_t offset, + const int32_t metadata_length, + io::RandomAccessFile* file); /// \brief Advance stream to an 8-byte offset if its position is not a multiple /// of 8 already @@ -224,13 +245,6 @@ Status AlignStream(io::OutputStream* stream, int32_t alignment = 8); ARROW_EXPORT Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8); -/// \brief Read encapsulated IPC message (metadata and body) from InputStream -/// -/// Returns null if there are not enough bytes available or the -/// message length is 0 (e.g. EOS in a stream) -ARROW_EXPORT -Status ReadMessage(io::InputStream* stream, std::unique_ptr* message); - /// \brief Read encapsulated IPC message (metadata and body) from InputStream /// /// Returns null if there are not enough bytes available or the @@ -252,6 +266,7 @@ Result> ReadMessage(io::InputStream* stream, /// message: const void* /// padding /// +/// /// \param[in] message a buffer containing the metadata to write /// \param[in] options IPC writing options, including alignment and /// legacy message support @@ -262,5 +277,17 @@ Result> ReadMessage(io::InputStream* stream, Status WriteMessage(const Buffer& message, const IpcWriteOptions& options, io::OutputStream* file, int32_t* message_length); +// ---------------------------------------------------------------------- +// Deprecated APIs + +ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") +ARROW_EXPORT +Status ReadMessage(const int64_t offset, const int32_t metadata_length, + io::RandomAccessFile* file, std::unique_ptr* message); + +ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") +ARROW_EXPORT +Status ReadMessage(io::InputStream* stream, std::unique_ptr* message); + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 94d1d8e59ed..f9500be40af 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -100,8 +100,8 @@ TEST(TestMessage, SerializeTo) { std::string body = "abcdef"; - std::unique_ptr message; - ASSERT_OK(Message::Open(metadata, std::make_shared(body), &message)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr message, + Message::Open(metadata, std::make_shared(body))); auto CheckWithAlignment = [&](int32_t alignment) { IpcWriteOptions options; @@ -125,11 +125,11 @@ TEST(TestMessage, SerializeCustomMetadata) { key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})}; for (auto metadata : cases) { std::shared_ptr serialized; - std::unique_ptr message; ASSERT_OK(internal::WriteRecordBatchMessage(/*length=*/0, /*body_length=*/0, metadata, /*nodes=*/{}, /*buffers=*/{}, &serialized)); - ASSERT_OK(Message::Open(serialized, /*body=*/nullptr, &message)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr message, + Message::Open(serialized, /*body=*/nullptr)); if (metadata) { ASSERT_TRUE(message->custom_metadata()->Equals(*metadata)); @@ -163,7 +163,7 @@ TEST(TestMessage, LegacyIpcBackwardsCompatibility) { ASSERT_OK_AND_ASSIGN(*out_serialized, stream->Finish()); io::BufferReader io_reader(*out_serialized); - ASSERT_OK(ReadMessage(&io_reader, out)); + ASSERT_OK(ReadMessage(&io_reader).Value(out)); }; std::shared_ptr serialized, legacy_serialized; @@ -199,9 +199,9 @@ class TestSchemaMetadata : public ::testing::Test { void SetUp() {} void CheckRoundtrip(const Schema& schema) { - std::shared_ptr buffer; DictionaryMemo in_memo, out_memo; - ASSERT_OK(SerializeSchema(schema, &out_memo, default_memory_pool(), &buffer)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr buffer, + SerializeSchema(schema, &out_memo, default_memory_pool())); io::BufferReader reader(buffer); ASSERT_OK_AND_ASSIGN(auto actual_schema, ReadSchema(&reader, &in_memo)); @@ -285,9 +285,8 @@ class IpcTestFixture : public io::MemoryMapFixture { void DoSchemaRoundTrip(const Schema& schema, DictionaryMemo* out_memo, std::shared_ptr* result) { - std::shared_ptr serialized_schema; - ASSERT_OK( - SerializeSchema(schema, out_memo, options_.memory_pool, &serialized_schema)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr serialized_schema, + SerializeSchema(schema, out_memo, options_.memory_pool)); DictionaryMemo in_memo; io::BufferReader buf_reader(serialized_schema); @@ -299,8 +298,8 @@ class IpcTestFixture : public io::MemoryMapFixture { const RecordBatch& batch, const IpcWriteOptions& options, DictionaryMemo* dictionary_memo, const IpcReadOptions& read_options = IpcReadOptions::Defaults()) { - std::shared_ptr serialized_batch; - RETURN_NOT_OK(SerializeRecordBatch(batch, options, &serialized_batch)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr serialized_batch, + SerializeRecordBatch(batch, options)); io::BufferReader buf_reader(serialized_batch); return ReadRecordBatch(batch.schema(), dictionary_memo, read_options, &buf_reader); @@ -326,9 +325,7 @@ class IpcTestFixture : public io::MemoryMapFixture { std::shared_ptr file_reader; ARROW_ASSIGN_OR_RAISE(file_reader, RecordBatchFileReader::Open(mmap_.get(), offset)); - std::shared_ptr result; - RETURN_NOT_OK(file_reader->ReadRecordBatch(0, &result)); - return result; + return file_reader->ReadRecordBatch(0); } void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) { @@ -417,8 +414,8 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) { ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, options_)); - std::unique_ptr message; - ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr message, + ReadMessage(0, metadata_length, mmap_.get())); ASSERT_EQ(MetadataVersion::V4, message->metadata_version()); } @@ -426,12 +423,11 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) { TEST(TestReadMessage, CorruptedSmallInput) { std::string data = "abc"; io::BufferReader reader(data); - std::unique_ptr message; - ASSERT_RAISES(Invalid, ReadMessage(&reader, &message)); + ASSERT_RAISES(Invalid, ReadMessage(&reader)); // But no error on unsignaled EOS io::BufferReader reader2(""); - ASSERT_OK(ReadMessage(&reader2, &message)); + ASSERT_OK_AND_ASSIGN(auto message, ReadMessage(&reader2)); ASSERT_EQ(nullptr, message); } @@ -525,8 +521,7 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) { } IpcWriteOptions options = IpcWriteOptions::Defaults(); options.compression = codec; - std::shared_ptr buf; - ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, options, &buf)); + ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, options)); } } @@ -741,8 +736,8 @@ TEST_F(RecursionLimits, ReadLimit) { ASSERT_OK(WriteToMmap(recursion_depth, true, &metadata_length, &body_length, &batch, &schema)); - std::unique_ptr message; - ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr message, + ReadMessage(0, metadata_length, mmap_.get())); io::BufferReader reader(message->body()); @@ -762,8 +757,8 @@ TEST_F(RecursionLimits, StressLimit) { ASSERT_OK(WriteToMmap(recursion_depth, true, &metadata_length, &body_length, &batch, &schema)); - std::unique_ptr message; - ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr message, + ReadMessage(0, metadata_length, mmap_.get())); DictionaryMemo empty_memo; @@ -820,8 +815,8 @@ struct FileWriterHelper { EXPECT_EQ(num_batches_written_, reader->num_record_batches()); for (int i = 0; i < num_batches_written_; ++i) { - std::shared_ptr chunk; - RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk, + reader->ReadRecordBatch(i)); out_batches->push_back(chunk); } @@ -1198,7 +1193,7 @@ void SpliceMessages(std::shared_ptr stream, // Parse and reassemble first two messages in stream int message_index = 0; while (true) { - ASSERT_OK(message_reader->ReadNextMessage(&msg)); + ASSERT_OK_AND_ASSIGN(msg, message_reader->ReadNextMessage()); if (!msg) { break; } diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 8411a67dfad..484d648bbd0 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -470,7 +470,7 @@ Status GetCompression(const flatbuf::Message* message, Compression::type* out) { static Status ReadContiguousPayload(io::InputStream* file, std::unique_ptr* message) { - RETURN_NOT_OK(ReadMessage(file, message)); + ARROW_ASSIGN_OR_RAISE(*message, ReadMessage(file)); if (*message == nullptr) { return Status::Invalid("Unable to read metadata at offset"); } @@ -591,8 +591,8 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { options_ = options; // Read schema - std::unique_ptr message; - RETURN_NOT_OK(message_reader_->ReadNextMessage(&message)); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, + message_reader_->ReadNextMessage()); if (!message) { return Status::Invalid("Tried reading schema message, was null or length 0"); } @@ -622,8 +622,8 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { return Status::OK(); } - std::unique_ptr message; - RETURN_NOT_OK(message_reader_->ReadNextMessage(&message)); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, + message_reader_->ReadNextMessage()); if (message == nullptr) { // End of stream *batch = nullptr; @@ -661,7 +661,7 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader { // TODO(wesm): In future, we may want to reconcile the ids in the stream with // those found in the schema for (int i = 0; i < dictionary_memo_.num_fields(); ++i) { - RETURN_NOT_OK(message_reader_->ReadNextMessage(&message)); + ARROW_ASSIGN_OR_RAISE(message, message_reader_->ReadNextMessage()); if (!message) { if (i == 0) { /// ARROW-6006: If we fail to find any dictionaries in the stream, then @@ -745,7 +745,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return internal::GetMetadataVersion(footer_->version()); } - Status ReadRecordBatch(int i, std::shared_ptr* batch) override { + Result> ReadRecordBatch(int i) override { DCHECK_GE(i, 0); DCHECK_LT(i, num_record_batches()); @@ -760,8 +760,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { CHECK_HAS_BODY(*message); ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body())); return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &dictionary_memo_, - options_, reader.get()) - .Value(batch); + options_, reader.get()); } Status Open(const std::shared_ptr& file, int64_t footer_offset, @@ -809,11 +808,10 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return Status::Invalid("Unaligned block in IPC file"); } - RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, out)); - // TODO(wesm): this breaks integration tests, see ARROW-3256 // DCHECK_EQ((*out)->body_length(), block.body_length); - return Status::OK(); + + return ReadMessage(block.offset, block.metadata_length, file_).Value(out); } Status ReadDictionaries() { @@ -932,8 +930,7 @@ Result> RecordBatchFileReader::Open( Result> ReadSchema(io::InputStream* stream, DictionaryMemo* dictionary_memo) { std::unique_ptr reader = MessageReader::Open(stream); - std::unique_ptr message; - RETURN_NOT_OK(reader->ReadNextMessage(&message)); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, reader->ReadNextMessage()); if (!message) { return Status::Invalid("Tried reading schema message, was null or length 0"); } @@ -1387,8 +1384,7 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) { const int n_batches = batch_reader->num_record_batches(); for (int i = 0; i < n_batches; ++i) { - std::shared_ptr batch; - RETURN_NOT_OK(batch_reader->ReadRecordBatch(i, &batch)); + ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i)); RETURN_NOT_OK(batch->ValidateFull()); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 903b9525c84..b691e0df122 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -185,9 +185,13 @@ class ARROW_EXPORT RecordBatchFileReader { /// if the input source supports zero-copy. /// /// \param[in] i the index of the record batch to return - /// \param[out] batch the read batch - /// \return Status - virtual Status ReadRecordBatch(int i, std::shared_ptr* batch) = 0; + /// \return the read batch + virtual Result> ReadRecordBatch(int i) = 0; + + ARROW_DEPRECATED("Use version with Result return value") + Status ReadRecordBatch(int i, std::shared_ptr* batch) { + return ReadRecordBatch(i).Value(batch); + } }; // Generic read functions; does not copy data if the input supports zero copy reads diff --git a/cpp/src/arrow/ipc/stream_to_file.cc b/cpp/src/arrow/ipc/stream_to_file.cc index 126195aae9b..c2cec48babe 100644 --- a/cpp/src/arrow/ipc/stream_to_file.cc +++ b/cpp/src/arrow/ipc/stream_to_file.cc @@ -40,7 +40,7 @@ Status ConvertToFile() { ARROW_ASSIGN_OR_RAISE(auto writer, NewFileWriter(&sink, reader->schema())); std::shared_ptr batch; while (true) { - RETURN_NOT_OK(reader->ReadNext(&batch)); + ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); if (batch == nullptr) break; RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 43b292d3d09..6579d47f8b7 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -718,8 +718,8 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat return Status::OK(); } -Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, - std::unique_ptr* out) { +Result> GetTensorMessage(const Tensor& tensor, + MemoryPool* pool) { const Tensor* tensor_to_write = &tensor; std::unique_ptr temp_tensor; @@ -730,8 +730,7 @@ Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, std::shared_ptr metadata; ARROW_ASSIGN_OR_RAISE(metadata, internal::WriteTensorMessage(*tensor_to_write, 0)); - out->reset(new Message(metadata, tensor_to_write->data())); - return Status::OK(); + return std::unique_ptr(new Message(metadata, tensor_to_write->data())); } namespace internal { @@ -859,16 +858,12 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds metadata_length); } -Status GetSparseTensorMessage(const SparseTensor& sparse_tensor, MemoryPool* pool, - std::unique_ptr* out) { +Result> GetSparseTensorMessage(const SparseTensor& sparse_tensor, + MemoryPool* pool) { internal::IpcPayload payload; RETURN_NOT_OK(internal::GetSparseTensorPayload(sparse_tensor, pool, &payload)); - - const std::shared_ptr metadata = payload.metadata; - const std::shared_ptr buffer = *payload.body_buffers.data(); - - out->reset(new Message(metadata, buffer)); - return Status::OK(); + return std::unique_ptr( + new Message(std::move(payload.metadata), std::move(payload.body_buffers[0]))); } Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { @@ -1188,14 +1183,6 @@ Result> NewFileWriter( namespace internal { -Status OpenRecordBatchWriter(std::unique_ptr sink, - const std::shared_ptr& schema, - std::unique_ptr* out) { - auto options = IpcWriteOptions::Defaults(); - ASSIGN_OR_RAISE(*out, OpenRecordBatchWriter(std::move(sink), schema, options)); - return Status::OK(); -} - Result> OpenRecordBatchWriter( std::unique_ptr sink, const std::shared_ptr& schema, const IpcWriteOptions& options) { @@ -1227,8 +1214,8 @@ Result> SerializeRecordBatch(const RecordBatch& batch, return buffer; } -Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& options, - std::shared_ptr* out) { +Result> SerializeRecordBatch(const RecordBatch& batch, + const IpcWriteOptions& options) { int64_t size = 0; RETURN_NOT_OK(GetRecordBatchSize(batch, &size)); ARROW_ASSIGN_OR_RAISE(std::shared_ptr buffer, @@ -1236,8 +1223,7 @@ Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& opt io::FixedSizeBufferWriter stream(buffer); RETURN_NOT_OK(SerializeRecordBatch(batch, options, &stream)); - *out = std::move(buffer); - return Status::OK(); + return buffer; } Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& options, @@ -1247,8 +1233,9 @@ Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& opt return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, options); } -Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo, - MemoryPool* pool, std::shared_ptr* out) { +Result> SerializeSchema(const Schema& schema, + DictionaryMemo* dictionary_memo, + MemoryPool* pool) { ARROW_ASSIGN_OR_RAISE(auto stream, io::BufferOutputStream::Create(1024, pool)); auto options = IpcWriteOptions::Defaults(); @@ -1257,7 +1244,7 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo, options, dictionary_memo); // Write schema and populate fields (but not dictionaries) in dictionary_memo RETURN_NOT_OK(writer.Start()); - return stream->Finish().Value(out); + return stream->Finish(); } // ---------------------------------------------------------------------- @@ -1299,11 +1286,16 @@ Result> RecordBatchFileWriter::Open( return NewFileWriter(sink, schema, options); } +Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& options, + std::shared_ptr* out) { + return SerializeRecordBatch(batch, options).Value(out); +} + Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, std::shared_ptr* out) { IpcWriteOptions options; options.memory_pool = pool; - return SerializeRecordBatch(batch, options, out); + return SerializeRecordBatch(batch, options).Value(out); } Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, @@ -1313,6 +1305,11 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, return SerializeRecordBatch(batch, options, out); } +Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo, + MemoryPool* pool, std::shared_ptr* out) { + return SerializeSchema(schema, dictionary_memo, pool).Value(out); +} + Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, const IpcWriteOptions& options, @@ -1323,8 +1320,24 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, modified_options); } +Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, + std::unique_ptr* out) { + return GetTensorMessage(tensor, pool).Value(out); +} + +Status GetSparseTensorMessage(const SparseTensor& sparse_tensor, MemoryPool* pool, + std::unique_ptr* out) { + return GetSparseTensorMessage(sparse_tensor, pool).Value(out); +} + namespace internal { +Status OpenRecordBatchWriter(std::unique_ptr sink, + const std::shared_ptr& schema, + std::unique_ptr* out) { + return OpenRecordBatchWriter(std::move(sink), schema).Value(out); +} + Status GetRecordBatchPayload(const RecordBatch& batch, const IpcWriteOptions& options, MemoryPool* pool, IpcPayload* out) { IpcWriteOptions modified_options = options; diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index f2b057e258d..da8c7ed7e21 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -79,7 +79,7 @@ class ARROW_EXPORT RecordBatchWriter { /// \return Status virtual Status Close() = 0; - ARROW_DEPRECATED("No-op. Pass MemoryPool using IpcWriteOptions") + ARROW_DEPRECATED("Deprecated in 0.17.0. No-op. Pass MemoryPool using IpcWriteOptions") void set_memory_pool(MemoryPool* pool) {} }; @@ -130,11 +130,10 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, /// /// \param[in] batch the record batch /// \param[in] options the IpcWriteOptions to use for serialization -/// \param[out] out the serialized message -/// \return Status +/// \return the serialized message ARROW_EXPORT -Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& options, - std::shared_ptr* out); +Result> SerializeRecordBatch(const RecordBatch& batch, + const IpcWriteOptions& options); /// \brief Serialize record batch as encapsulated IPC message in a new buffer /// @@ -163,11 +162,11 @@ Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& opt /// \param[in] schema the schema to write /// \param[in] dictionary_memo a DictionaryMemo for recording dictionary ids /// \param[in] pool a MemoryPool to allocate memory from -/// \param[out] out the serialized schema -/// \return Status +/// \return the serialized schema ARROW_EXPORT -Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo, - MemoryPool* pool, std::shared_ptr* out); +Result> SerializeSchema(const Schema& schema, + DictionaryMemo* dictionary_memo, + MemoryPool* pool = default_memory_pool()); /// \brief Write multiple record batches to OutputStream, including schema /// \param[in] batches a vector of batches. Must all have same schema @@ -199,11 +198,9 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size); /// /// \param[in] tensor the Tensor to write /// \param[in] pool MemoryPool to allocate space for metadata -/// \param[out] out the resulting Message -/// \return Status +/// \return the resulting Message ARROW_EXPORT -Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, - std::unique_ptr* out); +Result> GetTensorMessage(const Tensor& tensor, MemoryPool* pool); /// \brief Write arrow::Tensor as a contiguous message. /// @@ -235,11 +232,10 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat /// /// \param[in] sparse_tensor the SparseTensor to write /// \param[in] pool MemoryPool to allocate space for metadata -/// \param[out] out the resulting Message -/// \return Status +/// \return the resulting Message ARROW_EXPORT -Status GetSparseTensorMessage(const SparseTensor& sparse_tensor, MemoryPool* pool, - std::unique_ptr* out); +Result> GetSparseTensorMessage(const SparseTensor& sparse_tensor, + MemoryPool* pool); /// \brief EXPERIMENTAL: Write arrow::SparseTensor as a contiguous message. The metadata, /// sparse index, and body are written assuming 64-byte alignment. It is the @@ -286,10 +282,6 @@ class ARROW_EXPORT IpcPayloadWriter { /// \param[in] schema the schema of the record batches to be written /// \param[out] out the created RecordBatchWriter /// \return Status -ARROW_EXPORT -Status OpenRecordBatchWriter(std::unique_ptr sink, - const std::shared_ptr& schema, - std::unique_ptr* out); /// Create a new RecordBatchWriter from IpcPayloadWriter and schema. /// @@ -300,7 +292,7 @@ Status OpenRecordBatchWriter(std::unique_ptr sink, ARROW_EXPORT Result> OpenRecordBatchWriter( std::unique_ptr sink, const std::shared_ptr& schema, - const IpcWriteOptions& options); + const IpcWriteOptions& options = IpcWriteOptions::Defaults()); /// \brief Compute IpcPayload for the given schema /// \param[in] schema the Schema that is being serialized @@ -360,7 +352,7 @@ class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter { /// \param[in] schema the schema of the record batches to be written /// \param[out] out the created stream writer /// \return Status - ARROW_DEPRECATED("Use arrow::ipc::NewStreamWriter()") + ARROW_DEPRECATED("Deprecated in 0.17.0. Use arrow::ipc::NewStreamWriter()") static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, std::shared_ptr* out); @@ -370,11 +362,11 @@ class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter { /// \param[in] sink output stream to write to /// \param[in] schema the schema of the record batches to be written /// \return Result> - ARROW_DEPRECATED("Use arrow::ipc::NewStreamWriter()") + ARROW_DEPRECATED("Deprecated in 0.17.0. Use arrow::ipc::NewStreamWriter()") static Result> Open( io::OutputStream* sink, const std::shared_ptr& schema); - ARROW_DEPRECATED("Use arrow::ipc::NewStreamWriter()") + ARROW_DEPRECATED("Deprecated in 0.17.0. Use arrow::ipc::NewStreamWriter()") static Result> Open( io::OutputStream* sink, const std::shared_ptr& schema, const IpcWriteOptions& options); @@ -393,7 +385,7 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { /// \param[in] schema the schema of the record batches to be written /// \param[out] out the created stream writer /// \return Status - ARROW_DEPRECATED("Use arrow::ipc::NewFileWriter") + ARROW_DEPRECATED("Deprecated in 0.17.0. Use arrow::ipc::NewFileWriter") static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, std::shared_ptr* out); @@ -402,18 +394,18 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { /// \param[in] sink output stream to write to /// \param[in] schema the schema of the record batches to be written /// \return Result> - ARROW_DEPRECATED("Use arrow::ipc::NewFileWriter") + ARROW_DEPRECATED("Deprecated in 0.17.0. Use arrow::ipc::NewFileWriter") static Result> Open( io::OutputStream* sink, const std::shared_ptr& schema); - ARROW_DEPRECATED("Use arrow::ipc::NewFileWriter") + ARROW_DEPRECATED("Deprecated in 0.17.0. Use arrow::ipc::NewFileWriter") static Result> Open( io::OutputStream* sink, const std::shared_ptr& schema, const IpcWriteOptions& options); }; ARROW_DEPRECATED( - "Use version without MemoryPool argument " + "Deprecated in 0.17.0. Use version without MemoryPool argument " "(use IpcWriteOptions to pass MemoryPool") ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, @@ -421,25 +413,55 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, int64_t* body_length, const IpcWriteOptions& options, MemoryPool* pool); -ARROW_DEPRECATED("Use version with IpcWriteOptions") +ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") +ARROW_EXPORT +Status SerializeRecordBatch(const RecordBatch& batch, const IpcWriteOptions& options, + std::shared_ptr* out); + +ARROW_DEPRECATED( + "Deprecated in 0.17.0. Use Result-returning version with " + "IpcWriteOptions") ARROW_EXPORT Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, std::shared_ptr* out); -ARROW_DEPRECATED("Use version with IpcWriteOptions") +ARROW_DEPRECATED( + "Deprecated in 0.17.0. Use Result-returning version with " + "IpcWriteOptions") ARROW_EXPORT Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, io::OutputStream* out); +ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") +ARROW_EXPORT +Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo, + MemoryPool* pool, std::shared_ptr* out); + +ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") +ARROW_EXPORT +Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, + std::unique_ptr* out); + +ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") +ARROW_EXPORT +Status GetSparseTensorMessage(const SparseTensor& sparse_tensor, MemoryPool* pool, + std::unique_ptr* out); + namespace internal { -ARROW_DEPRECATED("Pass MemoryPool with IpcWriteOptions") +ARROW_DEPRECATED("Deprecated in 0.17.0. Use Result-returning version") +ARROW_EXPORT +Status OpenRecordBatchWriter(std::unique_ptr sink, + const std::shared_ptr& schema, + std::unique_ptr* out); + +ARROW_DEPRECATED("Deprecated in 0.17.0. Pass MemoryPool with IpcWriteOptions") ARROW_EXPORT Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary, const IpcWriteOptions& options, MemoryPool* pool, IpcPayload* payload); -ARROW_DEPRECATED("Pass MemoryPool with IpcWriteOptions") +ARROW_DEPRECATED("Deprecated in 0.17.0. Pass MemoryPool with IpcWriteOptions") ARROW_EXPORT Status GetRecordBatchPayload(const RecordBatch& batch, const IpcWriteOptions& options, MemoryPool* pool, IpcPayload* out); diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc index e8ebfdc4688..ac900aabc1a 100644 --- a/cpp/src/arrow/python/serialize.cc +++ b/cpp/src/arrow/python/serialize.cc @@ -739,8 +739,8 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out // For each tensor, get a metadata buffer and a buffer for the body for (const auto& tensor : this->tensors) { - std::unique_ptr message; - RETURN_NOT_OK(ipc::GetTensorMessage(*tensor, memory_pool, &message)); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, + ipc::GetTensorMessage(*tensor, memory_pool)); RETURN_NOT_OK(PushBuffer(message->metadata())); RETURN_NOT_OK(PushBuffer(message->body())); } @@ -758,8 +758,8 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out // For each ndarray, get a metadata buffer and a buffer for the body for (const auto& ndarray : this->ndarrays) { - std::unique_ptr message; - RETURN_NOT_OK(ipc::GetTensorMessage(*ndarray, memory_pool, &message)); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr message, + ipc::GetTensorMessage(*ndarray, memory_pool)); RETURN_NOT_OK(PushBuffer(message->metadata())); RETURN_NOT_OK(PushBuffer(message->body())); } diff --git a/cpp/src/jni/orc/jni_wrapper.cpp b/cpp/src/jni/orc/jni_wrapper.cpp index 2a9ba74ce9d..ce0546732c1 100644 --- a/cpp/src/jni/orc/jni_wrapper.cpp +++ b/cpp/src/jni/orc/jni_wrapper.cpp @@ -226,16 +226,15 @@ Java_org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper_getSchema(JNIEnv* en auto schema = stripe_reader->schema(); - std::shared_ptr out; - auto status = - arrow::ipc::SerializeSchema(*schema, nullptr, arrow::default_memory_pool(), &out); - if (!status.ok()) { + auto maybe_buffer = arrow::ipc::SerializeSchema(*schema, nullptr); + if (!maybe_buffer.ok()) { return nullptr; } + auto buffer = *std::move(maybe_buffer); - jbyteArray ret = env->NewByteArray(out->size()); - auto src = reinterpret_cast(out->data()); - env->SetByteArrayRegion(ret, 0, out->size(), src); + jbyteArray ret = env->NewByteArray(buffer->size()); + auto src = reinterpret_cast(buffer->data()); + env->SetByteArrayRegion(ret, 0, buffer->size(), src); return ret; } diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 65f83f941eb..011a583466b 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -763,8 +763,8 @@ Status GetSchemaMetadata(const ::arrow::Schema& schema, ::arrow::MemoryPool* poo } ::arrow::ipc::DictionaryMemo dict_memo; - std::shared_ptr serialized; - RETURN_NOT_OK(::arrow::ipc::SerializeSchema(schema, &dict_memo, pool, &serialized)); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr serialized, + ::arrow::ipc::SerializeSchema(schema, &dict_memo, pool)); // The serialized schema is not UTF-8, which is required for Thrift std::string schema_as_string = serialized->ToString(); diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index a7fe4cff2d8..0ae6bf2b650 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1225,9 +1225,8 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: int64_t body_length cdef cppclass CMessage" arrow::ipc::Message": - CStatus Open(const shared_ptr[CBuffer]& metadata, - const shared_ptr[CBuffer]& body, - unique_ptr[CMessage]* out) + CResult[unique_ptr[CMessage]] Open(shared_ptr[CBuffer] metadata, + shared_ptr[CBuffer] body) shared_ptr[CBuffer] body() @@ -1247,7 +1246,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: @staticmethod unique_ptr[CMessageReader] Open(const shared_ptr[CInputStream]& stream) - CStatus ReadNextMessage(unique_ptr[CMessage]* out) + CResult[unique_ptr[CMessage]] ReadNextMessage() cdef cppclass CRecordBatchWriter" arrow::ipc::RecordBatchWriter": CStatus Close() @@ -1289,7 +1288,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: int num_record_batches() - CStatus ReadRecordBatch(int i, shared_ptr[CRecordBatch]* batch) + CResult[shared_ptr[CRecordBatch]] ReadRecordBatch(int i) CResult[unique_ptr[CMessage]] ReadMessage(CInputStream* stream, CMemoryPool* pool) @@ -1308,13 +1307,12 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: CDictionaryMemo* dictionary_memo, const CIpcReadOptions& options) - CStatus SerializeSchema(const CSchema& schema, - CDictionaryMemo* dictionary_memo, - CMemoryPool* pool, shared_ptr[CBuffer]* out) + CResult[shared_ptr[CBuffer]] SerializeSchema( + const CSchema& schema, CDictionaryMemo* dictionary_memo, + CMemoryPool* pool) - CStatus SerializeRecordBatch(const CRecordBatch& schema, - const CIpcWriteOptions& options, - shared_ptr[CBuffer]* out) + CResult[shared_ptr[CBuffer]] SerializeRecordBatch( + const CRecordBatch& schema, const CIpcWriteOptions& options) CResult[shared_ptr[CSchema]] ReadSchema(CInputStream* stream, CDictionaryMemo* dictionary_memo) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index c21a8779292..2cb74f0e09b 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -164,7 +164,8 @@ cdef class MessageReader: cdef Message result = Message.__new__(Message) with nogil: - check_status(self.reader.get().ReadNextMessage(&result.message)) + result.message = move(GetResultValue(self.reader.get() + .ReadNextMessage())) if result.message.get() == NULL: raise StopIteration @@ -420,7 +421,7 @@ cdef class _RecordBatchFileReader: raise ValueError('Batch number {0} out of range'.format(i)) with nogil: - check_status(self.reader.get().ReadRecordBatch(i, &batch)) + batch = GetResultValue(self.reader.get().ReadRecordBatch(i)) return pyarrow_wrap_batch(batch) @@ -442,7 +443,8 @@ cdef class _RecordBatchFileReader: batches.resize(nbatches) with nogil: for i in range(nbatches): - check_status(self.reader.get().ReadRecordBatch(i, &batches[i])) + batches[i] = GetResultValue(self.reader.get() + .ReadRecordBatch(i)) table = GetResultValue( CTable.FromRecordBatches(self.schema.sp_schema, move(batches))) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index e29e94ab56a..42a6cffbb3b 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -765,8 +765,8 @@ cdef class RecordBatch(_PandasConvertible): options.memory_pool = maybe_unbox_memory_pool(memory_pool) with nogil: - check_status(SerializeRecordBatch(deref(self.batch), - options, &buffer)) + buffer = GetResultValue( + SerializeRecordBatch(deref(self.batch), options)) return pyarrow_wrap_buffer(buffer) def slice(self, offset=0, length=None): diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index 4650bf084f7..6609a81581f 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -1531,8 +1531,8 @@ cdef class Schema: arg_dict_memo = &temp_memo with nogil: - check_status(SerializeSchema(deref(self.schema), arg_dict_memo, - pool, &buffer)) + buffer = GetResultValue(SerializeSchema(deref(self.schema), + arg_dict_memo, pool)) return pyarrow_wrap_buffer(buffer) def remove_metadata(self): diff --git a/r/src/message.cpp b/r/src/message.cpp index 440bb5b58b9..4d7115f5d88 100644 --- a/r/src/message.cpp +++ b/r/src/message.cpp @@ -89,17 +89,13 @@ std::unique_ptr ipc___MessageReader__Open( // [[arrow::export]] std::unique_ptr ipc___MessageReader__ReadNextMessage( const std::unique_ptr& reader) { - std::unique_ptr message; - STOP_IF_NOT_OK(reader->ReadNextMessage(&message)); - return message; + return VALUE_OR_STOP(reader->ReadNextMessage()); } // [[arrow::export]] std::unique_ptr ipc___ReadMessage( const std::shared_ptr& stream) { - std::unique_ptr message; - STOP_IF_NOT_OK(arrow::ipc::ReadMessage(stream.get(), &message)); - return message; + return VALUE_OR_STOP(arrow::ipc::ReadMessage(stream.get())); } #endif diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp index d29f59cd711..3fce4d094fb 100644 --- a/r/src/recordbatchreader.cpp +++ b/r/src/recordbatchreader.cpp @@ -75,11 +75,10 @@ int ipc___RecordBatchFileReader__num_record_batches( // [[arrow::export]] std::shared_ptr ipc___RecordBatchFileReader__ReadRecordBatch( const std::shared_ptr& reader, int i) { - std::shared_ptr batch; - if (i >= 0 && i < reader->num_record_batches()) { - STOP_IF_NOT_OK(reader->ReadRecordBatch(i, &batch)); + if (i < 0 && i >= reader->num_record_batches()) { + Rcpp::stop("Record batch index out of bounds"); } - return batch; + return VALUE_OR_STOP(reader->ReadRecordBatch(i)); } // [[arrow::export]] @@ -95,7 +94,7 @@ std::shared_ptr Table__from_RecordBatchFileReader( int num_batches = reader->num_record_batches(); std::vector> batches(num_batches); for (int i = 0; i < num_batches; i++) { - STOP_IF_NOT_OK(reader->ReadRecordBatch(i, &batches[i])); + batches[i] = VALUE_OR_STOP(reader->ReadRecordBatch(i)); } return VALUE_OR_STOP(arrow::Table::FromRecordBatches(std::move(batches))); @@ -122,7 +121,7 @@ std::vector> ipc___RecordBatchFileReader__ba std::vector> res(n); for (int i = 0; i < n; i++) { - STOP_IF_NOT_OK(reader->ReadRecordBatch(i, &res[i])); + res[i] = VALUE_OR_STOP(reader->ReadRecordBatch(i)); } return res; diff --git a/r/src/schema.cpp b/r/src/schema.cpp index 4eef7e7c99e..05c06f6917b 100644 --- a/r/src/schema.cpp +++ b/r/src/schema.cpp @@ -84,9 +84,8 @@ std::shared_ptr Schema__WithMetadata( // [[arrow::export]] Rcpp::RawVector Schema__serialize(const std::shared_ptr& schema) { arrow::ipc::DictionaryMemo empty_memo; - std::shared_ptr out; - STOP_IF_NOT_OK(arrow::ipc::SerializeSchema(*schema, &empty_memo, - arrow::default_memory_pool(), &out)); + std::shared_ptr out = + VALUE_OR_STOP(arrow::ipc::SerializeSchema(*schema, &empty_memo)); auto n = out->size(); Rcpp::RawVector vec(out->size());