From 33f341df5da91f370135362e476d00ce04c22a12 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 18 Nov 2017 00:40:38 -0500 Subject: [PATCH 01/14] Start making things virtual Change-Id: I0a252a1c7606b0d98827765029aaa4dce3e445bb --- cpp/src/arrow/table.h | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 2cff32f74ef..524b1e6e4bc 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -116,20 +116,24 @@ class ARROW_EXPORT Column { /// \class RecordBatch /// \brief Collection of equal-length arrays matching a particular Schema /// -/// A record batch is table-like data structure consisting of an internal -/// sequence of fields, each a contiguous Arrow array +/// A record batch is table-like data structure that is semantically a sequence +/// of fields, each a contiguous Arrow array class ARROW_EXPORT RecordBatch { public: + virtual ~RecordBatch() = default; + /// \param[in] schema The record batch schema /// \param[in] num_rows length of fields in the record batch. Each array /// should have the same length as num_rows /// \param[in] columns the record batch fields as vector of arrays - RecordBatch(const std::shared_ptr& schema, int64_t num_rows, - const std::vector>& columns); + static std::shared_ptr Make( + const std::shared_ptr& schema, + int64_t num_rows, const std::vector>& columns); /// \brief Move-based constructor for a vector of Array instances - RecordBatch(const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns); + static std::shared_ptr Make( + const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns); /// \brief Construct record batch from vector of internal data structures /// \since 0.5.0 @@ -141,13 +145,15 @@ class ARROW_EXPORT RecordBatch { /// \param num_rows the number of semantic rows in the record batch. 
This /// should be equal to the length of each field /// \param columns the data for the batch's columns - RecordBatch(const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns); + static std::shared_ptr Make( + const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns); /// \brief Construct record batch by copying vector of array data /// \since 0.5.0 - RecordBatch(const std::shared_ptr& schema, int64_t num_rows, - const std::vector>& columns); + static std::shared_ptr Make( + const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns); /// \brief Determine if two record batches are exactly equal /// \return true if batches are equal @@ -158,20 +164,22 @@ class ARROW_EXPORT RecordBatch { // \return the table's schema /// \return true if batches are equal - std::shared_ptr schema() const { return schema_; } + std::shared_ptr schema() const = 0; /// \brief Retrieve an array from the record batch /// \param[in] i field index, does not boundscheck /// \return an Array object - std::shared_ptr column(int i) const; + virtual std::shared_ptr column(int i) const = 0; - std::shared_ptr column_data(int i) const { return columns_[i]; } + std::shared_ptr column_data(int i) const = 0; /// \brief Name in i-th column const std::string& column_name(int i) const; /// \return the number of columns in the table - int num_columns() const { return static_cast(columns_.size()); } + int num_columns() const = 0; + + // { return static_cast(columns_.size()); } /// \return the number of rows (the corresponding length of each column) int64_t num_rows() const { return num_rows_; } @@ -187,13 +195,14 @@ class ARROW_EXPORT RecordBatch { /// \brief Slice each of the arrays in the record batch /// \param[in] offset the starting offset to slice, through end of batch /// \return new record batch - std::shared_ptr Slice(int64_t offset) const; + virtual std::shared_ptr Slice(int64_t offset) const = 0; /// \brief Slice each of the arrays in the 
record batch /// \param[in] offset the starting offset to slice /// \param[in] length the number of elements to slice from offset /// \return new record batch - std::shared_ptr Slice(int64_t offset, int64_t length) const; + virtual std::shared_ptr Slice(int64_t offset, + int64_t length) const = 0; /// \brief Check for schema or length inconsistencies /// \return Status From ef00e5b959a12eac544b8c73c05ae011eed3f024 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 19 Nov 2017 15:18:39 -0500 Subject: [PATCH 02/14] Split off record batch implementation into separate files, progress refactoring Change-Id: I10e2b777beae1adc7dbbe2120d9331c9b37eb4ff --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/api.h | 1 + cpp/src/arrow/array.h | 2 + cpp/src/arrow/builder.cc | 1 - cpp/src/arrow/builder.h | 1 - cpp/src/arrow/column-benchmark.cc | 1 + cpp/src/arrow/compute/kernel.h | 1 + cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 2 +- cpp/src/arrow/ipc/feather-test.cc | 3 +- cpp/src/arrow/ipc/feather.cc | 1 + cpp/src/arrow/ipc/ipc-json-test.cc | 4 +- cpp/src/arrow/ipc/ipc-read-write-benchmark.cc | 2 +- cpp/src/arrow/ipc/ipc-read-write-test.cc | 9 +- cpp/src/arrow/ipc/json-integration-test.cc | 2 +- cpp/src/arrow/ipc/json-internal.cc | 6 +- cpp/src/arrow/ipc/json.cc | 2 +- cpp/src/arrow/ipc/reader.cc | 4 +- cpp/src/arrow/ipc/reader.h | 3 +- cpp/src/arrow/ipc/test-common.h | 45 ++--- cpp/src/arrow/ipc/writer.cc | 3 +- cpp/src/arrow/pretty_print.cc | 2 +- cpp/src/arrow/record_batch.cc | 186 ++++++++++++++++++ cpp/src/arrow/record_batch.h | 184 +++++++++++++++++ cpp/src/arrow/table-test.cc | 33 ++-- cpp/src/arrow/table.cc | 134 +------------ cpp/src/arrow/table.h | 127 +----------- cpp/src/arrow/table_builder-test.cc | 3 +- cpp/src/arrow/table_builder.cc | 3 +- cpp/src/arrow/test-common.h | 1 - cpp/src/arrow/test-util.h | 1 - 30 files changed, 443 insertions(+), 326 deletions(-) create mode 100644 cpp/src/arrow/record_batch.cc create mode 100644 cpp/src/arrow/record_batch.h diff --git 
a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 496e0da9d62..94705781fa4 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -22,6 +22,7 @@ set(ARROW_SRCS compare.cc memory_pool.cc pretty_print.cc + record_batch.cc status.cc table.cc table_builder.cc @@ -144,6 +145,7 @@ install(FILES compare.h memory_pool.h pretty_print.h + record_batch.h status.h table.h table_builder.h diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h index 5d2e859f3a4..c4e0dcbbf60 100644 --- a/cpp/src/arrow/api.h +++ b/cpp/src/arrow/api.h @@ -27,6 +27,7 @@ #include "arrow/memory_pool.h" #include "arrow/pretty_print.h" #include "arrow/status.h" +#include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/table_builder.h" #include "arrow/tensor.h" diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 28756a6abda..dda9dd38be4 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -279,6 +279,8 @@ class ARROW_EXPORT Array { ARROW_DISALLOW_COPY_AND_ASSIGN(Array); }; +using ArrayVector = std::vector>; + static inline std::ostream& operator<<(std::ostream& os, const Array& x) { os << x.ToString(); return os; diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index 3e213fcd5ca..a42f9024545 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -28,7 +28,6 @@ #include "arrow/buffer.h" #include "arrow/compare.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 32741b53ac4..e59e166580a 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -29,7 +29,6 @@ #include "arrow/buffer.h" #include "arrow/memory_pool.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" diff --git a/cpp/src/arrow/column-benchmark.cc 
b/cpp/src/arrow/column-benchmark.cc index e50ddf6d703..af2c368c329 100644 --- a/cpp/src/arrow/column-benchmark.cc +++ b/cpp/src/arrow/column-benchmark.cc @@ -19,6 +19,7 @@ #include "arrow/array.h" #include "arrow/memory_pool.h" +#include "arrow/table.h" #include "arrow/test-util.h" namespace arrow { diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 0037245d610..e160d9c80a1 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -22,6 +22,7 @@ #include #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/util/macros.h" #include "arrow/util/variant.h" diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc index 022268e0347..61a0e97b4de 100644 --- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc +++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc @@ -28,7 +28,7 @@ #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/status.h" -#include "arrow/table.h" +#include "arrow/record_batch.h" #include "arrow/util/visibility.h" #include "arrow/gpu/cuda_context.h" diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc index 6bd16462df9..0b2b9d7869b 100644 --- a/cpp/src/arrow/ipc/feather-test.cc +++ b/cpp/src/arrow/ipc/feather-test.cc @@ -29,6 +29,7 @@ #include "arrow/ipc/feather.h" #include "arrow/ipc/test-common.h" #include "arrow/pretty_print.h" +#include "arrow/table.h" #include "arrow/test-util.h" namespace arrow { @@ -376,7 +377,7 @@ TEST_F(TestTableWriter, TimeTypes) { schema->field(i)->type(), values->length(), buffers, values->null_count(), 0)); } - RecordBatch batch(schema, values->length(), std::move(arrays)); + SimpleRecordBatch batch(schema, values->length(), std::move(arrays)); CheckBatch(batch); } diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index cea720bd01b..077dc39305b 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -32,6 +32,7 @@ 
#include "arrow/ipc/feather-internal.h" #include "arrow/ipc/feather_generated.h" #include "arrow/ipc/util.h" // IWYU pragma: keep +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index a560f09d6fd..e496826f96b 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -31,8 +31,8 @@ #include "arrow/ipc/json.h" #include "arrow/ipc/test-common.h" #include "arrow/memory_pool.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" #include "arrow/type_traits.h" @@ -269,7 +269,7 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) { std::vector> arrays; MakeBatchArrays(schema, num_rows, &arrays); - auto batch = std::make_shared(schema, num_rows, arrays); + auto batch = RecordBatch::Make(schema, num_rows, arrays); batches.push_back(batch); ASSERT_OK(writer->WriteRecordBatch(*batch)); } diff --git a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc index 9ed0abde651..8561fb86037 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc @@ -63,7 +63,7 @@ std::shared_ptr MakeRecordBatch(int64_t total_size, int64_t num_fie } auto schema = std::make_shared(fields); - return std::make_shared(schema, length, arrays); + return RecordBatch::Make(schema, length, arrays); } static void BM_WriteRecordBatch(benchmark::State& state) { // NOLINT non-const reference diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index 40cd3f0eef0..e88de14f515 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -197,7 +197,7 @@ class IpcTestFixture : public io::MemoryMapFixture { std::vector> fields = {f0}; auto schema = std::make_shared(fields); - 
RecordBatch batch(schema, 0, {array}); + SimpleRecordBatch batch(schema, 0, {array}); CheckRoundtrip(batch, buffer_size); } @@ -292,7 +292,7 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) { auto CheckArray = [this](const std::shared_ptr& array) { auto f0 = field("f0", array->type()); auto schema = ::arrow::schema({f0}); - RecordBatch batch(schema, array->length(), {array}); + SimpleRecordBatch batch(schema, array->length(), {array}); auto sliced_batch = batch.Slice(0, 5); int64_t full_size; @@ -411,8 +411,7 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { *schema = ::arrow::schema({f0}); - std::vector> arrays = {array}; - *batch = std::make_shared(*schema, batch_length, arrays); + *batch = RecordBatch::Make(*schema, batch_length, {array}); std::stringstream ss; ss << "test-write-past-max-recursion-" << g_file_number++; @@ -632,7 +631,7 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) { std::vector> fields = {f0}; auto schema = std::make_shared(fields); - RecordBatch batch(schema, length, {array}); + SimpleRecordBatch batch(schema, length, {array}); std::string path = "test-write-large-record_batch"; diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index c7530a467b3..f487487dfda 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -34,8 +34,8 @@ #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/pretty_print.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index bdf1ef52b40..5b7ed54e73c 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -28,8 +28,8 @@ #include "arrow/array.h" #include "arrow/builder.h" #include "arrow/ipc/dictionary.h" +#include "arrow/record_batch.h" #include 
"arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" @@ -125,7 +125,7 @@ class SchemaWriter { // Make a dummy record batch. A bit tedious as we have to make a schema auto schema = ::arrow::schema({arrow::field("dictionary", dictionary->type())}); - RecordBatch batch(schema, dictionary->length(), {dictionary}); + SimpleRecordBatch batch(schema, dictionary->length(), {dictionary}); RETURN_NOT_OK(WriteRecordBatch(batch, writer_)); writer_->EndObject(); return Status::OK(); @@ -1435,7 +1435,7 @@ Status ReadRecordBatch(const rj::Value& json_obj, const std::shared_ptr& RETURN_NOT_OK(ReadArray(pool, json_columns[i], type, &columns[i])); } - *batch = std::make_shared(schema, num_rows, columns); + *batch = RecordBatch::Make(schema, num_rows, columns); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc index 30a1bb81e1a..ea2947d5d4c 100644 --- a/cpp/src/arrow/ipc/json.cc +++ b/cpp/src/arrow/ipc/json.cc @@ -24,8 +24,8 @@ #include "arrow/buffer.h" #include "arrow/ipc/json-internal.h" #include "arrow/memory_pool.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/util/logging.h" diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 8e10d7d66f9..5960e81883d 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -37,8 +37,8 @@ #include "arrow/ipc/message.h" #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/util.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/tensor.h" #include "arrow/type.h" #include "arrow/util/bit-util.h" @@ -307,7 +307,7 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr& schema, arrays[i] = std::move(arr); } - *out = std::make_shared(schema, num_rows, std::move(arrays)); + *out = RecordBatch::Make(schema, num_rows, std::move(arrays)); return 
Status::OK(); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 7581fbda5b1..627f67e2517 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -24,13 +24,12 @@ #include #include "arrow/ipc/message.h" -#include "arrow/table.h" +#include "arrow/record_batch.h" #include "arrow/util/visibility.h" namespace arrow { class Buffer; -class RecordBatch; class Schema; class Status; class Tensor; diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 7fc13938105..207f96487d7 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -30,8 +30,8 @@ #include "arrow/builder.h" #include "arrow/memory_pool.h" #include "arrow/pretty_print.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" #include "arrow/util/bit-util.h" @@ -184,7 +184,7 @@ Status MakeBooleanBatchSized(const int length, std::shared_ptr* out std::shared_ptr a0, a1; RETURN_NOT_OK(MakeRandomBooleanArray(length, true, &a0)); RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a1)); - out->reset(new RecordBatch(schema, length, {a0, a1})); + *out = RecordBatch::Make(schema, length, {a0, a1}); return Status::OK(); } @@ -203,7 +203,7 @@ Status MakeIntBatchSized(int length, std::shared_ptr* out) { MemoryPool* pool = default_memory_pool(); RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0)); RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1)); - out->reset(new RecordBatch(schema, length, {a0, a1})); + *out = RecordBatch::Make(schema, length, {a0, a1}); return Status::OK(); } @@ -252,7 +252,7 @@ Status MakeStringTypesRecordBatch(std::shared_ptr* out) { auto s = MakeRandomBinaryArray(length, true, pool, &a1); RETURN_NOT_OK(s); } - out->reset(new RecordBatch(schema, length, {a0, a1})); + *out = RecordBatch::Make(schema, length, {a0, a1}); return Status::OK(); } @@ -261,7 +261,7 @@ Status 
MakeNullRecordBatch(std::shared_ptr* out) { auto f0 = field("f0", null()); auto schema = ::arrow::schema({f0}); std::shared_ptr a0 = std::make_shared(length); - out->reset(new RecordBatch(schema, length, {a0})); + *out = RecordBatch::Make(schema, length, {a0}); return Status::OK(); } @@ -284,7 +284,7 @@ Status MakeListRecordBatch(std::shared_ptr* out) { RETURN_NOT_OK( MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array)); RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); - out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array})); + *out = RecordBatch::Make(schema, length, {list_array, list_list_array, flat_array}); return Status::OK(); } @@ -304,7 +304,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr* out) { RETURN_NOT_OK( MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array)); RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array)); - out->reset(new RecordBatch(schema, 0, {list_array, list_list_array, flat_array})); + *out = RecordBatch::Make(schema, 0, {list_array, list_list_array, flat_array}); return Status::OK(); } @@ -327,7 +327,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr* out) { RETURN_NOT_OK( MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array)); RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); - out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array})); + *out = RecordBatch::Make(schema, length, {list_array, list_list_array, flat_array}); return Status::OK(); } @@ -347,7 +347,7 @@ Status MakeDeeplyNestedList(std::shared_ptr* out) { auto f0 = field("f0", type); auto schema = ::arrow::schema({f0}); std::vector> arrays = {array}; - out->reset(new RecordBatch(schema, batch_length, arrays)); + *out = RecordBatch::Make(schema, batch_length, arrays); return Status::OK(); } @@ -377,7 +377,7 @@ Status MakeStruct(std::shared_ptr* out) { // construct 
batch std::vector> arrays = {no_nulls, with_nulls}; - out->reset(new RecordBatch(schema, list_batch->num_rows(), arrays)); + *out = RecordBatch::Make(schema, list_batch->num_rows(), arrays); return Status::OK(); } @@ -445,7 +445,7 @@ Status MakeUnion(std::shared_ptr* out) { // construct batch std::vector> arrays = {sparse_no_nulls, sparse, dense}; - out->reset(new RecordBatch(schema, length, arrays)); + *out = RecordBatch::Make(schema, length, arrays); return Status::OK(); } @@ -526,7 +526,7 @@ Status MakeDictionary(std::shared_ptr* out) { std::vector> arrays = {a0, a1, a2, a3, a4}; - out->reset(new RecordBatch(schema, length, arrays)); + *out = RecordBatch::Make(schema, length, arrays); return Status::OK(); } @@ -564,7 +564,7 @@ Status MakeDictionaryFlat(std::shared_ptr* out) { {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type)}); std::vector> arrays = {a0, a1, a2}; - out->reset(new RecordBatch(schema, length, arrays)); + *out = RecordBatch::Make(schema, length, arrays); return Status::OK(); } @@ -584,8 +584,8 @@ Status MakeDates(std::shared_ptr* out) { std::shared_ptr date64_array; ArrayFromVector(is_valid, date64_values, &date64_array); - std::vector> arrays = {date32_array, date64_array}; - *out = std::make_shared(schema, date32_array->length(), arrays); + *out = RecordBatch::Make(schema, date32_array->length(), + {date32_array, date64_array}); return Status::OK(); } @@ -604,8 +604,7 @@ Status MakeTimestamps(std::shared_ptr* out) { ArrayFromVector(f1->type(), is_valid, ts_values, &a1); ArrayFromVector(f2->type(), is_valid, ts_values, &a2); - ArrayVector arrays = {a0, a1, a2}; - *out = std::make_shared(schema, a0->length(), arrays); + *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2}); return Status::OK(); } @@ -628,8 +627,7 @@ Status MakeTimes(std::shared_ptr* out) { ArrayFromVector(f2->type(), is_valid, t32_values, &a2); ArrayFromVector(f3->type(), is_valid, t64_values, &a3); - ArrayVector arrays = {a0, a1, a2, a3}; - *out = 
std::make_shared(schema, a0->length(), arrays); + *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2, a3}); return Status::OK(); } @@ -665,8 +663,7 @@ Status MakeFWBinary(std::shared_ptr* out) { RETURN_NOT_OK(b1.Finish(&a1)); RETURN_NOT_OK(b2.Finish(&a2)); - ArrayVector arrays = {a1, a2}; - *out = std::make_shared(schema, a1->length(), arrays); + *out = RecordBatch::Make(schema, a1->length(), {a1, a2}); return Status::OK(); } @@ -695,8 +692,7 @@ Status MakeDecimal(std::shared_ptr* out) { auto a2 = std::make_shared(f1->type(), length, data); - ArrayVector arrays = {a1, a2}; - *out = std::make_shared(schema, length, arrays); + *out = RecordBatch::Make(schema, length, {a1, a2}); return Status::OK(); } @@ -716,8 +712,7 @@ Status MakeNull(std::shared_ptr* out) { std::shared_ptr a2; ArrayFromVector(f1->type(), is_valid, int_values, &a2); - ArrayVector arrays = {a1, a2}; - *out = std::make_shared(schema, a1->length(), arrays); + *out = RecordBatch::Make(schema, a1->length(), {a1, a2}); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 323116f589b..14a1dc5f9d4 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -32,6 +32,7 @@ #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/util.h" #include "arrow/memory_pool.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/tensor.h" @@ -511,7 +512,7 @@ class DictionaryWriter : public RecordBatchSerializer { std::vector> fields = { arrow::field("dictionary", dictionary->type())}; auto schema = std::make_shared(fields); - RecordBatch batch(schema, dictionary->length(), {dictionary}); + SimpleRecordBatch batch(schema, dictionary->length(), {dictionary}); return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length); } diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc index cfbc30315fc..bd5f8ce10ea 100644 --- a/cpp/src/arrow/pretty_print.cc +++ 
b/cpp/src/arrow/pretty_print.cc @@ -22,8 +22,8 @@ #include "arrow/array.h" #include "arrow/pretty_print.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/logging.h" diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc new file mode 100644 index 00000000000..3b946e143ff --- /dev/null +++ b/cpp/src/arrow/record_batch.cc @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/record_batch.h" + +#include +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/logging.h" + +namespace arrow { + +RecordBatch::RecordBatch(const std::shared_ptr& schema, int64_t num_rows) + : schema_(schema), num_rows_(num_rows) { + boxed_columns_.resize(schema->num_fields()); +} + +std::shared_ptr RecordBatch::Make( + const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns) { + return std::make_shared(schema, num_rows, columns); +} + +std::shared_ptr RecordBatch::Make( + const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns) { + return std::make_shared(schema, num_rows, std::move(columns)); +} + +std::shared_ptr RecordBatch::Make( + const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns) { + return std::make_shared(schema, num_rows, std::move(columns)); +} + +std::shared_ptr RecordBatch::Make( + const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns) { + return std::make_shared(schema, num_rows, columns); +} + +const std::string& RecordBatch::column_name(int i) const { + return schema_->field(i)->name(); +} + +bool RecordBatch::Equals(const RecordBatch& other) const { + if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { + return false; + } + + for (int i = 0; i < num_columns(); ++i) { + if (!column(i)->Equals(other.column(i))) { + return false; + } + } + + return true; +} + +bool RecordBatch::ApproxEquals(const RecordBatch& other) const { + if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { + return false; + } + + for (int i = 0; i < num_columns(); ++i) { + if (!column(i)->ApproxEquals(other.column(i))) { + return false; + } + } + + return true; +} + +std::shared_ptr RecordBatch::Slice(int64_t offset) const { + return Slice(offset, this->num_rows() - offset); +} + +Status RecordBatch::Validate() const { + for (int i = 0; i < 
num_columns(); ++i) { + auto arr_shared = this->column_data(i); + const ArrayData& arr = *arr_shared; + if (arr.length != num_rows_) { + std::stringstream ss; + ss << "Number of rows in column " << i << " did not match batch: " << arr.length + << " vs " << num_rows_; + return Status::Invalid(ss.str()); + } + const auto& schema_type = *schema_->field(i)->type(); + if (!arr.type->Equals(schema_type)) { + std::stringstream ss; + ss << "Column " << i << " type not match schema: " << arr.type->ToString() << " vs " + << schema_type.ToString(); + return Status::Invalid(ss.str()); + } + } + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// In-memory simple record batch implementation + + +SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns) + : RecordBatch(schema, num_rows) { + columns_.resize(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = columns[i]->data(); + } +} + +SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns) + : RecordBatch(schema, num_rows) { + columns_.resize(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = columns[i]->data(); + } +} + +SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns) + : RecordBatch(schema, num_rows) { + columns_ = std::move(columns); +} + +SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns) + : RecordBatch(schema, num_rows) { + columns_ = columns; +} + +std::shared_ptr SimpleRecordBatch::column(int i) const { + if (!boxed_columns_[i]) { + boxed_columns_[i] = MakeArray(columns_[i]); + } + DCHECK(boxed_columns_[i]); + return boxed_columns_[i]; +} + +std::shared_ptr SimpleRecordBatch::column_data(int i) const { + return columns_[i]; +} + +std::shared_ptr 
SimpleRecordBatch::Slice(int64_t offset, int64_t length) const { + std::vector> arrays; + arrays.reserve(num_columns()); + for (const auto& field : columns_) { + int64_t col_length = std::min(field->length - offset, length); + int64_t col_offset = field->offset + offset; + + auto new_data = std::make_shared(*field); + new_data->length = col_length; + new_data->offset = col_offset; + new_data->null_count = kUnknownNullCount; + arrays.emplace_back(new_data); + } + int64_t num_rows = std::min(num_rows_ - offset, length); + return std::make_shared(schema_, num_rows, std::move(arrays)); +} + +// ---------------------------------------------------------------------- +// Base record batch reader + +RecordBatchReader::~RecordBatchReader() {} + +} // namespace arrow diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h new file mode 100644 index 00000000000..9fe1c76a357 --- /dev/null +++ b/cpp/src/arrow/record_batch.h @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_RECORD_BATCH_H +#define ARROW_RECORD_BATCH_H + +#include +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/type.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class KeyValueMetadata; +class Status; + +/// \class RecordBatch +/// \brief Collection of equal-length arrays matching a particular Schema +/// +/// A record batch is table-like data structure that is semantically a sequence +/// of fields, each a contiguous Arrow array +class ARROW_EXPORT RecordBatch { + public: + virtual ~RecordBatch() = default; + + /// \param[in] schema The record batch schema + /// \param[in] num_rows length of fields in the record batch. Each array + /// should have the same length as num_rows + /// \param[in] columns the record batch fields as vector of arrays + static std::shared_ptr Make( + const std::shared_ptr& schema, + int64_t num_rows, const std::vector>& columns); + + /// \brief Move-based constructor for a vector of Array instances + static std::shared_ptr Make( + const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns); + + /// \brief Construct record batch from vector of internal data structures + /// \since 0.5.0 + /// + /// This class is only provided with an rvalue-reference for the input data, + /// and is intended for internal use, or advanced users. + /// + /// \param schema the record batch schema + /// \param num_rows the number of semantic rows in the record batch. 
This + /// should be equal to the length of each field + /// \param columns the data for the batch's columns + static std::shared_ptr Make( + const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns); + + /// \brief Construct record batch by copying vector of array data + /// \since 0.5.0 + static std::shared_ptr Make( + const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns); + + /// \brief Determine if two record batches are exactly equal + /// \return true if batches are equal + bool Equals(const RecordBatch& other) const; + + /// \brief Determine if two record batches are approximately equal + bool ApproxEquals(const RecordBatch& other) const; + + // \return the table's schema + /// \return true if batches are equal + std::shared_ptr schema() const { + return schema_; + } + + /// \brief Retrieve an array from the record batch + /// \param[in] i field index, does not boundscheck + /// \return an Array object + virtual std::shared_ptr column(int i) const = 0; + + /// \brief Retrieve an array's internaldata from the record batch + /// \param[in] i field index, does not boundscheck + /// \return an internal ArrayData object + virtual std::shared_ptr column_data(int i) const = 0; + + /// \brief Name in i-th column + const std::string& column_name(int i) const; + + /// \return the number of columns in the table + int num_columns() const { + return schema_->num_fields(); + } + + /// \return the number of rows (the corresponding length of each column) + int64_t num_rows() const { return num_rows_; } + + /// \brief Slice each of the arrays in the record batch + /// \param[in] offset the starting offset to slice, through end of batch + /// \return new record batch + virtual std::shared_ptr Slice(int64_t offset) const; + + /// \brief Slice each of the arrays in the record batch + /// \param[in] offset the starting offset to slice + /// \param[in] length the number of elements to slice from offset + /// \return new record batch + 
virtual std::shared_ptr Slice(int64_t offset, + int64_t length) const = 0; + + /// \brief Check for schema or length inconsistencies + /// \return Status + Status Validate() const; + + protected: + RecordBatch(const std::shared_ptr& schema, int64_t num_rows); + + std::shared_ptr schema_; + int64_t num_rows_; + + // Caching boxed array data + mutable std::vector> boxed_columns_; + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch); +}; + +/// \class SimpleRecordBatch +/// \brief A basic, non-lazy in-memory record batch +class ARROW_EXPORT SimpleRecordBatch : public RecordBatch { + public: + SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns); + + SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns); + + SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns); + + SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns); + + std::shared_ptr column(int i) const override; + std::shared_ptr column_data(int i) const override; + + std::shared_ptr Slice(int64_t offset, int64_t length) const override; + + private: + std::vector> columns_; +}; + +/// \brief Abstract interface for reading stream of record batches +class ARROW_EXPORT RecordBatchReader { + public: + virtual ~RecordBatchReader(); + + /// \return the shared schema of the record batches in the stream + virtual std::shared_ptr schema() const = 0; + + /// Read the next record batch in the stream. 
Return null for batch when + /// reaching end of stream + /// + /// \param[out] batch the next loaded batch, null at end of stream + /// \return Status + virtual Status ReadNext(std::shared_ptr* batch) = 0; +}; + +} // namespace arrow + +#endif // ARROW_RECORD_BATCH_H diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index b490310c26a..c8e4dd6d3d2 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -22,6 +22,7 @@ #include "gtest/gtest.h" #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/test-common.h" @@ -310,7 +311,7 @@ TEST_F(TestTable, FromRecordBatches) { const int64_t length = 10; MakeExample1(length); - auto batch1 = std::make_shared(schema_, length, arrays_); + auto batch1 = RecordBatch::Make(schema_, length, arrays_); std::shared_ptr result, expected; ASSERT_OK(Table::FromRecordBatches({batch1}, &result)); @@ -336,7 +337,7 @@ TEST_F(TestTable, FromRecordBatches) { auto other_schema = std::make_shared(fields); std::vector> other_arrays = {arrays_[0], arrays_[1]}; - auto batch2 = std::make_shared(other_schema, length, other_arrays); + auto batch2 = RecordBatch::Make(other_schema, length, other_arrays); ASSERT_RAISES(Invalid, Table::FromRecordBatches({batch1, batch2}, &result)); } @@ -344,11 +345,11 @@ TEST_F(TestTable, ConcatenateTables) { const int64_t length = 10; MakeExample1(length); - auto batch1 = std::make_shared(schema_, length, arrays_); + auto batch1 = RecordBatch::Make(schema_, length, arrays_); // generate different data MakeExample1(length); - auto batch2 = std::make_shared(schema_, length, arrays_); + auto batch2 = RecordBatch::Make(schema_, length, arrays_); std::shared_ptr
t1, t2, t3, result, expected; ASSERT_OK(Table::FromRecordBatches({batch1}, &t1)); @@ -366,7 +367,7 @@ TEST_F(TestTable, ConcatenateTables) { auto other_schema = std::make_shared(fields); std::vector> other_arrays = {arrays_[0], arrays_[1]}; - auto batch3 = std::make_shared(other_schema, length, other_arrays); + auto batch3 = RecordBatch::Make(other_schema, length, other_arrays); ASSERT_OK(Table::FromRecordBatches({batch3}, &t3)); ASSERT_RAISES(Invalid, ConcatenateTables({t1, t3}, &result)); @@ -481,9 +482,9 @@ TEST_F(TestRecordBatch, Equals) { auto a1 = MakeRandomArray(length); auto a2 = MakeRandomArray(length); - RecordBatch b1(schema, length, {a0, a1, a2}); - RecordBatch b3(schema, length, {a0, a1}); - RecordBatch b4(schema, length, {a0, a1, a1}); + SimpleRecordBatch b1(schema, length, {a0, a1, a2}); + SimpleRecordBatch b3(schema, length, {a0, a1}); + SimpleRecordBatch b4(schema, length, {a0, a1, a1}); ASSERT_TRUE(b1.Equals(b1)); ASSERT_FALSE(b1.Equals(b3)); @@ -507,16 +508,16 @@ TEST_F(TestRecordBatch, Validate) { auto a2 = MakeRandomArray(length); auto a3 = MakeRandomArray(5); - RecordBatch b1(schema, length, {a0, a1, a2}); + SimpleRecordBatch b1(schema, length, {a0, a1, a2}); ASSERT_OK(b1.Validate()); // Length mismatch - RecordBatch b2(schema, length, {a0, a1, a3}); + SimpleRecordBatch b2(schema, length, {a0, a1, a3}); ASSERT_RAISES(Invalid, b2.Validate()); // Type mismatch - RecordBatch b3(schema, length, {a0, a1, a0}); + SimpleRecordBatch b3(schema, length, {a0, a1, a0}); ASSERT_RAISES(Invalid, b3.Validate()); } @@ -534,14 +535,14 @@ TEST_F(TestRecordBatch, Slice) { auto a0 = MakeRandomArray(length); auto a1 = MakeRandomArray(length); - RecordBatch batch(schema, length, {a0, a1}); + auto batch = RecordBatch::Make(schema, length, {a0, a1}); - auto batch_slice = batch.Slice(2); - auto batch_slice2 = batch.Slice(1, 5); + auto batch_slice = batch->Slice(2); + auto batch_slice2 = batch->Slice(1, 5); - ASSERT_EQ(batch_slice->num_rows(), batch.num_rows() - 2); + 
ASSERT_EQ(batch_slice->num_rows(), batch->num_rows() - 2); - for (int i = 0; i < batch.num_columns(); ++i) { + for (int i = 0; i < batch->num_columns(); ++i) { ASSERT_EQ(2, batch_slice->column(i)->offset()); ASSERT_EQ(length - 2, batch_slice->column(i)->length()); diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index fe19bf4ce0b..aec2c34f96b 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -23,6 +23,7 @@ #include #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/type.h" #include "arrow/util/logging.h" @@ -152,131 +153,6 @@ Status Column::ValidateData() { return Status::OK(); } -// ---------------------------------------------------------------------- -// RecordBatch methods - -RecordBatch::RecordBatch(const std::shared_ptr& schema, int64_t num_rows) - : schema_(schema), num_rows_(num_rows) { - boxed_columns_.resize(schema->num_fields()); -} - -RecordBatch::RecordBatch(const std::shared_ptr& schema, int64_t num_rows, - const std::vector>& columns) - : RecordBatch(schema, num_rows) { - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = columns[i]->data(); - } -} - -RecordBatch::RecordBatch(const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns) - : RecordBatch(schema, num_rows) { - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = columns[i]->data(); - } -} - -RecordBatch::RecordBatch(const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns) - : RecordBatch(schema, num_rows) { - columns_ = std::move(columns); -} - -RecordBatch::RecordBatch(const std::shared_ptr& schema, int64_t num_rows, - const std::vector>& columns) - : RecordBatch(schema, num_rows) { - columns_ = columns; -} - -std::shared_ptr RecordBatch::column(int i) const { - if (!boxed_columns_[i]) { - boxed_columns_[i] = MakeArray(columns_[i]); - } - DCHECK(boxed_columns_[i]); - return 
boxed_columns_[i]; -} - -const std::string& RecordBatch::column_name(int i) const { - return schema_->field(i)->name(); -} - -bool RecordBatch::Equals(const RecordBatch& other) const { - if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { - return false; - } - - for (int i = 0; i < num_columns(); ++i) { - if (!column(i)->Equals(other.column(i))) { - return false; - } - } - - return true; -} - -bool RecordBatch::ApproxEquals(const RecordBatch& other) const { - if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { - return false; - } - - for (int i = 0; i < num_columns(); ++i) { - if (!column(i)->ApproxEquals(other.column(i))) { - return false; - } - } - - return true; -} - -std::shared_ptr RecordBatch::ReplaceSchemaMetadata( - const std::shared_ptr& metadata) const { - auto new_schema = schema_->AddMetadata(metadata); - return std::make_shared(new_schema, num_rows_, columns_); -} - -std::shared_ptr RecordBatch::Slice(int64_t offset) const { - return Slice(offset, this->num_rows() - offset); -} - -std::shared_ptr RecordBatch::Slice(int64_t offset, int64_t length) const { - std::vector> arrays; - arrays.reserve(num_columns()); - for (const auto& field : columns_) { - int64_t col_length = std::min(field->length - offset, length); - int64_t col_offset = field->offset + offset; - - auto new_data = std::make_shared(*field); - new_data->length = col_length; - new_data->offset = col_offset; - new_data->null_count = kUnknownNullCount; - arrays.emplace_back(new_data); - } - int64_t num_rows = std::min(num_rows_ - offset, length); - return std::make_shared(schema_, num_rows, std::move(arrays)); -} - -Status RecordBatch::Validate() const { - for (int i = 0; i < num_columns(); ++i) { - const ArrayData& arr = *columns_[i]; - if (arr.length != num_rows_) { - std::stringstream ss; - ss << "Number of rows in column " << i << " did not match batch: " << arr.length - << " vs " << num_rows_; - return Status::Invalid(ss.str()); - } - const 
auto& schema_type = *schema_->field(i)->type(); - if (!arr.type->Equals(schema_type)) { - std::stringstream ss; - ss << "Column " << i << " type not match schema: " << arr.type->ToString() << " vs " - << schema_type.ToString(); - return Status::Invalid(ss.str()); - } - } - return Status::OK(); -} - // ---------------------------------------------------------------------- // Table methods @@ -498,11 +374,6 @@ Status MakeTable(const std::shared_ptr& schema, return Status::OK(); } -// ---------------------------------------------------------------------- -// Base record batch reader - -RecordBatchReader::~RecordBatchReader() {} - // ---------------------------------------------------------------------- // Convert a table to a sequence of record batches @@ -565,8 +436,7 @@ class TableBatchReader::TableBatchReaderImpl { } absolute_row_position_ += chunksize; - *out = - std::make_shared(table_.schema(), chunksize, std::move(batch_data)); + *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data)); return Status::OK(); } diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 524b1e6e4bc..41ed6dedf95 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -24,6 +24,7 @@ #include #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/type.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -33,8 +34,6 @@ namespace arrow { class KeyValueMetadata; class Status; -using ArrayVector = std::vector>; - /// \class ChunkedArray /// \brief A data structure managing a list of primitive Arrow arrays logically /// as one large array @@ -113,114 +112,6 @@ class ARROW_EXPORT Column { ARROW_DISALLOW_COPY_AND_ASSIGN(Column); }; -/// \class RecordBatch -/// \brief Collection of equal-length arrays matching a particular Schema -/// -/// A record batch is table-like data structure that is semantically a sequence -/// of fields, each a contiguous Arrow array -class ARROW_EXPORT RecordBatch { - public: - virtual 
~RecordBatch() = default; - - /// \param[in] schema The record batch schema - /// \param[in] num_rows length of fields in the record batch. Each array - /// should have the same length as num_rows - /// \param[in] columns the record batch fields as vector of arrays - static std::shared_ptr Make( - const std::shared_ptr& schema, - int64_t num_rows, const std::vector>& columns); - - /// \brief Move-based constructor for a vector of Array instances - static std::shared_ptr Make( - const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns); - - /// \brief Construct record batch from vector of internal data structures - /// \since 0.5.0 - /// - /// This class is only provided with an rvalue-reference for the input data, - /// and is intended for internal use, or advanced users. - /// - /// \param schema the record batch schema - /// \param num_rows the number of semantic rows in the record batch. This - /// should be equal to the length of each field - /// \param columns the data for the batch's columns - static std::shared_ptr Make( - const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns); - - /// \brief Construct record batch by copying vector of array data - /// \since 0.5.0 - static std::shared_ptr Make( - const std::shared_ptr& schema, int64_t num_rows, - const std::vector>& columns); - - /// \brief Determine if two record batches are exactly equal - /// \return true if batches are equal - bool Equals(const RecordBatch& other) const; - - /// \brief Determine if two record batches are approximately equal - bool ApproxEquals(const RecordBatch& other) const; - - // \return the table's schema - /// \return true if batches are equal - std::shared_ptr schema() const = 0; - - /// \brief Retrieve an array from the record batch - /// \param[in] i field index, does not boundscheck - /// \return an Array object - virtual std::shared_ptr column(int i) const = 0; - - std::shared_ptr column_data(int i) const = 0; - - /// \brief Name in i-th 
column - const std::string& column_name(int i) const; - - /// \return the number of columns in the table - int num_columns() const = 0; - - // { return static_cast(columns_.size()); } - - /// \return the number of rows (the corresponding length of each column) - int64_t num_rows() const { return num_rows_; } - - /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL) - /// \since 0.5.0 - /// - /// \param[in] metadata new KeyValueMetadata - /// \return new RecordBatch - std::shared_ptr ReplaceSchemaMetadata( - const std::shared_ptr& metadata) const; - - /// \brief Slice each of the arrays in the record batch - /// \param[in] offset the starting offset to slice, through end of batch - /// \return new record batch - virtual std::shared_ptr Slice(int64_t offset) const = 0; - - /// \brief Slice each of the arrays in the record batch - /// \param[in] offset the starting offset to slice - /// \param[in] length the number of elements to slice from offset - /// \return new record batch - virtual std::shared_ptr Slice(int64_t offset, - int64_t length) const = 0; - - /// \brief Check for schema or length inconsistencies - /// \return Status - Status Validate() const; - - private: - ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch); - - RecordBatch(const std::shared_ptr& schema, int64_t num_rows); - - std::shared_ptr schema_; - int64_t num_rows_; - std::vector> columns_; - - // Caching boxed array data - mutable std::vector> boxed_columns_; -}; - /// \class Table /// \brief Logical table as sequence of chunked arrays class ARROW_EXPORT Table { @@ -292,22 +183,6 @@ class ARROW_EXPORT Table { int64_t num_rows_; }; -/// \brief Abstract interface for reading stream of record batches -class ARROW_EXPORT RecordBatchReader { - public: - virtual ~RecordBatchReader(); - - /// \return the shared schema of the record batches in the stream - virtual std::shared_ptr schema() const = 0; - - /// Read the next record batch in the stream. 
Return null for batch when - /// reaching end of stream - /// - /// \param[out] batch the next loaded batch, null at end of stream - /// \return Status - virtual Status ReadNext(std::shared_ptr* batch) = 0; -}; - /// \brief Compute a sequence of record batches from a (possibly chunked) Table class ARROW_EXPORT TableBatchReader : public RecordBatchReader { public: diff --git a/cpp/src/arrow/table_builder-test.cc b/cpp/src/arrow/table_builder-test.cc index 07d9b6b2d65..a3306c5b566 100644 --- a/cpp/src/arrow/table_builder-test.cc +++ b/cpp/src/arrow/table_builder-test.cc @@ -22,6 +22,7 @@ #include "gtest/gtest.h" #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" @@ -98,7 +99,7 @@ TEST_F(TestRecordBatchBuilder, Basics) { ASSERT_OK(ex_b1.Finish(&a1)); ASSERT_OK(ex_b2.Finish(&a2)); - RecordBatch expected(schema, 4, {a0, a1, a2}); + SimpleRecordBatch expected(schema, 4, {a0, a1, a2}); // Builder attributes ASSERT_EQ(3, builder->num_fields()); diff --git a/cpp/src/arrow/table_builder.cc b/cpp/src/arrow/table_builder.cc index a1bd95940a6..379d886deac 100644 --- a/cpp/src/arrow/table_builder.cc +++ b/cpp/src/arrow/table_builder.cc @@ -24,6 +24,7 @@ #include "arrow/array.h" #include "arrow/builder.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" @@ -64,7 +65,7 @@ Status RecordBatchBuilder::Flush(bool reset_builders, } length = fields[i]->length(); } - *batch = std::make_shared(schema_, length, std::move(fields)); + *batch = RecordBatch::Make(schema_, length, std::move(fields)); if (reset_builders) { return InitBuilders(); } else { diff --git a/cpp/src/arrow/test-common.h b/cpp/src/arrow/test-common.h index a4c4fddff73..911adf7b605 100644 --- a/cpp/src/arrow/test-common.h +++ b/cpp/src/arrow/test-common.h @@ -30,7 +30,6 @@ #include "arrow/buffer.h" #include "arrow/builder.h" #include "arrow/memory_pool.h" -#include 
"arrow/table.h" #include "arrow/test-util.h" namespace arrow { diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 77f489ab177..7560df2b04b 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -35,7 +35,6 @@ #include "arrow/memory_pool.h" #include "arrow/pretty_print.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" From 267dd21882f35fcb252f10d7ba9a24fa8b9d5ebd Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 19 Nov 2017 19:59:25 -0500 Subject: [PATCH 03/14] Finish RecordBatch refactoring, get tests passing again. Add option to set build type in Travis CI Change-Id: I8fe9a8e155c6c37a7a4a5f921eb411d45bd8c5d8 --- .travis.yml | 1 + ci/travis_before_script_cpp.sh | 2 ++ ci/travis_env_common.sh | 2 ++ ci/travis_script_python.sh | 3 +++ cpp/src/arrow/api.h | 2 +- cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 2 +- cpp/src/arrow/ipc/test-common.h | 3 +-- cpp/src/arrow/python/python-test.cc | 1 + cpp/src/arrow/python/python_to_arrow.cc | 8 ++++--- cpp/src/arrow/record_batch.cc | 28 ++++++++++++++++++------- cpp/src/arrow/record_batch.h | 27 ++++++++++++------------ cpp/src/arrow/table-test.cc | 5 +++-- python/pyarrow/includes/libarrow.pxd | 6 ++++-- python/pyarrow/table.pxi | 5 ++--- 14 files changed, 61 insertions(+), 34 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9c714a68948..6333ad95182 100644 --- a/.travis.yml +++ b/.travis.yml @@ -55,6 +55,7 @@ matrix: - export ARROW_TRAVIS_VALGRIND=1 - export ARROW_TRAVIS_PLASMA=1 - export ARROW_TRAVIS_CLANG_FORMAT=1 + - export ARROW_BUILD_TYPE=release - export CC="clang-4.0" - export CXX="clang++-4.0" - $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 4998f190f98..b90a6dd8784 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -91,11 +91,13 @@ fi if [ $TRAVIS_OS_NAME == 
"linux" ]; then cmake $CMAKE_COMMON_FLAGS \ $CMAKE_LINUX_FLAGS \ + -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ -DBUILD_WARNING_LEVEL=CHECKIN \ $ARROW_CPP_DIR else cmake $CMAKE_COMMON_FLAGS \ $CMAKE_OSX_FLAGS \ + -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ -DBUILD_WARNING_LEVEL=CHECKIN \ $ARROW_CPP_DIR fi diff --git a/ci/travis_env_common.sh b/ci/travis_env_common.sh index 52c7da4e017..2ea819ce377 100755 --- a/ci/travis_env_common.sh +++ b/ci/travis_env_common.sh @@ -38,6 +38,8 @@ export ARROW_PYTHON_PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env export CMAKE_EXPORT_COMPILE_COMMANDS=1 +export ARROW_BUILD_TYPE=${ARROW_BUILD_TYPE:=debug} + if [ "$ARROW_TRAVIS_USE_TOOLCHAIN" == "1" ]; then # C++ toolchain export CPP_TOOLCHAIN=$TRAVIS_BUILD_DIR/cpp-toolchain diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh index 603201bcc16..5f7b0a9a1af 100755 --- a/ci/travis_script_python.sh +++ b/ci/travis_script_python.sh @@ -63,6 +63,7 @@ cmake -GNinja \ -DARROW_BUILD_UTILITIES=off \ -DARROW_PLASMA=on \ -DARROW_PYTHON=on \ + -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \ $ARROW_CPP_DIR @@ -78,6 +79,8 @@ if [ "$PYTHON_VERSION" == "2.7" ]; then pip install futures fi +export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE + pip install -r requirements.txt python setup.py build_ext --with-parquet --with-plasma \ install --single-version-externally-managed --record=record.text diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h index c4e0dcbbf60..7cae8414a77 100644 --- a/cpp/src/arrow/api.h +++ b/cpp/src/arrow/api.h @@ -26,8 +26,8 @@ #include "arrow/compare.h" #include "arrow/memory_pool.h" #include "arrow/pretty_print.h" -#include "arrow/status.h" #include "arrow/record_batch.h" +#include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" #include "arrow/tensor.h" diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc index 61a0e97b4de..a7262c8b4d4 100644 --- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc +++ 
b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc @@ -27,8 +27,8 @@ #include "arrow/ipc/message.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" -#include "arrow/status.h" #include "arrow/record_batch.h" +#include "arrow/status.h" #include "arrow/util/visibility.h" #include "arrow/gpu/cuda_context.h" diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 207f96487d7..6f8a0dcc61f 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -584,8 +584,7 @@ Status MakeDates(std::shared_ptr* out) { std::shared_ptr date64_array; ArrayFromVector(is_valid, date64_values, &date64_array); - *out = RecordBatch::Make(schema, date32_array->length(), - {date32_array, date64_array}); + *out = RecordBatch::Make(schema, date32_array->length(), {date32_array, date64_array}); return Status::OK(); } diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc index 86391a18598..126bc223feb 100644 --- a/cpp/src/arrow/python/python-test.cc +++ b/cpp/src/arrow/python/python-test.cc @@ -23,6 +23,7 @@ #include "arrow/array.h" #include "arrow/builder.h" +#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/python/arrow_to_pandas.h" diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index b0c6287f088..72cc5b6e1db 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -32,13 +32,15 @@ #include "arrow/builder.h" #include "arrow/io/interfaces.h" #include "arrow/ipc/writer.h" +#include "arrow/record_batch.h" +#include "arrow/tensor.h" +#include "arrow/util/logging.h" + #include "arrow/python/common.h" #include "arrow/python/helpers.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/platform.h" #include "arrow/python/util/datetime.h" -#include "arrow/tensor.h" -#include "arrow/util/logging.h" constexpr int32_t kMaxRecursionDepth = 100; @@ -694,7 +696,7 @@ Status SerializeDict(PyObject* 
context, std::vector dicts, std::shared_ptr MakeBatch(std::shared_ptr data) { auto field = std::make_shared("list", data->type()); auto schema = ::arrow::schema({field}); - return std::shared_ptr(new RecordBatch(schema, data->length(), {data})); + return RecordBatch::Make(schema, data->length(), {data}); } Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) { diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 3b946e143ff..7eff83f629d 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -118,34 +118,41 @@ Status RecordBatch::Validate() const { // ---------------------------------------------------------------------- // In-memory simple record batch implementation - -SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, +SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, + int64_t num_rows, const std::vector>& columns) : RecordBatch(schema, num_rows) { + DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_.resize(columns.size()); for (size_t i = 0; i < columns.size(); ++i) { columns_[i] = columns[i]->data(); } } -SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, +SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, + int64_t num_rows, std::vector>&& columns) : RecordBatch(schema, num_rows) { + DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_.resize(columns.size()); for (size_t i = 0; i < columns.size(); ++i) { columns_[i] = columns[i]->data(); } } -SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, +SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, + int64_t num_rows, std::vector>&& columns) : RecordBatch(schema, num_rows) { + DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_ = std::move(columns); } -SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& 
schema, int64_t num_rows, - const std::vector>& columns) +SimpleRecordBatch::SimpleRecordBatch( + const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns) : RecordBatch(schema, num_rows) { + DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_ = columns; } @@ -161,7 +168,14 @@ std::shared_ptr SimpleRecordBatch::column_data(int i) const { return columns_[i]; } -std::shared_ptr SimpleRecordBatch::Slice(int64_t offset, int64_t length) const { +std::shared_ptr SimpleRecordBatch::ReplaceSchemaMetadata( + const std::shared_ptr& metadata) const { + auto new_schema = schema_->AddMetadata(metadata); + return RecordBatch::Make(new_schema, num_rows_, columns_); +} + +std::shared_ptr SimpleRecordBatch::Slice(int64_t offset, + int64_t length) const { std::vector> arrays; arrays.reserve(num_columns()); for (const auto& field : columns_) { diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 9fe1c76a357..94d6045186d 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -47,13 +47,13 @@ class ARROW_EXPORT RecordBatch { /// should have the same length as num_rows /// \param[in] columns the record batch fields as vector of arrays static std::shared_ptr Make( - const std::shared_ptr& schema, - int64_t num_rows, const std::vector>& columns); + const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns); /// \brief Move-based constructor for a vector of Array instances - static std::shared_ptr Make( - const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns); + static std::shared_ptr Make(const std::shared_ptr& schema, + int64_t num_rows, + std::vector>&& columns); /// \brief Construct record batch from vector of internal data structures /// \since 0.5.0 @@ -84,9 +84,7 @@ class ARROW_EXPORT RecordBatch { // \return the table's schema /// \return true if batches are equal - std::shared_ptr schema() const { - return schema_; - } + std::shared_ptr schema() 
const { return schema_; } /// \brief Retrieve an array from the record batch /// \param[in] i field index, does not boundscheck @@ -98,13 +96,14 @@ class ARROW_EXPORT RecordBatch { /// \return an internal ArrayData object virtual std::shared_ptr column_data(int i) const = 0; + virtual std::shared_ptr ReplaceSchemaMetadata( + const std::shared_ptr& metadata) const = 0; + /// \brief Name in i-th column const std::string& column_name(int i) const; /// \return the number of columns in the table - int num_columns() const { - return schema_->num_fields(); - } + int num_columns() const { return schema_->num_fields(); } /// \return the number of rows (the corresponding length of each column) int64_t num_rows() const { return num_rows_; } @@ -118,8 +117,7 @@ class ARROW_EXPORT RecordBatch { /// \param[in] offset the starting offset to slice /// \param[in] length the number of elements to slice from offset /// \return new record batch - virtual std::shared_ptr Slice(int64_t offset, - int64_t length) const = 0; + virtual std::shared_ptr Slice(int64_t offset, int64_t length) const = 0; /// \brief Check for schema or length inconsistencies /// \return Status @@ -159,6 +157,9 @@ class ARROW_EXPORT SimpleRecordBatch : public RecordBatch { std::shared_ptr Slice(int64_t offset, int64_t length) const override; + std::shared_ptr ReplaceSchemaMetadata( + const std::shared_ptr& metadata) const override; + private: std::vector> columns_; }; diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index c8e4dd6d3d2..0264a0f003c 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -476,14 +476,15 @@ TEST_F(TestRecordBatch, Equals) { auto f2 = field("f2", int16()); vector> fields = {f0, f1, f2}; - auto schema = std::make_shared(fields); + auto schema = ::arrow::schema({f0, f1, f2}); + auto schema2 = ::arrow::schema({f0, f1}); auto a0 = MakeRandomArray(length); auto a1 = MakeRandomArray(length); auto a2 = MakeRandomArray(length); SimpleRecordBatch 
b1(schema, length, {a0, a1, a2}); - SimpleRecordBatch b3(schema, length, {a0, a1}); + SimpleRecordBatch b3(schema2, length, {a0, a1}); SimpleRecordBatch b4(schema, length, {a0, a1, a1}); ASSERT_TRUE(b1.Equals(b1)); diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index dbfd89cc378..7c1d8aa71a4 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -403,8 +403,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CChunkedArray] data() cdef cppclass CRecordBatch" arrow::RecordBatch": - CRecordBatch(const shared_ptr[CSchema]& schema, int64_t num_rows, - const vector[shared_ptr[CArray]]& columns) + @staticmethod + shared_ptr[CRecordBatch] Make( + const shared_ptr[CSchema]& schema, int64_t num_rows, + const vector[shared_ptr[CArray]]& columns) c_bool Equals(const CRecordBatch& other) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 591f3297587..01d880ed4d2 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -724,7 +724,6 @@ cdef class RecordBatch: Array arr c_string c_name shared_ptr[CSchema] schema - shared_ptr[CRecordBatch] batch vector[shared_ptr[CArray]] c_arrays int64_t num_rows int64_t i @@ -740,8 +739,8 @@ cdef class RecordBatch: for arr in arrays: c_arrays.push_back(arr.sp_array) - batch.reset(new CRecordBatch(schema, num_rows, c_arrays)) - return pyarrow_wrap_batch(batch) + return pyarrow_wrap_batch( + CRecordBatch.Make(schema, num_rows, c_arrays)) def table_to_blocks(PandasOptions options, Table table, int nthreads, From bcc0cd155bd5de4a1c605119857ae34234d2d5bd Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 19 Nov 2017 23:18:20 -0500 Subject: [PATCH 04/14] Some table refactoring to be virtual Change-Id: I180f4c0d2af218b9fca4ff115be19c9dc6c7d9f7 --- cpp/src/arrow/table.cc | 145 +++++++++++++++++++++++------------------ cpp/src/arrow/table.h | 56 ++++++++++++---- 2 files changed, 125 insertions(+), 76 deletions(-) 
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index aec2c34f96b..143daa86259 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -156,38 +156,16 @@ Status Column::ValidateData() { // ---------------------------------------------------------------------- // Table methods -Table::Table(const std::shared_ptr& schema, - const std::vector>& columns, int64_t num_rows) - : schema_(schema), columns_(columns) { - if (num_rows < 0) { - if (columns.size() == 0) { - num_rows_ = 0; - } else { - num_rows_ = columns[0]->length(); - } - } else { - num_rows_ = num_rows; - } +std::shared_ptr
Table::Make(const std::shared_ptr& schema, + const std::vector>& columns, + int64_t num_rows) { + return std::make_shared(schema, columns, num_rows); } -Table::Table(const std::shared_ptr& schema, - const std::vector>& columns, int64_t num_rows) - : schema_(schema) { - if (num_rows < 0) { - if (columns.size() == 0) { - num_rows_ = 0; - } else { - num_rows_ = columns[0]->length(); - } - } else { - num_rows_ = num_rows; - } - - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = - std::make_shared(schema->field(static_cast(i)), columns[i]); - } +std::shared_ptr
Table::Make(const std::shared_ptr& schema, + const std::vector>& arrays, + int64_t num_rows) { + return std::make_shared(schema, arrays, num_rows); } std::shared_ptr
Table::ReplaceSchemaMetadata( @@ -287,39 +265,6 @@ bool Table::Equals(const Table& other) const { return true; } -Status Table::RemoveColumn(int i, std::shared_ptr
* out) const { - std::shared_ptr new_schema; - RETURN_NOT_OK(schema_->RemoveField(i, &new_schema)); - - *out = std::make_shared
(new_schema, internal::DeleteVectorElement(columns_, i)); - return Status::OK(); -} - -Status Table::AddColumn(int i, const std::shared_ptr& col, - std::shared_ptr
* out) const { - if (i < 0 || i > num_columns() + 1) { - return Status::Invalid("Invalid column index."); - } - if (col == nullptr) { - std::stringstream ss; - ss << "Column " << i << " was null"; - return Status::Invalid(ss.str()); - } - if (col->length() != num_rows_) { - std::stringstream ss; - ss << "Added column's length must match table's length. Expected length " << num_rows_ - << " but got length " << col->length(); - return Status::Invalid(ss.str()); - } - - std::shared_ptr new_schema; - RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema)); - - *out = - std::make_shared
(new_schema, internal::AddVectorElement(columns_, i, col)); - return Status::OK(); -} - Status Table::ValidateColumns() const { if (num_columns() != schema_->num_fields()) { return Status::Invalid("Number of columns did not match schema"); @@ -352,6 +297,80 @@ bool Table::IsChunked() const { return false; } + +SimpleTable::SimpleTable(const std::shared_ptr& schema, + const std::vector>& columns, + int64_t num_rows) + : schema_(schema), columns_(columns) { + if (num_rows < 0) { + if (columns.size() == 0) { + num_rows_ = 0; + } else { + num_rows_ = columns[0]->length(); + } + } else { + num_rows_ = num_rows; + } +} + +SimpleTable::SimpleTable(const std::shared_ptr& schema, + const std::vector>& columns, + int64_t num_rows) + : schema_(schema) { + if (num_rows < 0) { + if (columns.size() == 0) { + num_rows_ = 0; + } else { + num_rows_ = columns[0]->length(); + } + } else { + num_rows_ = num_rows; + } + + columns_.resize(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = + std::make_shared(schema->field(static_cast(i)), columns[i]); + } +} + +std::shared_ptr SimpleTable::column(int i) const { + return columns_[i]; +} + + +Status SimpleTable::RemoveColumn(int i, std::shared_ptr
* out) const { + std::shared_ptr new_schema; + RETURN_NOT_OK(schema_->RemoveField(i, &new_schema)); + + *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i)); + return Status::OK(); +} + +Status SimpleTable::AddColumn(int i, const std::shared_ptr& col, + std::shared_ptr
* out) const { + if (i < 0 || i > num_columns() + 1) { + return Status::Invalid("Invalid column index."); + } + if (col == nullptr) { + std::stringstream ss; + ss << "Column " << i << " was null"; + return Status::Invalid(ss.str()); + } + if (col->length() != num_rows_) { + std::stringstream ss; + ss << "Added column's length must match table's length. Expected length " << num_rows_ + << " but got length " << col->length(); + return Status::Invalid(ss.str()); + } + + std::shared_ptr new_schema; + RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema)); + + *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i, col)); + return Status::OK(); +} + Status MakeTable(const std::shared_ptr& schema, const std::vector>& arrays, std::shared_ptr
* table) { diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 41ed6dedf95..1c387a46249 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -121,15 +121,17 @@ class ARROW_EXPORT Table { /// \param schema The table schema (column types) /// \param columns The table's columns /// \param num_rows number of rows in table, -1 (default) to infer from columns - Table(const std::shared_ptr& schema, - const std::vector>& columns, int64_t num_rows = -1); + static std::shared_ptr
Make(const std::shared_ptr& schema, + const std::vector>& columns, + int64_t num_rows = -1); /// \brief Construct Table from schema and arrays /// \param schema The table schema (column types) /// \param arrays The table's columns as arrays /// \param num_rows number of rows in table, -1 (default) to infer from columns - Table(const std::shared_ptr& schema, - const std::vector>& arrays, int64_t num_rows = -1); + static std::shared_ptr
Make(const std::shared_ptr& schema, + const std::vector>& arrays, + int64_t num_rows = -1); // Construct table from RecordBatch, but only if all of the batch schemas are // equal. Returns Status::Invalid if there is some problem @@ -142,14 +144,14 @@ class ARROW_EXPORT Table { /// \param[in] i column index, does not boundscheck /// \return the i-th column - std::shared_ptr column(int i) const { return columns_[i]; } + virtual std::shared_ptr column(int i) const = 0; /// \brief Remove column from the table, producing a new Table - Status RemoveColumn(int i, std::shared_ptr
* out) const; + virtual Status RemoveColumn(int i, std::shared_ptr
* out) const = 0; /// \brief Add column to the table, producing a new Table - Status AddColumn(int i, const std::shared_ptr& column, - std::shared_ptr
* out) const; + virtual Status AddColumn(int i, const std::shared_ptr& column, + std::shared_ptr
* out) const = 0; /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL) /// \since 0.5.0 @@ -157,10 +159,10 @@ class ARROW_EXPORT Table { /// \param[in] metadata new KeyValueMetadata /// \return new Table std::shared_ptr
ReplaceSchemaMetadata( - const std::shared_ptr& metadata) const; + const std::shared_ptr& metadata) const = 0; /// \return the number of columns in the table - int num_columns() const { return static_cast(columns_.size()); } + int num_columns() const { return schema_->num_fields(); } /// \return the number of rows (the corresponding length of each column) int64_t num_rows() const { return num_rows_; } @@ -174,15 +176,43 @@ class ARROW_EXPORT Table { /// \brief Return true if any column has multiple chunks bool IsChunked() const; + protected: + std::shared_ptr schema_; + int64_t num_rows_; + private: ARROW_DISALLOW_COPY_AND_ASSIGN(Table); +}; - std::shared_ptr schema_; - std::vector> columns_; +/// \class SimpleTable +/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch +class ARROW_EXPORT SimpleTable : public Table { + public: + /// \brief Construct Table from schema and columns + /// If columns is zero-length, the table's number of rows is zero + /// \param schema The table schema (column types) + /// \param columns The table's columns + /// \param num_rows number of rows in table, -1 (default) to infer from columns + SimpleTable(const std::shared_ptr& schema, + const std::vector>& columns, + int64_t num_rows = -1); - int64_t num_rows_; + /// \brief Construct Table from schema and arrays + /// \param schema The table schema (column types) + /// \param arrays The table's columns as arrays + /// \param num_rows number of rows in table, -1 (default) to infer from columns + SimpleTable(const std::shared_ptr& schema, + const std::vector>& arrays, + int64_t num_rows = -1); + + std::shared_ptr column(int i) const override; + // { return columns_[i]; } + + private: + std::vector> columns_; }; + /// \brief Compute a sequence of record batches from a (possibly chunked) Table class ARROW_EXPORT TableBatchReader : public RecordBatchReader { public: From 7691a5f1fd74c1c1af3e8d3dd86095f685573dc9 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 20 Nov 2017 
11:14:10 -0500 Subject: [PATCH 05/14] Make Table virtual also, refactor Change-Id: Ia41fd3d634bb7d0a9090f7758e230b1665df21a3 --- cpp/src/arrow/ipc/feather-test.cc | 4 +- cpp/src/arrow/ipc/ipc-read-write-test.cc | 16 +- cpp/src/arrow/ipc/json-internal.cc | 4 +- cpp/src/arrow/ipc/writer.cc | 9 +- cpp/src/arrow/python/python-test.cc | 4 +- cpp/src/arrow/record_batch.cc | 152 ++++++++------- cpp/src/arrow/record_batch.h | 28 --- cpp/src/arrow/table-test.cc | 140 +++++++------- cpp/src/arrow/table.cc | 232 +++++++++++------------ cpp/src/arrow/table.h | 44 +---- cpp/src/arrow/table_builder-test.cc | 4 +- cpp/src/arrow/test-util.h | 2 +- python/pyarrow/includes/libarrow.pxd | 5 + python/pyarrow/table.pxi | 3 +- 14 files changed, 288 insertions(+), 359 deletions(-) diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc index 0b2b9d7869b..e3de17f1f75 100644 --- a/cpp/src/arrow/ipc/feather-test.cc +++ b/cpp/src/arrow/ipc/feather-test.cc @@ -377,8 +377,8 @@ TEST_F(TestTableWriter, TimeTypes) { schema->field(i)->type(), values->length(), buffers, values->null_count(), 0)); } - SimpleRecordBatch batch(schema, values->length(), std::move(arrays)); - CheckBatch(batch); + auto batch = RecordBatch::Make(schema, values->length(), std::move(arrays)); + CheckBatch(*batch); } TEST_F(TestTableWriter, VLenPrimitiveRoundTrip) { diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index e88de14f515..1fcbdac5ebc 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -197,8 +197,8 @@ class IpcTestFixture : public io::MemoryMapFixture { std::vector> fields = {f0}; auto schema = std::make_shared(fields); - SimpleRecordBatch batch(schema, 0, {array}); - CheckRoundtrip(batch, buffer_size); + auto batch = RecordBatch::Make(schema, 0, {array}); + CheckRoundtrip(*batch, buffer_size); } protected: @@ -292,13 +292,13 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) { auto 
CheckArray = [this](const std::shared_ptr& array) { auto f0 = field("f0", array->type()); auto schema = ::arrow::schema({f0}); - SimpleRecordBatch batch(schema, array->length(), {array}); - auto sliced_batch = batch.Slice(0, 5); + auto batch = RecordBatch::Make(schema, array->length(), {array}); + auto sliced_batch = batch->Slice(0, 5); int64_t full_size; int64_t sliced_size; - ASSERT_OK(GetRecordBatchSize(batch, &full_size)); + ASSERT_OK(GetRecordBatchSize(*batch, &full_size)); ASSERT_OK(GetRecordBatchSize(*sliced_batch, &sliced_size)); ASSERT_TRUE(sliced_size < full_size) << sliced_size << " " << full_size; @@ -631,7 +631,7 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) { std::vector> fields = {f0}; auto schema = std::make_shared(fields); - SimpleRecordBatch batch(schema, length, {array}); + auto batch = RecordBatch::Make(schema, length, {array}); std::string path = "test-write-large-record_batch"; @@ -640,8 +640,8 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) { ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_)); std::shared_ptr result; - ASSERT_OK(DoLargeRoundTrip(batch, false, &result)); - CheckReadResult(*result, batch); + ASSERT_OK(DoLargeRoundTrip(*batch, false, &result)); + CheckReadResult(*result, *batch); ASSERT_EQ(length, result->num_rows()); } diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index 5b7ed54e73c..bfb3d282d87 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -125,8 +125,8 @@ class SchemaWriter { // Make a dummy record batch. 
A bit tedious as we have to make a schema auto schema = ::arrow::schema({arrow::field("dictionary", dictionary->type())}); - SimpleRecordBatch batch(schema, dictionary->length(), {dictionary}); - RETURN_NOT_OK(WriteRecordBatch(batch, writer_)); + auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary}); + RETURN_NOT_OK(WriteRecordBatch(*batch, writer_)); writer_->EndObject(); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 14a1dc5f9d4..3c1db06159e 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -509,12 +509,9 @@ class DictionaryWriter : public RecordBatchSerializer { dictionary_id_ = dictionary_id; // Make a dummy record batch. A bit tedious as we have to make a schema - std::vector> fields = { - arrow::field("dictionary", dictionary->type())}; - auto schema = std::make_shared(fields); - SimpleRecordBatch batch(schema, dictionary->length(), {dictionary}); - - return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length); + auto schema = arrow::schema({arrow::field("dictionary", dictionary->type())}); + auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary}); + return RecordBatchSerializer::Write(*batch, dst, metadata_length, body_length); } private: diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc index 126bc223feb..3b7d7d884ef 100644 --- a/cpp/src/arrow/python/python-test.cc +++ b/cpp/src/arrow/python/python-test.cc @@ -82,8 +82,8 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) { std::vector> fields = {f1, f2, f3}; std::vector> cols = {arr, arr, arr}; - auto schema = std::make_shared(fields); - auto table = std::make_shared
(schema, cols); + auto schema = ::arrow::schema(fields); + auto table = Table::Make(schema, cols); PyObject* out; Py_BEGIN_ALLOW_THREADS; diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 7eff83f629d..0ef75762c43 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -29,6 +29,81 @@ namespace arrow { +/// \class SimpleRecordBatch +/// \brief A basic, non-lazy in-memory record batch +class SimpleRecordBatch : public RecordBatch { + public: + SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns) + : RecordBatch(schema, num_rows) { + DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); + columns_.resize(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = columns[i]->data(); + } + } + + SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns) + : RecordBatch(schema, num_rows) { + DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); + columns_.resize(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = columns[i]->data(); + } + } + + SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + std::vector>&& columns) + : RecordBatch(schema, num_rows) { + DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); + columns_ = std::move(columns); + } + + SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, + const std::vector>& columns) + : RecordBatch(schema, num_rows) { + DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); + columns_ = columns; + } + + std::shared_ptr column(int i) const override { + if (!boxed_columns_[i]) { + boxed_columns_[i] = MakeArray(columns_[i]); + } + DCHECK(boxed_columns_[i]); + return boxed_columns_[i]; + } + + std::shared_ptr column_data(int i) const override { return columns_[i]; } + + std::shared_ptr ReplaceSchemaMetadata( + const std::shared_ptr& metadata) const override { + auto new_schema = 
schema_->AddMetadata(metadata); + return RecordBatch::Make(new_schema, num_rows_, columns_); + } + + std::shared_ptr Slice(int64_t offset, int64_t length) const override { + std::vector> arrays; + arrays.reserve(num_columns()); + for (const auto& field : columns_) { + int64_t col_length = std::min(field->length - offset, length); + int64_t col_offset = field->offset + offset; + + auto new_data = std::make_shared(*field); + new_data->length = col_length; + new_data->offset = col_offset; + new_data->null_count = kUnknownNullCount; + arrays.emplace_back(new_data); + } + int64_t num_rows = std::min(num_rows_ - offset, length); + return std::make_shared(schema_, num_rows, std::move(arrays)); + } + + private: + std::vector> columns_; +}; + RecordBatch::RecordBatch(const std::shared_ptr& schema, int64_t num_rows) : schema_(schema), num_rows_(num_rows) { boxed_columns_.resize(schema->num_fields()); @@ -115,83 +190,6 @@ Status RecordBatch::Validate() const { return Status::OK(); } -// ---------------------------------------------------------------------- -// In-memory simple record batch implementation - -SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, - int64_t num_rows, - const std::vector>& columns) - : RecordBatch(schema, num_rows) { - DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = columns[i]->data(); - } -} - -SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, - int64_t num_rows, - std::vector>&& columns) - : RecordBatch(schema, num_rows) { - DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = columns[i]->data(); - } -} - -SimpleRecordBatch::SimpleRecordBatch(const std::shared_ptr& schema, - int64_t num_rows, - std::vector>&& columns) - : RecordBatch(schema, num_rows) { - DCHECK_EQ(static_cast(columns.size()), 
schema->num_fields()); - columns_ = std::move(columns); -} - -SimpleRecordBatch::SimpleRecordBatch( - const std::shared_ptr& schema, int64_t num_rows, - const std::vector>& columns) - : RecordBatch(schema, num_rows) { - DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); - columns_ = columns; -} - -std::shared_ptr SimpleRecordBatch::column(int i) const { - if (!boxed_columns_[i]) { - boxed_columns_[i] = MakeArray(columns_[i]); - } - DCHECK(boxed_columns_[i]); - return boxed_columns_[i]; -} - -std::shared_ptr SimpleRecordBatch::column_data(int i) const { - return columns_[i]; -} - -std::shared_ptr SimpleRecordBatch::ReplaceSchemaMetadata( - const std::shared_ptr& metadata) const { - auto new_schema = schema_->AddMetadata(metadata); - return RecordBatch::Make(new_schema, num_rows_, columns_); -} - -std::shared_ptr SimpleRecordBatch::Slice(int64_t offset, - int64_t length) const { - std::vector> arrays; - arrays.reserve(num_columns()); - for (const auto& field : columns_) { - int64_t col_length = std::min(field->length - offset, length); - int64_t col_offset = field->offset + offset; - - auto new_data = std::make_shared(*field); - new_data->length = col_length; - new_data->offset = col_offset; - new_data->null_count = kUnknownNullCount; - arrays.emplace_back(new_data); - } - int64_t num_rows = std::min(num_rows_ - offset, length); - return std::make_shared(schema_, num_rows, std::move(arrays)); -} - // ---------------------------------------------------------------------- // Base record batch reader diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 94d6045186d..566a66ab43e 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -136,34 +136,6 @@ class ARROW_EXPORT RecordBatch { ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch); }; -/// \class SimpleRecordBatch -/// \brief A basic, non-lazy in-memory record batch -class ARROW_EXPORT SimpleRecordBatch : public RecordBatch { - public: - SimpleRecordBatch(const 
std::shared_ptr& schema, int64_t num_rows, - const std::vector>& columns); - - SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns); - - SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, - std::vector>&& columns); - - SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, - const std::vector>& columns); - - std::shared_ptr column(int i) const override; - std::shared_ptr column_data(int i) const override; - - std::shared_ptr Slice(int64_t offset, int64_t length) const override; - - std::shared_ptr ReplaceSchemaMetadata( - const std::shared_ptr& metadata) const override; - - private: - std::vector> columns_; -}; - /// \brief Abstract interface for reading stream of record batches class ARROW_EXPORT RecordBatchReader { public: diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index 0264a0f003c..5452a633618 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -217,8 +217,8 @@ class TestTable : public TestBase { TEST_F(TestTable, EmptySchema) { auto empty_schema = ::arrow::schema({}); - table_.reset(new Table(empty_schema, columns_)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(empty_schema, columns_); + ASSERT_OK(table_->Validate()); ASSERT_EQ(0, table_->num_rows()); ASSERT_EQ(0, table_->num_columns()); } @@ -227,20 +227,20 @@ TEST_F(TestTable, Ctors) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_); + ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); ASSERT_EQ(3, table_->num_columns()); - auto array_ctor = std::make_shared
(schema_, arrays_); + auto array_ctor = Table::Make(schema_, arrays_); ASSERT_TRUE(table_->Equals(*array_ctor)); - table_.reset(new Table(schema_, columns_, length)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); ASSERT_OK(MakeTable(schema_, arrays_, &table_)); - ASSERT_OK(table_->ValidateColumns()); + ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); ASSERT_EQ(3, table_->num_columns()); } @@ -249,7 +249,7 @@ TEST_F(TestTable, Metadata) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_)); + table_ = Table::Make(schema_, columns_); ASSERT_TRUE(table_->schema()->Equals(*schema_)); @@ -263,14 +263,14 @@ TEST_F(TestTable, InvalidColumns) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_, length - 1)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length - 1); + ASSERT_RAISES(Invalid, table_->Validate()); columns_.clear(); // Wrong number of columns - table_.reset(new Table(schema_, columns_, length)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_RAISES(Invalid, table_->Validate()); columns_ = { std::make_shared(schema_->field(0), MakeRandomArray(length)), @@ -278,15 +278,15 @@ TEST_F(TestTable, InvalidColumns) { std::make_shared(schema_->field(2), MakeRandomArray(length - 1))}; - table_.reset(new Table(schema_, columns_, length)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_RAISES(Invalid, table_->Validate()); } TEST_F(TestTable, Equals) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_)); + table_ = Table::Make(schema_, columns_); ASSERT_TRUE(table_->Equals(*table_)); // Differing schema @@ -295,7 +295,8 @@ TEST_F(TestTable, 
Equals) { auto f2 = field("f5", int16()); vector> fields = {f0, f1, f2}; auto other_schema = std::make_shared(fields); - ASSERT_FALSE(table_->Equals(Table(other_schema, columns_))); + auto other = Table::Make(other_schema, columns_); + ASSERT_FALSE(table_->Equals(*other)); // Differing columns std::vector> other_columns = { std::make_shared(schema_->field(0), @@ -304,7 +305,9 @@ TEST_F(TestTable, Equals) { MakeRandomArray(length, 10)), std::make_shared(schema_->field(2), MakeRandomArray(length, 10))}; - ASSERT_FALSE(table_->Equals(Table(schema_, other_columns))); + + other = Table::Make(schema_, other_columns); + ASSERT_FALSE(table_->Equals(*other)); } TEST_F(TestTable, FromRecordBatches) { @@ -316,7 +319,7 @@ TEST_F(TestTable, FromRecordBatches) { std::shared_ptr
result, expected; ASSERT_OK(Table::FromRecordBatches({batch1}, &result)); - expected = std::make_shared
(schema_, columns_); + expected = Table::Make(schema_, columns_); ASSERT_TRUE(result->Equals(*expected)); std::vector> other_columns; @@ -326,15 +329,14 @@ TEST_F(TestTable, FromRecordBatches) { } ASSERT_OK(Table::FromRecordBatches({batch1, batch1}, &result)); - expected = std::make_shared
(schema_, other_columns); + expected = Table::Make(schema_, other_columns); ASSERT_TRUE(result->Equals(*expected)); // Error states std::vector> empty_batches; ASSERT_RAISES(Invalid, Table::FromRecordBatches(empty_batches, &result)); - std::vector> fields = {schema_->field(0), schema_->field(1)}; - auto other_schema = std::make_shared(fields); + auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)}); std::vector> other_arrays = {arrays_[0], arrays_[1]}; auto batch2 = RecordBatch::Make(other_schema, length, other_arrays); @@ -363,8 +365,7 @@ TEST_F(TestTable, ConcatenateTables) { std::vector> empty_tables; ASSERT_RAISES(Invalid, ConcatenateTables(empty_tables, &result)); - std::vector> fields = {schema_->field(0), schema_->field(1)}; - auto other_schema = std::make_shared(fields); + auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)}); std::vector> other_arrays = {arrays_[0], arrays_[1]}; auto batch3 = RecordBatch::Make(other_schema, length, other_arrays); @@ -377,31 +378,38 @@ TEST_F(TestTable, RemoveColumn) { const int64_t length = 10; MakeExample1(length); - Table table(schema_, columns_); + auto table_sp = Table::Make(schema_, columns_); + const Table& table = *table_sp; std::shared_ptr
result; ASSERT_OK(table.RemoveColumn(0, &result)); auto ex_schema = ::arrow::schema({schema_->field(1), schema_->field(2)}); std::vector> ex_columns = {table.column(1), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + + auto expected = Table::Make(ex_schema, ex_columns); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.RemoveColumn(1, &result)); ex_schema = ::arrow::schema({schema_->field(0), schema_->field(2)}); ex_columns = {table.column(0), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + + expected = Table::Make(ex_schema, ex_columns); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.RemoveColumn(2, &result)); ex_schema = ::arrow::schema({schema_->field(0), schema_->field(1)}); ex_columns = {table.column(0), table.column(1)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + expected = Table::Make(ex_schema, ex_columns); + ASSERT_TRUE(result->Equals(*expected)); } TEST_F(TestTable, AddColumn) { const int64_t length = 10; MakeExample1(length); - Table table(schema_, columns_); + auto table_sp = Table::Make(schema_, columns_); + const Table& table = *table_sp; std::shared_ptr
result; // Some negative tests with invalid index @@ -420,50 +428,32 @@ TEST_F(TestTable, AddColumn) { ASSERT_OK(table.AddColumn(0, columns_[0], &result)); auto ex_schema = ::arrow::schema( {schema_->field(0), schema_->field(0), schema_->field(1), schema_->field(2)}); - std::vector> ex_columns = {table.column(0), table.column(0), - table.column(1), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + + auto expected = Table::Make( + ex_schema, {table.column(0), table.column(0), table.column(1), table.column(2)}); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.AddColumn(1, columns_[0], &result)); ex_schema = ::arrow::schema( {schema_->field(0), schema_->field(0), schema_->field(1), schema_->field(2)}); - ex_columns = {table.column(0), table.column(0), table.column(1), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + + expected = Table::Make( + ex_schema, {table.column(0), table.column(0), table.column(1), table.column(2)}); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.AddColumn(2, columns_[0], &result)); ex_schema = ::arrow::schema( {schema_->field(0), schema_->field(1), schema_->field(0), schema_->field(2)}); - ex_columns = {table.column(0), table.column(1), table.column(0), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + expected = Table::Make( + ex_schema, {table.column(0), table.column(1), table.column(0), table.column(2)}); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.AddColumn(3, columns_[0], &result)); ex_schema = ::arrow::schema( {schema_->field(0), schema_->field(1), schema_->field(2), schema_->field(0)}); - ex_columns = {table.column(0), table.column(1), table.column(2), table.column(0)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); -} - -TEST_F(TestTable, IsChunked) { - ArrayVector c1, c2; - - auto a1 = MakeRandomArray(10); - auto a2 = MakeRandomArray(20); - - auto sch1 = arrow::schema({field("f1", int32()), 
field("f2", int32())}); - - std::vector> columns; - - std::shared_ptr batch; - - columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a1})}; - auto t1 = std::make_shared
(sch1, columns); - - ASSERT_FALSE(t1->IsChunked()); - - columns = {column(sch1->field(0), {a2}), column(sch1->field(1), {a1, a1})}; - auto t2 = std::make_shared
(sch1, columns); - - ASSERT_TRUE(t2->IsChunked()); + expected = Table::Make( + ex_schema, {table.column(0), table.column(1), table.column(2), table.column(0)}); + ASSERT_TRUE(result->Equals(*expected)); } class TestRecordBatch : public TestBase {}; @@ -483,13 +473,13 @@ TEST_F(TestRecordBatch, Equals) { auto a1 = MakeRandomArray(length); auto a2 = MakeRandomArray(length); - SimpleRecordBatch b1(schema, length, {a0, a1, a2}); - SimpleRecordBatch b3(schema2, length, {a0, a1}); - SimpleRecordBatch b4(schema, length, {a0, a1, a1}); + auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2}); + auto b3 = RecordBatch::Make(schema2, length, {a0, a1}); + auto b4 = RecordBatch::Make(schema, length, {a0, a1, a1}); - ASSERT_TRUE(b1.Equals(b1)); - ASSERT_FALSE(b1.Equals(b3)); - ASSERT_FALSE(b1.Equals(b4)); + ASSERT_TRUE(b1->Equals(*b1)); + ASSERT_FALSE(b1->Equals(*b3)); + ASSERT_FALSE(b1->Equals(*b4)); } #ifdef NDEBUG @@ -509,16 +499,16 @@ TEST_F(TestRecordBatch, Validate) { auto a2 = MakeRandomArray(length); auto a3 = MakeRandomArray(5); - SimpleRecordBatch b1(schema, length, {a0, a1, a2}); + auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2}); ASSERT_OK(b1.Validate()); // Length mismatch - SimpleRecordBatch b2(schema, length, {a0, a1, a3}); + auto b2 = RecordBatch::Make(schema, length, {a0, a1, a3}); ASSERT_RAISES(Invalid, b2.Validate()); // Type mismatch - SimpleRecordBatch b3(schema, length, {a0, a1, a0}); + auto b3 = RecordBatch::Make(schema, length, {a0, a1, a0}); ASSERT_RAISES(Invalid, b3.Validate()); } @@ -531,7 +521,7 @@ TEST_F(TestRecordBatch, Slice) { auto f1 = field("f1", uint8()); vector> fields = {f0, f1}; - auto schema = std::make_shared(fields); + auto schema = ::arrow::schema(fields); auto a0 = MakeRandomArray(length); auto a1 = MakeRandomArray(length); @@ -569,9 +559,9 @@ TEST_F(TestTableBatchReader, ReadNext) { std::shared_ptr batch; columns = {column(sch1->field(0), {a1, a4, a2}), column(sch1->field(1), {a2, a2})}; - Table t1(sch1, columns); + auto t1 
= Table::Make(sch1, columns); - TableBatchReader i1(t1); + TableBatchReader i1(*t1); ASSERT_OK(i1.ReadNext(&batch)); ASSERT_EQ(10, batch->num_rows()); @@ -586,9 +576,9 @@ TEST_F(TestTableBatchReader, ReadNext) { ASSERT_EQ(nullptr, batch); columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a4})}; - Table t2(sch1, columns); + auto t2 = Table::Make(sch1, columns); - TableBatchReader i2(t2); + TableBatchReader i2(*t2); ASSERT_OK(i2.ReadNext(&batch)); ASSERT_EQ(10, batch->num_rows()); diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index 143daa86259..9279b854e27 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -156,6 +156,114 @@ Status Column::ValidateData() { // ---------------------------------------------------------------------- // Table methods +/// \class SimpleTable +/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch +class SimpleTable : public Table { + public: + SimpleTable(const std::shared_ptr& schema, + const std::vector>& columns, int64_t num_rows = -1) + : columns_(columns) { + schema_ = schema; + if (num_rows < 0) { + if (columns.size() == 0) { + num_rows_ = 0; + } else { + num_rows_ = columns[0]->length(); + } + } else { + num_rows_ = num_rows; + } + } + + SimpleTable(const std::shared_ptr& schema, + const std::vector>& columns, int64_t num_rows = -1) { + schema_ = schema; + if (num_rows < 0) { + if (columns.size() == 0) { + num_rows_ = 0; + } else { + num_rows_ = columns[0]->length(); + } + } else { + num_rows_ = num_rows; + } + + columns_.resize(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = + std::make_shared(schema->field(static_cast(i)), columns[i]); + } + } + + std::shared_ptr column(int i) const override { return columns_[i]; } + + Status RemoveColumn(int i, std::shared_ptr
* out) const override { + std::shared_ptr new_schema; + RETURN_NOT_OK(schema_->RemoveField(i, &new_schema)); + + *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i)); + return Status::OK(); + } + + Status AddColumn(int i, const std::shared_ptr& col, + std::shared_ptr
* out) const override { + if (i < 0 || i > num_columns() + 1) { + return Status::Invalid("Invalid column index."); + } + if (col == nullptr) { + std::stringstream ss; + ss << "Column " << i << " was null"; + return Status::Invalid(ss.str()); + } + if (col->length() != num_rows_) { + std::stringstream ss; + ss << "Added column's length must match table's length. Expected length " + << num_rows_ << " but got length " << col->length(); + return Status::Invalid(ss.str()); + } + + std::shared_ptr new_schema; + RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema)); + + *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i, col)); + return Status::OK(); + } + + std::shared_ptr
ReplaceSchemaMetadata( + const std::shared_ptr& metadata) const override { + auto new_schema = schema_->AddMetadata(metadata); + return Table::Make(new_schema, columns_); + } + + Status Validate() const override { + if (static_cast(columns_.size()) != schema_->num_fields()) { + return Status::Invalid("Number of columns did not match schema"); + } + + // Make sure columns are all the same length + for (int i = 0; i < num_columns(); ++i) { + const Column* col = columns_[i].get(); + if (col == nullptr) { + std::stringstream ss; + ss << "Column " << i << " was null"; + return Status::Invalid(ss.str()); + } + if (col->length() != num_rows_) { + std::stringstream ss; + ss << "Column " << i << " named " << col->name() << " expected length " + << num_rows_ << " but got length " << col->length(); + return Status::Invalid(ss.str()); + } + } + return Status::OK(); + } + + private: + std::vector> columns_; +}; + +Table::Table() {} + std::shared_ptr
Table::Make(const std::shared_ptr& schema, const std::vector>& columns, int64_t num_rows) { @@ -168,12 +276,6 @@ std::shared_ptr
Table::Make(const std::shared_ptr& schema, return std::make_shared(schema, arrays, num_rows); } -std::shared_ptr
Table::ReplaceSchemaMetadata( - const std::shared_ptr& metadata) const { - auto new_schema = schema_->AddMetadata(metadata); - return std::make_shared
(new_schema, columns_); -} - Status Table::FromRecordBatches(const std::vector>& batches, std::shared_ptr
* table) { if (batches.size() == 0) { @@ -205,7 +307,7 @@ Status Table::FromRecordBatches(const std::vector>& columns[i] = std::make_shared(schema->field(i), column_arrays); } - *table = std::make_shared
(schema, columns); + *table = Table::Make(schema, columns); return Status::OK(); } @@ -242,7 +344,7 @@ Status ConcatenateTables(const std::vector>& tables, } columns[i] = std::make_shared(schema->field(i), column_arrays); } - *table = std::make_shared
(schema, columns); + *table = Table::Make(schema, columns); return Status::OK(); } @@ -253,124 +355,18 @@ bool Table::Equals(const Table& other) const { if (!schema_->Equals(*other.schema())) { return false; } - if (static_cast(columns_.size()) != other.num_columns()) { + if (this->num_columns() != other.num_columns()) { return false; } - for (int i = 0; i < static_cast(columns_.size()); i++) { - if (!columns_[i]->Equals(other.column(i))) { + for (int i = 0; i < this->num_columns(); i++) { + if (!this->column(i)->Equals(other.column(i))) { return false; } } return true; } -Status Table::ValidateColumns() const { - if (num_columns() != schema_->num_fields()) { - return Status::Invalid("Number of columns did not match schema"); - } - - // Make sure columns are all the same length - for (size_t i = 0; i < columns_.size(); ++i) { - const Column* col = columns_[i].get(); - if (col == nullptr) { - std::stringstream ss; - ss << "Column " << i << " was null"; - return Status::Invalid(ss.str()); - } - if (col->length() != num_rows_) { - std::stringstream ss; - ss << "Column " << i << " named " << col->name() << " expected length " << num_rows_ - << " but got length " << col->length(); - return Status::Invalid(ss.str()); - } - } - return Status::OK(); -} - -bool Table::IsChunked() const { - for (size_t i = 0; i < columns_.size(); ++i) { - if (columns_[i]->data()->num_chunks() > 1) { - return true; - } - } - return false; -} - - -SimpleTable::SimpleTable(const std::shared_ptr& schema, - const std::vector>& columns, - int64_t num_rows) - : schema_(schema), columns_(columns) { - if (num_rows < 0) { - if (columns.size() == 0) { - num_rows_ = 0; - } else { - num_rows_ = columns[0]->length(); - } - } else { - num_rows_ = num_rows; - } -} - -SimpleTable::SimpleTable(const std::shared_ptr& schema, - const std::vector>& columns, - int64_t num_rows) - : schema_(schema) { - if (num_rows < 0) { - if (columns.size() == 0) { - num_rows_ = 0; - } else { - num_rows_ = columns[0]->length(); 
- } - } else { - num_rows_ = num_rows; - } - - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = - std::make_shared(schema->field(static_cast(i)), columns[i]); - } -} - -std::shared_ptr SimpleTable::column(int i) const { - return columns_[i]; -} - - -Status SimpleTable::RemoveColumn(int i, std::shared_ptr
* out) const { - std::shared_ptr new_schema; - RETURN_NOT_OK(schema_->RemoveField(i, &new_schema)); - - *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i)); - return Status::OK(); -} - -Status SimpleTable::AddColumn(int i, const std::shared_ptr& col, - std::shared_ptr
* out) const { - if (i < 0 || i > num_columns() + 1) { - return Status::Invalid("Invalid column index."); - } - if (col == nullptr) { - std::stringstream ss; - ss << "Column " << i << " was null"; - return Status::Invalid(ss.str()); - } - if (col->length() != num_rows_) { - std::stringstream ss; - ss << "Added column's length must match table's length. Expected length " << num_rows_ - << " but got length " << col->length(); - return Status::Invalid(ss.str()); - } - - std::shared_ptr new_schema; - RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema)); - - *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i, col)); - return Status::OK(); -} - Status MakeTable(const std::shared_ptr& schema, const std::vector>& arrays, std::shared_ptr
* table) { @@ -388,7 +384,7 @@ Status MakeTable(const std::shared_ptr& schema, columns.emplace_back(std::make_shared(schema->field(i), arrays[i])); } - *table = std::make_shared
(schema, columns); + *table = Table::Make(schema, columns); return Status::OK(); } diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 1c387a46249..8e4b8ff8953 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -116,6 +116,8 @@ class ARROW_EXPORT Column { /// \brief Logical table as sequence of chunked arrays class ARROW_EXPORT Table { public: + virtual ~Table() = default; + /// \brief Construct Table from schema and columns /// If columns is zero-length, the table's number of rows is zero /// \param schema The table schema (column types) @@ -158,9 +160,12 @@ class ARROW_EXPORT Table { /// /// \param[in] metadata new KeyValueMetadata /// \return new Table - std::shared_ptr
ReplaceSchemaMetadata( + virtual std::shared_ptr
ReplaceSchemaMetadata( const std::shared_ptr& metadata) const = 0; + /// \brief Perform any checks to validate the input arguments + virtual Status Validate() const = 0; + /// \return the number of columns in the table int num_columns() const { return schema_->num_fields(); } @@ -170,13 +175,9 @@ class ARROW_EXPORT Table { /// \brief Determine if semantic contents of tables are exactly equal bool Equals(const Table& other) const; - /// \brief Perform any checks to validate the input arguments - Status ValidateColumns() const; - - /// \brief Return true if any column has multiple chunks - bool IsChunked() const; - protected: + Table(); + std::shared_ptr schema_; int64_t num_rows_; @@ -184,35 +185,6 @@ class ARROW_EXPORT Table { ARROW_DISALLOW_COPY_AND_ASSIGN(Table); }; -/// \class SimpleTable -/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch -class ARROW_EXPORT SimpleTable : public Table { - public: - /// \brief Construct Table from schema and columns - /// If columns is zero-length, the table's number of rows is zero - /// \param schema The table schema (column types) - /// \param columns The table's columns - /// \param num_rows number of rows in table, -1 (default) to infer from columns - SimpleTable(const std::shared_ptr& schema, - const std::vector>& columns, - int64_t num_rows = -1); - - /// \brief Construct Table from schema and arrays - /// \param schema The table schema (column types) - /// \param arrays The table's columns as arrays - /// \param num_rows number of rows in table, -1 (default) to infer from columns - SimpleTable(const std::shared_ptr& schema, - const std::vector>& arrays, - int64_t num_rows = -1); - - std::shared_ptr column(int i) const override; - // { return columns_[i]; } - - private: - std::vector> columns_; -}; - - /// \brief Compute a sequence of record batches from a (possibly chunked) Table class ARROW_EXPORT TableBatchReader : public RecordBatchReader { public: diff --git a/cpp/src/arrow/table_builder-test.cc 
b/cpp/src/arrow/table_builder-test.cc index a3306c5b566..8167577e906 100644 --- a/cpp/src/arrow/table_builder-test.cc +++ b/cpp/src/arrow/table_builder-test.cc @@ -99,7 +99,7 @@ TEST_F(TestRecordBatchBuilder, Basics) { ASSERT_OK(ex_b1.Finish(&a1)); ASSERT_OK(ex_b2.Finish(&a2)); - SimpleRecordBatch expected(schema, 4, {a0, a1, a2}); + auto expected = RecordBatch::Make(schema, 4, {a0, a1, a2}); // Builder attributes ASSERT_EQ(3, builder->num_fields()); @@ -120,7 +120,7 @@ TEST_F(TestRecordBatchBuilder, Basics) { ASSERT_OK(builder->Flush(&batch)); } - ASSERT_BATCHES_EQUAL(expected, *batch); + ASSERT_BATCHES_EQUAL(*expected, *batch); } // Test setting initial capacity diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 7560df2b04b..1a34808488a 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -374,7 +374,7 @@ void AssertArraysEqual(const Array& expected, const Array& actual) { #define ASSERT_BATCHES_EQUAL(LEFT, RIGHT) \ do { \ - if (!LEFT.ApproxEquals(RIGHT)) { \ + if (!(LEFT).ApproxEquals(RIGHT)) { \ std::stringstream ss; \ ss << "Left:\n"; \ ASSERT_OK(PrettyPrint(LEFT, 0, &ss)); \ diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 7c1d8aa71a4..73e34c7b210 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -429,6 +429,11 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CTable(const shared_ptr[CSchema]& schema, const vector[shared_ptr[CColumn]]& columns) + @staticmethod + shared_ptr[CTable] Make( + const shared_ptr[CSchema]& schema, + const vector[shared_ptr[CColumn]]& columns) + @staticmethod CStatus FromRecordBatches( const vector[shared_ptr[CRecordBatch]]& batches, diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 01d880ed4d2..8c5b8bbc343 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -945,8 +945,7 @@ cdef class Table: else: raise ValueError(type(arrays[i])) - table.reset(new 
CTable(c_schema, columns)) - return pyarrow_wrap_table(table) + return pyarrow_wrap_table(CTable.Make(c_schema, columns)) @staticmethod def from_batches(batches): From 9a61beb5cf77b8d21844b5a88c995757b8c0cfd8 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 20 Nov 2017 13:20:32 -0500 Subject: [PATCH 06/14] Move boxed_columns_ to SimpleTable. Remove DecimalType backwards compat define Change-Id: I81e9f9a11cd0c43e0d629e404643f95b16a1f75a --- cpp/src/arrow/record_batch.cc | 11 ++++++++--- cpp/src/arrow/record_batch.h | 3 --- cpp/src/arrow/type.h | 11 ++++------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 0ef75762c43..79e6c33583b 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -38,6 +38,7 @@ class SimpleRecordBatch : public RecordBatch { : RecordBatch(schema, num_rows) { DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_.resize(columns.size()); + boxed_columns_.resize(schema->num_fields()); for (size_t i = 0; i < columns.size(); ++i) { columns_[i] = columns[i]->data(); } @@ -48,6 +49,7 @@ class SimpleRecordBatch : public RecordBatch { : RecordBatch(schema, num_rows) { DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_.resize(columns.size()); + boxed_columns_.resize(schema->num_fields()); for (size_t i = 0; i < columns.size(); ++i) { columns_[i] = columns[i]->data(); } @@ -58,6 +60,7 @@ class SimpleRecordBatch : public RecordBatch { : RecordBatch(schema, num_rows) { DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_ = std::move(columns); + boxed_columns_.resize(schema->num_fields()); } SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, @@ -65,6 +68,7 @@ class SimpleRecordBatch : public RecordBatch { : RecordBatch(schema, num_rows) { DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_ = columns; + boxed_columns_.resize(schema->num_fields()); } 
std::shared_ptr column(int i) const override { @@ -102,12 +106,13 @@ class SimpleRecordBatch : public RecordBatch { private: std::vector> columns_; + + // Caching boxed array data + mutable std::vector> boxed_columns_; }; RecordBatch::RecordBatch(const std::shared_ptr& schema, int64_t num_rows) - : schema_(schema), num_rows_(num_rows) { - boxed_columns_.resize(schema->num_fields()); -} + : schema_(schema), num_rows_(num_rows) {} std::shared_ptr RecordBatch::Make( const std::shared_ptr& schema, int64_t num_rows, diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 566a66ab43e..61b15c1143d 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -129,9 +129,6 @@ class ARROW_EXPORT RecordBatch { std::shared_ptr schema_; int64_t num_rows_; - // Caching boxed array data - mutable std::vector> boxed_columns_; - private: ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch); }; diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 70f275c0fa4..8dcc1592da0 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -498,9 +498,9 @@ class ARROW_EXPORT StructType : public NestedType { std::vector GetBufferLayout() const override; }; -class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType { +class ARROW_EXPORT DecimalType : public FixedSizeBinaryType { public: - explicit DecimalBaseType(int32_t byte_width, int32_t precision, int32_t scale) + explicit DecimalType(int32_t byte_width, int32_t precision, int32_t scale) : FixedSizeBinaryType(byte_width, Type::DECIMAL), precision_(precision), scale_(scale) {} @@ -513,21 +513,18 @@ class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType { int32_t scale_; }; -class ARROW_EXPORT Decimal128Type : public DecimalBaseType { +class ARROW_EXPORT Decimal128Type : public DecimalType { public: static constexpr Type::type type_id = Type::DECIMAL; explicit Decimal128Type(int32_t precision, int32_t scale) - : DecimalBaseType(16, precision, scale) {} + : DecimalType(16, 
precision, scale) {} Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; std::string name() const override { return "decimal"; } }; -// TODO(wesm): Remove this -using DecimalType = Decimal128Type; - struct UnionMode { enum type { SPARSE, DENSE }; }; From a4e403c361941e2e78392400c516692d8b9a5cb7 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 20 Nov 2017 13:28:53 -0500 Subject: [PATCH 07/14] Set build warning level via environment variable Change-Id: I996c45e66a4b835b2c46003a6bed84da7d446eee --- .travis.yml | 4 +++- ci/travis_before_script_cpp.sh | 4 ++-- ci/travis_env_common.sh | 1 + 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6333ad95182..3effd292038 100644 --- a/.travis.yml +++ b/.travis.yml @@ -55,7 +55,7 @@ matrix: - export ARROW_TRAVIS_VALGRIND=1 - export ARROW_TRAVIS_PLASMA=1 - export ARROW_TRAVIS_CLANG_FORMAT=1 - - export ARROW_BUILD_TYPE=release + - export ARROW_BUILD_WARNING_LEVEL=CHECKIN - export CC="clang-4.0" - export CXX="clang++-4.0" - $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh @@ -75,6 +75,7 @@ matrix: before_script: - export ARROW_TRAVIS_USE_TOOLCHAIN=1 - export ARROW_TRAVIS_PLASMA=1 + - export ARROW_BUILD_WARNING_LEVEL=CHECKIN - travis_wait 50 $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh script: - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh @@ -110,6 +111,7 @@ matrix: - export CC="clang-4.0" - export CXX="clang++-4.0" - nvm install node + - export ARROW_BUILD_TYPE=release - $TRAVIS_BUILD_DIR/ci/travis_lint.sh - $TRAVIS_BUILD_DIR/ci/travis_before_script_js.sh - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index b90a6dd8784..664f7ce5fed 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -92,13 +92,13 @@ if [ $TRAVIS_OS_NAME == "linux" ]; then cmake $CMAKE_COMMON_FLAGS \ $CMAKE_LINUX_FLAGS \ -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ - 
-DBUILD_WARNING_LEVEL=CHECKIN \ + -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \ $ARROW_CPP_DIR else cmake $CMAKE_COMMON_FLAGS \ $CMAKE_OSX_FLAGS \ -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ - -DBUILD_WARNING_LEVEL=CHECKIN \ + -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \ $ARROW_CPP_DIR fi diff --git a/ci/travis_env_common.sh b/ci/travis_env_common.sh index 2ea819ce377..21b6e266ea6 100755 --- a/ci/travis_env_common.sh +++ b/ci/travis_env_common.sh @@ -39,6 +39,7 @@ export ARROW_PYTHON_PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env export CMAKE_EXPORT_COMPILE_COMMANDS=1 export ARROW_BUILD_TYPE=${ARROW_BUILD_TYPE:=debug} +export ARROW_BUILD_WARNING_LEVEL=${ARROW_BUILD_WARNING_LEVEL:=Production} if [ "$ARROW_TRAVIS_USE_TOOLCHAIN" == "1" ]; then # C++ toolchain From 498bb08268dbafb6f86f0f8161637b14f5ffabf3 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 20 Nov 2017 13:36:54 -0500 Subject: [PATCH 08/14] Fix glib compilation Change-Id: I87885c88e09617dceee6be3b801bdb5442081955 --- c_glib/arrow-glib/record-batch.cpp | 5 ++--- c_glib/arrow-glib/table.cpp | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp index f381af0a2c2..f23a0cf7582 100644 --- a/c_glib/arrow-glib/record-batch.cpp +++ b/c_glib/arrow-glib/record-batch.cpp @@ -150,9 +150,8 @@ garrow_record_batch_new(GArrowSchema *schema, } auto arrow_record_batch = - std::make_shared(garrow_schema_get_raw(schema), - n_rows, - arrow_columns); + arrow::RecordBatch::Make(garrow_schema_get_raw(schema), + n_rows, arrow_columns); return garrow_record_batch_new_raw(&arrow_record_batch); } diff --git a/c_glib/arrow-glib/table.cpp b/c_glib/arrow-glib/table.cpp index 779f2ef62b8..e086396f8f9 100644 --- a/c_glib/arrow-glib/table.cpp +++ b/c_glib/arrow-glib/table.cpp @@ -143,8 +143,7 @@ garrow_table_new(GArrowSchema *schema, } auto arrow_table = - std::make_shared(garrow_schema_get_raw(schema), - arrow_columns); + 
arrow::Table::Make(garrow_schema_get_raw(schema), arrow_columns); return garrow_table_new_raw(&arrow_table); } From 06bced1ab773e1ebb5778f4e6133b659877d6b18 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 20 Nov 2017 14:21:27 -0500 Subject: [PATCH 09/14] Deprecate arrow::MakeTable Change-Id: If3a168a02443007946dd80f47a4aaa4f9d60b132 --- cpp/src/arrow/table-test.cc | 2 +- cpp/src/arrow/table.cc | 4 ++++ cpp/src/arrow/table.h | 6 ++++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index 5452a633618..c909958433f 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -239,7 +239,7 @@ TEST_F(TestTable, Ctors) { ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); - ASSERT_OK(MakeTable(schema_, arrays_, &table_)); + table_ = Take::Make(schema_, arrays_); ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); ASSERT_EQ(3, table_->num_columns()); diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index 9279b854e27..a3189c23874 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -367,6 +367,8 @@ bool Table::Equals(const Table& other) const { return true; } +#ifndef ARROW_NO_DEPRECATED_API + Status MakeTable(const std::shared_ptr& schema, const std::vector>& arrays, std::shared_ptr
* table) { @@ -389,6 +391,8 @@ Status MakeTable(const std::shared_ptr& schema, return Status::OK(); } +#endif // ARROW_NO_DEPRECATED_API + // ---------------------------------------------------------------------- // Convert a table to a sequence of record batches diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 8e4b8ff8953..b1778f1d006 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -208,13 +208,19 @@ ARROW_EXPORT Status ConcatenateTables(const std::vector>& tables, std::shared_ptr
* table); + +#ifndef ARROW_NO_DEPRECATED_API + /// \brief Construct table from multiple input tables. /// \return Status, fails if any schemas are different +/// \note Deprecated since 0.8.0 ARROW_EXPORT Status MakeTable(const std::shared_ptr& schema, const std::vector>& arrays, std::shared_ptr
* table); +#endif + } // namespace arrow #endif // ARROW_TABLE_H From 6b02e4386afc3ca4d472489ca800a1afb9b05372 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 20 Nov 2017 16:00:38 -0500 Subject: [PATCH 10/14] Move DCHECK in SimpleRecordBatch ctor to Validate Change-Id: I36b935cc45ec2280ab221be14dccfda44f9ccab6 --- cpp/src/arrow/record_batch.cc | 11 +++++++---- cpp/src/arrow/record_batch.h | 2 +- cpp/src/arrow/table-test.cc | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 79e6c33583b..60932bdf3e4 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -36,7 +36,6 @@ class SimpleRecordBatch : public RecordBatch { SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, const std::vector>& columns) : RecordBatch(schema, num_rows) { - DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_.resize(columns.size()); boxed_columns_.resize(schema->num_fields()); for (size_t i = 0; i < columns.size(); ++i) { @@ -47,7 +46,6 @@ class SimpleRecordBatch : public RecordBatch { SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, std::vector>&& columns) : RecordBatch(schema, num_rows) { - DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_.resize(columns.size()); boxed_columns_.resize(schema->num_fields()); for (size_t i = 0; i < columns.size(); ++i) { @@ -58,7 +56,6 @@ class SimpleRecordBatch : public RecordBatch { SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, std::vector>&& columns) : RecordBatch(schema, num_rows) { - DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_ = std::move(columns); boxed_columns_.resize(schema->num_fields()); } @@ -66,7 +63,6 @@ class SimpleRecordBatch : public RecordBatch { SimpleRecordBatch(const std::shared_ptr& schema, int64_t num_rows, const std::vector>& columns) : RecordBatch(schema, num_rows) { - 
DCHECK_EQ(static_cast(columns.size()), schema->num_fields()); columns_ = columns; boxed_columns_.resize(schema->num_fields()); } @@ -104,6 +100,13 @@ class SimpleRecordBatch : public RecordBatch { return std::make_shared(schema_, num_rows, std::move(arrays)); } + Status Validate() const override { + if (static_cast(columns_.size()) != schema_->num_fields()) { + return Status::Invalid("Number of columns did not match schema"); + } + return RecordBatch::Validate(); + } + private: std::vector> columns_; diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 61b15c1143d..b2c4c76b3f2 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -121,7 +121,7 @@ class ARROW_EXPORT RecordBatch { /// \brief Check for schema or length inconsistencies /// \return Status - Status Validate() const; + virtual Status Validate() const; protected: RecordBatch(const std::shared_ptr& schema, int64_t num_rows); diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index c909958433f..6dac63ebd2d 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -239,7 +239,7 @@ TEST_F(TestTable, Ctors) { ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); - table_ = Take::Make(schema_, arrays_); + table_ = Table::Make(schema_, arrays_); ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); ASSERT_EQ(3, table_->num_columns()); From c9a7cb83856872a56dd2dd7e8a5405041cdc0a17 Mon Sep 17 00:00:00 2001 From: Kouhei Sutou Date: Tue, 21 Nov 2017 10:01:49 -0500 Subject: [PATCH 11/14] Apply patch to fix Glib test suite Change-Id: I23c701d8246faf8669d70a1bf6ce5ce0bc170591 --- c_glib/test/test-file-writer.rb | 16 ++++++++++++---- c_glib/test/test-gio-input-stream.rb | 18 ++++++++++++++---- c_glib/test/test-gio-output-stream.rb | 18 ++++++++++++++---- c_glib/test/test-stream-writer.rb | 18 +++++++++++------- 4 files changed, 51 insertions(+), 19 deletions(-) diff --git 
a/c_glib/test/test-file-writer.rb b/c_glib/test/test-file-writer.rb index 3de8e5cf34b..67aed85f73b 100644 --- a/c_glib/test/test-file-writer.rb +++ b/c_glib/test/test-file-writer.rb @@ -19,14 +19,18 @@ class TestFileWriter < Test::Unit::TestCase include Helper::Buildable def test_write_record_batch + data = [true] + field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) + schema = Arrow::Schema.new([field]) + tempfile = Tempfile.open("arrow-ipc-file-writer") output = Arrow::FileOutputStream.new(tempfile.path, false) begin - field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) - schema = Arrow::Schema.new([field]) file_writer = Arrow::RecordBatchFileWriter.new(output, schema) begin - record_batch = Arrow::RecordBatch.new(schema, 0, []) + record_batch = Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]) file_writer.write_record_batch(record_batch) ensure file_writer.close @@ -38,8 +42,12 @@ def test_write_record_batch input = Arrow::MemoryMappedInputStream.new(tempfile.path) begin file_reader = Arrow::RecordBatchFileReader.new(input) - assert_equal(["enabled"], + assert_equal([field.name], file_reader.schema.fields.collect(&:name)) + assert_equal(Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]), + file_reader.read_record_batch(0)) ensure input.close end diff --git a/c_glib/test/test-gio-input-stream.rb b/c_glib/test/test-gio-input-stream.rb index a71a370430e..2adf25b3af5 100644 --- a/c_glib/test/test-gio-input-stream.rb +++ b/c_glib/test/test-gio-input-stream.rb @@ -16,15 +16,21 @@ # under the License. 
class TestGIOInputStream < Test::Unit::TestCase + include Helper::Buildable + def test_reader_backend + data = [true] + field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) + schema = Arrow::Schema.new([field]) + tempfile = Tempfile.open("arrow-gio-input-stream") output = Arrow::FileOutputStream.new(tempfile.path, false) begin - field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) - schema = Arrow::Schema.new([field]) file_writer = Arrow::RecordBatchFileWriter.new(output, schema) begin - record_batch = Arrow::RecordBatch.new(schema, 0, []) + record_batch = Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]) file_writer.write_record_batch(record_batch) ensure file_writer.close @@ -38,8 +44,12 @@ def test_reader_backend input = Arrow::GIOInputStream.new(input_stream) begin file_reader = Arrow::RecordBatchFileReader.new(input) - assert_equal(["enabled"], + assert_equal([field.name], file_reader.schema.fields.collect(&:name)) + assert_equal(Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]), + file_reader.read_record_batch(0)) ensure input.close end diff --git a/c_glib/test/test-gio-output-stream.rb b/c_glib/test/test-gio-output-stream.rb index adaa8c1b7b2..c77598ed110 100644 --- a/c_glib/test/test-gio-output-stream.rb +++ b/c_glib/test/test-gio-output-stream.rb @@ -16,17 +16,23 @@ # under the License. 
class TestGIOOutputStream < Test::Unit::TestCase + include Helper::Buildable + def test_writer_backend + data = [true] + field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) + schema = Arrow::Schema.new([field]) + tempfile = Tempfile.open("arrow-gio-output-stream") file = Gio::File.new_for_path(tempfile.path) output_stream = file.append_to(:none) output = Arrow::GIOOutputStream.new(output_stream) begin - field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) - schema = Arrow::Schema.new([field]) file_writer = Arrow::RecordBatchFileWriter.new(output, schema) begin - record_batch = Arrow::RecordBatch.new(schema, 0, []) + record_batch = Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]) file_writer.write_record_batch(record_batch) ensure file_writer.close @@ -38,8 +44,12 @@ def test_writer_backend input = Arrow::MemoryMappedInputStream.new(tempfile.path) begin file_reader = Arrow::RecordBatchFileReader.new(input) - assert_equal(["enabled"], + assert_equal([field.name], file_reader.schema.fields.collect(&:name)) + assert_equal(Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]), + file_reader.read_record_batch(0)) ensure input.close end diff --git a/c_glib/test/test-stream-writer.rb b/c_glib/test/test-stream-writer.rb index c3d0e1490ce..32754e20838 100644 --- a/c_glib/test/test-stream-writer.rb +++ b/c_glib/test/test-stream-writer.rb @@ -19,17 +19,19 @@ class TestStreamWriter < Test::Unit::TestCase include Helper::Buildable def test_write_record_batch + data = [true] + field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) + schema = Arrow::Schema.new([field]) + tempfile = Tempfile.open("arrow-ipc-stream-writer") output = Arrow::FileOutputStream.new(tempfile.path, false) begin - field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) - schema = Arrow::Schema.new([field]) stream_writer = Arrow::RecordBatchStreamWriter.new(output, schema) begin columns = [ - build_boolean_array([true]), + 
build_boolean_array(data), ] - record_batch = Arrow::RecordBatch.new(schema, 1, columns) + record_batch = Arrow::RecordBatch.new(schema, data.size, columns) stream_writer.write_record_batch(record_batch) ensure stream_writer.close @@ -41,10 +43,12 @@ def test_write_record_batch input = Arrow::MemoryMappedInputStream.new(tempfile.path) begin stream_reader = Arrow::RecordBatchStreamReader.new(input) - assert_equal(["enabled"], + assert_equal([field.name], stream_reader.schema.fields.collect(&:name)) - assert_equal(true, - stream_reader.read_next.get_column(0).get_value(0)) + assert_equal(Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]), + stream_reader.read_next) assert_nil(stream_reader.read_next) ensure input.close From 06625301175f6ba1675e21f9071f4e039d3162fb Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 21 Nov 2017 16:09:11 -0500 Subject: [PATCH 12/14] Fix test case that wasn't being run in debug builds Change-Id: Ie205d1fb8c3e05a2b01ecf4c317cfdc1d3acbd24 --- cpp/src/arrow/table-test.cc | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index 6dac63ebd2d..e77d3aa8bbc 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -482,9 +482,6 @@ TEST_F(TestRecordBatch, Equals) { ASSERT_FALSE(b1->Equals(*b4)); } -#ifdef NDEBUG -// In debug builds, RecordBatch ctor aborts if you construct an invalid one - TEST_F(TestRecordBatch, Validate) { const int length = 10; @@ -501,19 +498,17 @@ TEST_F(TestRecordBatch, Validate) { auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2}); - ASSERT_OK(b1.Validate()); + ASSERT_OK(b1->Validate()); // Length mismatch auto b2 = RecordBatch::Make(schema, length, {a0, a1, a3}); - ASSERT_RAISES(Invalid, b2.Validate()); + ASSERT_RAISES(Invalid, b2->Validate()); // Type mismatch auto b3 = RecordBatch::Make(schema, length, {a0, a1, a0}); - ASSERT_RAISES(Invalid, b3.Validate()); + ASSERT_RAISES(Invalid, 
b3->Validate()); } -#endif - TEST_F(TestRecordBatch, Slice) { const int length = 10; From 5792370854ec13c422e3f877c03ea31924a29041 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 21 Nov 2017 16:36:34 -0500 Subject: [PATCH 13/14] Revert to debug build for now Change-Id: I345dfe5aaf7e5adb49ceccf5663d950c21c03558 --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 3effd292038..ddadf739aab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -111,7 +111,6 @@ matrix: - export CC="clang-4.0" - export CXX="clang++-4.0" - nvm install node - - export ARROW_BUILD_TYPE=release - $TRAVIS_BUILD_DIR/ci/travis_lint.sh - $TRAVIS_BUILD_DIR/ci/travis_before_script_js.sh - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh From 55ce663d57a596fea52dd9abe6dec3fb62853e38 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 21 Nov 2017 18:18:41 -0500 Subject: [PATCH 14/14] clang-format Change-Id: I7511a0b63c3f540d4dc688eef5a86f80f09228d0 --- cpp/src/arrow/table.cc | 2 +- cpp/src/arrow/table.h | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index a3189c23874..8f3f195765a 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -391,7 +391,7 @@ Status MakeTable(const std::shared_ptr& schema, return Status::OK(); } -#endif // ARROW_NO_DEPRECATED_API +#endif // ARROW_NO_DEPRECATED_API // ---------------------------------------------------------------------- // Convert a table to a sequence of record batches diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index b1778f1d006..d0312d93cb9 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -208,7 +208,6 @@ ARROW_EXPORT Status ConcatenateTables(const std::vector>& tables, std::shared_ptr
<Table>* table); - #ifndef ARROW_NO_DEPRECATED_API /// \brief Construct table from multiple input tables.