From 8d909fdcda14023019b37e9132bf2f928f727caa Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 7 Nov 2019 11:54:20 +0100 Subject: [PATCH] ARROW-3408: [C++] Add CSV option to automatically attempt dict encoding This is tied to type inference, and only triggers on string or binary columns. Each chunk is dict-encoded up to a certain cardinality, after which the whole column falls back on plain encoding. --- cpp/src/arrow/array/builder_dict.h | 3 + cpp/src/arrow/buffer.h | 2 +- cpp/src/arrow/csv/column_builder.cc | 96 +++++--- cpp/src/arrow/csv/column_builder_test.cc | 283 ++++++++++------------- cpp/src/arrow/csv/converter.cc | 166 ++++++++++--- cpp/src/arrow/csv/converter.h | 28 ++- cpp/src/arrow/csv/converter_benchmark.cc | 3 +- cpp/src/arrow/csv/converter_test.cc | 114 ++++++++- cpp/src/arrow/csv/options.h | 9 + cpp/src/arrow/gpu/cuda_memory.cc | 1 + cpp/src/arrow/gpu/cuda_memory.h | 2 +- cpp/src/arrow/io/buffered_test.cc | 1 + cpp/src/arrow/io/memory.cc | 1 + cpp/src/arrow/io/memory.h | 5 +- cpp/src/arrow/memory_pool.h | 8 +- cpp/src/arrow/type_fwd.h | 9 +- python/pyarrow/_csv.pyx | 43 +++- python/pyarrow/includes/libarrow.pxd | 4 + python/pyarrow/tests/test_csv.py | 186 ++++++++------- 19 files changed, 622 insertions(+), 342 deletions(-) diff --git a/cpp/src/arrow/array/builder_dict.h b/cpp/src/arrow/array/builder_dict.h index 8851dbca233..fd5f16d0c69 100644 --- a/cpp/src/arrow/array/builder_dict.h +++ b/cpp/src/arrow/array/builder_dict.h @@ -138,6 +138,9 @@ class DictionaryBuilderBase : public ArrayBuilder { ~DictionaryBuilderBase() override = default; + /// \brief The current number of entries in the dictionary + int64_t dictionary_length() const { return memo_table_->size(); } + /// \brief Append a scalar value Status Append(const Scalar& value) { ARROW_RETURN_NOT_OK(Reserve(1)); diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 3eb9b033b92..83033844026 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -25,8 +25,8 @@ 
#include #include -#include "arrow/memory_pool.h" #include "arrow/status.h" +#include "arrow/type_fwd.h" #include "arrow/util/macros.h" #include "arrow/util/string_view.h" #include "arrow/util/visibility.h" diff --git a/cpp/src/arrow/csv/column_builder.cc b/cpp/src/arrow/csv/column_builder.cc index da656020773..401b3156bc4 100644 --- a/cpp/src/arrow/csv/column_builder.cc +++ b/cpp/src/arrow/csv/column_builder.cc @@ -162,7 +162,8 @@ class TypedColumnBuilder : public ColumnBuilder { }; Status TypedColumnBuilder::Init() { - return Converter::Make(type_, options_, pool_, &converter_); + ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type_, options_, pool_)); + return Status::OK(); } void TypedColumnBuilder::Insert(int64_t block_index, @@ -224,7 +225,7 @@ class InferringColumnBuilder : public ColumnBuilder { Status Finish(std::shared_ptr* out) override; protected: - Status LoosenType(); + Status LoosenType(const Status& conversion_error); Status UpdateType(); Status TryConvertChunk(size_t chunk_index); // This must be called unlocked! 
@@ -240,7 +241,17 @@ class InferringColumnBuilder : public ColumnBuilder { std::shared_ptr converter_; // Current inference status - enum class InferKind { Null, Integer, Boolean, Real, Timestamp, Text, Binary }; + enum class InferKind { + Null, + Integer, + Boolean, + Real, + Timestamp, + TextDict, + BinaryDict, + Text, + Binary + }; std::shared_ptr infer_type_; InferKind infer_kind_; @@ -255,9 +266,10 @@ Status InferringColumnBuilder::Init() { return UpdateType(); } -Status InferringColumnBuilder::LoosenType() { +Status InferringColumnBuilder::LoosenType(const Status& conversion_error) { // We are locked + DCHECK(!conversion_error.ok()); DCHECK(can_loosen_type_); switch (infer_kind_) { case InferKind::Null: @@ -273,7 +285,23 @@ Status InferringColumnBuilder::LoosenType() { infer_kind_ = InferKind::Real; break; case InferKind::Real: - infer_kind_ = InferKind::Text; + if (options_.auto_dict_encode) { + infer_kind_ = InferKind::TextDict; + } else { + infer_kind_ = InferKind::Text; + } + break; + case InferKind::TextDict: + if (conversion_error.IsIndexError()) { + // Cardinality too large, fall back to non-dict encoding + infer_kind_ = InferKind::Text; + } else { + // Assuming UTF8 validation failure + infer_kind_ = InferKind::BinaryDict; + } + break; + case InferKind::BinaryDict: + infer_kind_ = InferKind::Binary; break; case InferKind::Text: infer_kind_ = InferKind::Binary; @@ -287,38 +315,46 @@ Status InferringColumnBuilder::LoosenType() { Status InferringColumnBuilder::UpdateType() { // We are locked + auto make_converter = [&](std::shared_ptr type) -> Status { + infer_type_ = type; + ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type, options_, pool_)); + return Status::OK(); + }; + + auto make_dict_converter = [&](std::shared_ptr type) -> Status { + infer_type_ = dictionary(int32(), type); + ARROW_ASSIGN_OR_RAISE(auto dict_converter, + DictionaryConverter::Make(type, options_, pool_)); + dict_converter->SetMaxCardinality(options_.auto_dict_max_cardinality); 
+ converter_ = std::move(dict_converter); + return Status::OK(); + }; + + can_loosen_type_ = true; + switch (infer_kind_) { case InferKind::Null: - infer_type_ = null(); - can_loosen_type_ = true; - break; + return make_converter(null()); case InferKind::Integer: - infer_type_ = int64(); - can_loosen_type_ = true; - break; + return make_converter(int64()); case InferKind::Boolean: - infer_type_ = boolean(); - can_loosen_type_ = true; - break; + return make_converter(boolean()); case InferKind::Timestamp: // We don't support parsing second fractions for now - infer_type_ = timestamp(TimeUnit::SECOND); - can_loosen_type_ = true; - break; + return make_converter(timestamp(TimeUnit::SECOND)); case InferKind::Real: - infer_type_ = float64(); - can_loosen_type_ = true; - break; + return make_converter(float64()); case InferKind::Text: - infer_type_ = utf8(); - can_loosen_type_ = true; - break; + return make_converter(utf8()); case InferKind::Binary: - infer_type_ = binary(); can_loosen_type_ = false; - break; + return make_converter(binary()); + case InferKind::TextDict: + return make_dict_converter(utf8()); + case InferKind::BinaryDict: + return make_dict_converter(binary()); } - return Converter::Make(infer_type_, options_, pool_, &converter_); + return Status::UnknownError("Shouldn't come here"); } void InferringColumnBuilder::ScheduleConvertChunk(size_t chunk_index) { @@ -336,7 +372,7 @@ Status InferringColumnBuilder::TryConvertChunk(size_t chunk_index) { DCHECK_NE(parser, nullptr); lock.unlock(); - Status st = converter->Convert(*parser, col_index_, &res); + Status conversion_status = converter->Convert(*parser, col_index_, &res); lock.lock(); if (kind != infer_kind_) { @@ -346,7 +382,7 @@ Status InferringColumnBuilder::TryConvertChunk(size_t chunk_index) { return Status::OK(); } - if (st.ok()) { + if (conversion_status.ok()) { // Conversion succeeded chunks_[chunk_index] = std::move(res); if (!can_loosen_type_) { @@ -356,7 +392,7 @@ Status 
InferringColumnBuilder::TryConvertChunk(size_t chunk_index) { return Status::OK(); } else if (can_loosen_type_) { // Conversion failed, try another type - RETURN_NOT_OK(LoosenType()); + RETURN_NOT_OK(LoosenType(conversion_status)); // Reconvert past finished chunks // (unfinished chunks will notice by themselves if they need reconverting) @@ -379,7 +415,7 @@ Status InferringColumnBuilder::TryConvertChunk(size_t chunk_index) { return Status::OK(); } else { // Conversion failed but cannot loosen more - return st; + return conversion_status; } } diff --git a/cpp/src/arrow/csv/column_builder_test.cc b/cpp/src/arrow/csv/column_builder_test.cc index 27b242f8bfb..1d895893d68 100644 --- a/cpp/src/arrow/csv/column_builder_test.cc +++ b/cpp/src/arrow/csv/column_builder_test.cc @@ -28,6 +28,7 @@ #include "arrow/table.h" #include "arrow/testing/util.h" #include "arrow/type.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" @@ -36,12 +37,14 @@ namespace csv { class BlockParser; +using internal::checked_cast; using internal::GetCpuThreadPool; using internal::TaskGroup; +using ChunkData = std::vector>; + void AssertBuilding(const std::shared_ptr& builder, - const std::vector>& chunks, - std::shared_ptr* out) { + const ChunkData& chunks, std::shared_ptr* out) { for (const auto& chunk : chunks) { std::shared_ptr parser; MakeColumnParser(chunk, &parser); @@ -52,6 +55,41 @@ void AssertBuilding(const std::shared_ptr& builder, ASSERT_OK((*out)->Validate()); } +void CheckInferred(const std::shared_ptr& tg, const ChunkData& csv_data, + const ConvertOptions& options, + std::shared_ptr expected) { + std::shared_ptr builder; + std::shared_ptr actual; + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); + AssertBuilding(builder, csv_data, &actual); + AssertChunkedEqual(*actual, *expected); +} + +void CheckInferred(const std::shared_ptr& tg, const ChunkData& csv_data, + const ConvertOptions& options, + 
std::vector> expected_chunks) { + CheckInferred(tg, csv_data, options, std::make_shared(expected_chunks)); +} + +void CheckFixedType(const std::shared_ptr& tg, + const std::shared_ptr& type, const ChunkData& csv_data, + const ConvertOptions& options, + std::shared_ptr expected) { + std::shared_ptr builder; + std::shared_ptr actual; + ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), type, 0, options, tg, &builder)); + AssertBuilding(builder, csv_data, &actual); + AssertChunkedEqual(*actual, *expected); +} + +void CheckFixedType(const std::shared_ptr& tg, + const std::shared_ptr& type, const ChunkData& csv_data, + const ConvertOptions& options, + std::vector> expected_chunks) { + CheckFixedType(tg, type, csv_data, options, + std::make_shared(expected_chunks)); +} + static ConvertOptions default_options = ConvertOptions::Defaults(); ////////////////////////////////////////////////////////////////////////// @@ -168,16 +206,9 @@ TEST(ColumnBuilder, Empty) { TEST(ColumnBuilder, Basics) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK( - ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{"123", "-456"}}, &actual); - std::shared_ptr expected; - ChunkedArrayFromVector({{123, -456}}, &expected); - AssertChunkedEqual(*actual, *expected); + CheckFixedType(tg, int32(), {{"123", "-456"}}, options, + {ArrayFromJSON(int32(), "[123, -456]")}); } TEST(ColumnBuilder, Insert) { @@ -205,46 +236,28 @@ TEST(ColumnBuilder, Insert) { TEST(ColumnBuilder, MultipleChunks) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK( - ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); - std::shared_ptr actual; - AssertBuilding(builder, {{"1", "2", "3"}, {"4", "5"}}, &actual); - - std::shared_ptr expected; - ChunkedArrayFromVector({{1, 2, 3}, {4, 5}}, 
&expected); - AssertChunkedEqual(*actual, *expected); + CheckFixedType(tg, int16(), {{"1", "2", "3"}, {"4", "5"}}, options, + {ArrayFromJSON(int16(), "[1, 2, 3]"), ArrayFromJSON(int16(), "[4, 5]")}); } TEST(ColumnBuilder, MultipleChunksParallel) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool()); - std::shared_ptr builder; - ASSERT_OK( - ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual); std::shared_ptr expected; ChunkedArrayFromVector({{1, 2}, {3}, {4, 5}, {6, 7}}, &expected); - AssertChunkedEqual(*actual, *expected); + CheckFixedType(tg, int32(), {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, options, + expected); } TEST(ColumnBuilder, EmptyChunks) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK( - ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg, &builder)); - std::shared_ptr actual; - AssertBuilding(builder, {{}, {"1", "2"}, {}}, &actual); - - std::shared_ptr expected; - ChunkedArrayFromVector({{}, {1, 2}, {}}, &expected); - AssertChunkedEqual(*actual, *expected); + CheckFixedType(tg, int16(), {{}, {"1", "2"}, {}}, options, + {ArrayFromJSON(int16(), "[]"), ArrayFromJSON(int16(), "[1, 2]"), + ArrayFromJSON(int16(), "[]")}); } ////////////////////////////////////////////////////////////////////////// @@ -253,233 +266,148 @@ TEST(ColumnBuilder, EmptyChunks) { TEST(InferringColumnBuilder, Empty) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {}, &actual); - ASSERT_EQ(actual->type()->id(), Type::NA); - ASSERT_EQ(actual->num_chunks(), 0); + CheckInferred(tg, {}, options, std::make_shared(ArrayVector(), 
null())); } TEST(InferringColumnBuilder, SingleChunkNull) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{"", "NA"}}, &actual); - ASSERT_EQ(actual->type()->id(), Type::NA); - ASSERT_EQ(actual->length(), 2); + CheckInferred(tg, {{"", "NA"}}, options, {std::make_shared(2)}); } TEST(InferringColumnBuilder, MultipleChunkNull) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - std::shared_ptr actual; - AssertBuilding(builder, {{"", "NA"}, {""}, {"NaN"}}, &actual); - - ASSERT_EQ(actual->type()->id(), Type::NA); - ASSERT_EQ(actual->length(), 4); + CheckInferred(tg, {{"", "NA"}, {""}, {"NaN"}}, options, + {std::make_shared(2), std::make_shared(1), + std::make_shared(1)}); } TEST(InferringColumnBuilder, SingleChunkInteger) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{"", "123", "456"}}, &actual); - std::shared_ptr expected; - ChunkedArrayFromVector({{false, true, true}}, {{0, 123, 456}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{"", "123", "456"}}, options, + {ArrayFromJSON(int64(), "[null, 123, 456]")}); } TEST(InferringColumnBuilder, MultipleChunkInteger) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{""}, {"NA", "123", "456"}}, &actual); - std::shared_ptr expected; - ChunkedArrayFromVector({{false}, {false, true, 
true}}, {{0}, {0, 123, 456}}, - &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred( + tg, {{""}, {"NA", "123", "456"}}, options, + {ArrayFromJSON(int64(), "[null]"), ArrayFromJSON(int64(), "[null, 123, 456]")}); } TEST(InferringColumnBuilder, SingleChunkBoolean) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{"", "0", "FALSE"}}, &actual); - std::shared_ptr expected; - ChunkedArrayFromVector({{false, true, true}}, - {{false, false, false}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{"", "0", "FALSE", "TRUE"}}, options, + {ArrayFromJSON(boolean(), "[null, false, false, true]")}); } TEST(InferringColumnBuilder, MultipleChunkBoolean) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - std::shared_ptr actual; - AssertBuilding(builder, {{""}, {"1", "True", "0"}}, &actual); - - std::shared_ptr expected; - ChunkedArrayFromVector({{false}, {true, true, true}}, - {{false}, {true, true, false}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{""}, {"1", "True", "0"}}, options, + {ArrayFromJSON(boolean(), "[null]"), + ArrayFromJSON(boolean(), "[true, true, false]")}); } TEST(InferringColumnBuilder, SingleChunkReal) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - std::shared_ptr actual; - AssertBuilding(builder, {{"", "0.0", "12.5"}}, &actual); - - std::shared_ptr expected; - ChunkedArrayFromVector({{false, true, true}}, {{0.0, 0.0, 12.5}}, - &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{"", 
"0.0", "12.5"}}, options, + {ArrayFromJSON(float64(), "[null, 0.0, 12.5]")}); } TEST(InferringColumnBuilder, MultipleChunkReal) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - std::shared_ptr actual; - AssertBuilding(builder, {{""}, {"008"}, {"NaN", "12.5"}}, &actual); - - std::shared_ptr expected; - ChunkedArrayFromVector({{false}, {true}, {false, true}}, - {{0.0}, {8.0}, {0.0, 12.5}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{""}, {"008"}, {"NaN", "12.5"}}, options, + {ArrayFromJSON(float64(), "[null]"), ArrayFromJSON(float64(), "[8.0]"), + ArrayFromJSON(float64(), "[null, 12.5]")}); } TEST(InferringColumnBuilder, SingleChunkTimestamp) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{"", "1970-01-01", "2018-11-13 17:11:10"}}, &actual); std::shared_ptr expected; ChunkedArrayFromVector(timestamp(TimeUnit::SECOND), {{false, true, true}}, {{0, 0, 1542129070}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{"", "1970-01-01", "2018-11-13 17:11:10"}}, options, expected); } TEST(InferringColumnBuilder, MultipleChunkTimestamp) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{""}, {"1970-01-01"}, {"2018-11-13 17:11:10"}}, &actual); std::shared_ptr expected; ChunkedArrayFromVector(timestamp(TimeUnit::SECOND), {{false}, {true}, {true}}, {{0}, {0}, {1542129070}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{""}, {"1970-01-01"}, {"2018-11-13 17:11:10"}}, 
options, expected); } TEST(InferringColumnBuilder, SingleChunkString) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - std::shared_ptr actual; std::shared_ptr expected; // With valid UTF8 - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - AssertBuilding(builder, {{"", "foo", "baré"}}, &actual); - - ChunkedArrayFromVector({{true, true, true}}, - {{"", "foo", "baré"}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{"", "foo", "baré"}}, options, + {ArrayFromJSON(utf8(), R"(["", "foo", "baré"])")}); // With invalid UTF8, non-checking options.check_utf8 = false; tg = TaskGroup::MakeSerial(); - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual); - ChunkedArrayFromVector({{true, true, true}}, {{"", "foo\xff", "baré"}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{"", "foo\xff", "baré"}}, options, expected); } TEST(InferringColumnBuilder, SingleChunkBinary) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - std::shared_ptr actual; std::shared_ptr expected; // With invalid UTF8, checking - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - AssertBuilding(builder, {{"", "foo\xff", "baré"}}, &actual); - + tg = TaskGroup::MakeSerial(); ChunkedArrayFromVector({{true, true, true}}, {{"", "foo\xff", "baré"}}, &expected); - AssertChunkedEqual(*expected, *actual); + CheckInferred(tg, {{"", "foo\xff", "baré"}}, options, expected); } TEST(InferringColumnBuilder, MultipleChunkString) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{""}, {"008"}, {"NaN", 
"baré"}}, &actual); std::shared_ptr expected; ChunkedArrayFromVector( {{true}, {true}, {true, true}}, {{""}, {"008"}, {"NaN", "baré"}}, &expected); - AssertChunkedEqual(*expected, *actual); + + CheckInferred(tg, {{""}, {"008"}, {"NaN", "baré"}}, options, expected); } TEST(InferringColumnBuilder, MultipleChunkBinary) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); - std::shared_ptr builder; - ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); - - std::shared_ptr actual; - AssertBuilding(builder, {{""}, {"008"}, {"NaN", "baré\xff"}}, &actual); std::shared_ptr expected; ChunkedArrayFromVector( {{true}, {true}, {true, true}}, {{""}, {"008"}, {"NaN", "baré\xff"}}, &expected); - AssertChunkedEqual(*expected, *actual); + + CheckInferred(tg, {{""}, {"008"}, {"NaN", "baré\xff"}}, options, expected); } // Parallel parsing is tested more comprehensively on the Python side @@ -488,15 +416,56 @@ TEST(InferringColumnBuilder, MultipleChunkBinary) { TEST(InferringColumnBuilder, MultipleChunkIntegerParallel) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeThreaded(GetCpuThreadPool()); + + std::shared_ptr expected; + ChunkedArrayFromVector({{1, 2}, {3}, {4, 5}, {6, 7}}, &expected); + CheckInferred(tg, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, options, expected); +} + +void CheckAutoDictEncoded(const std::shared_ptr& tg, const ChunkData& csv_data, + const ConvertOptions& options, + std::vector> expected_indices, + std::vector> expected_dictionaries) { std::shared_ptr builder; + std::shared_ptr actual; ASSERT_OK(ColumnBuilder::Make(default_memory_pool(), 0, options, tg, &builder)); + AssertBuilding(builder, csv_data, &actual); + ASSERT_EQ(actual->num_chunks(), static_cast(csv_data.size())); + for (int i = 0; i < actual->num_chunks(); ++i) { + ASSERT_EQ(actual->chunk(i)->type_id(), Type::DICTIONARY); + const auto& dict_array = checked_cast(*actual->chunk(i)); + 
AssertArraysEqual(*dict_array.dictionary(), *expected_dictionaries[i]); + AssertArraysEqual(*dict_array.indices(), *expected_indices[i]); + } +} - std::shared_ptr actual; - AssertBuilding(builder, {{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, &actual); +TEST(InferringColumnBuilder, SingleChunkBinaryAutoDict) { + auto options = ConvertOptions::Defaults(); + options.auto_dict_encode = true; + options.auto_dict_max_cardinality = 3; - std::shared_ptr expected; - ChunkedArrayFromVector({{1, 2}, {3}, {4, 5}, {6, 7}}, &expected); - AssertChunkedEqual(*actual, *expected); + // With valid UTF8 + auto expected_indices = ArrayFromJSON(int32(), "[0, 1, 0]"); + auto expected_dictionary = ArrayFromJSON(utf8(), R"(["abé", "cd"])"); + ChunkData csv_data = {{"abé", "cd", "abé"}}; + + CheckAutoDictEncoded(TaskGroup::MakeSerial(), csv_data, options, {expected_indices}, + {expected_dictionary}); + + // With invalid UTF8, non-checking + csv_data = {{"ab", "cd\xff", "ab"}}; + options.check_utf8 = false; + ArrayFromVector({"ab", "cd\xff"}, &expected_dictionary); + + CheckAutoDictEncoded(TaskGroup::MakeSerial(), csv_data, options, {expected_indices}, + {expected_dictionary}); + + // With invalid UTF8, checking + options.check_utf8 = true; + ArrayFromVector({"ab", "cd\xff"}, &expected_dictionary); + + CheckAutoDictEncoded(TaskGroup::MakeSerial(), csv_data, options, {expected_indices}, + {expected_dictionary}); } } // namespace csv diff --git a/cpp/src/arrow/csv/converter.cc b/cpp/src/arrow/csv/converter.cc index 1c61d3ccbc9..4eac9cc3579 100644 --- a/cpp/src/arrow/csv/converter.cc +++ b/cpp/src/arrow/csv/converter.cc @@ -18,6 +18,7 @@ #include "arrow/csv/converter.h" #include +#include #include #include #include @@ -88,29 +89,42 @@ Status InitializeTrie(const std::vector& inputs, Trie* trie) { return Status::OK(); } -class ConcreteConverter : public Converter { - public: - using Converter::Converter; - +class ConcreteConverterMixin { protected: - Status Initialize() override; - inline bool 
IsNull(const uint8_t* data, uint32_t size, bool quoted); + Status InitializeNullTrie(const ConvertOptions& options); + + bool IsNull(const uint8_t* data, uint32_t size, bool quoted) { + if (quoted) { + return false; + } + return null_trie_.Find( + util::string_view(reinterpret_cast(data), size)) >= 0; + } Trie null_trie_; }; -Status ConcreteConverter::Initialize() { +Status ConcreteConverterMixin::InitializeNullTrie(const ConvertOptions& options) { // TODO no need to build a separate Trie for each Converter instance - return InitializeTrie(options_.null_values, &null_trie_); + return InitializeTrie(options.null_values, &null_trie_); } -bool ConcreteConverter::IsNull(const uint8_t* data, uint32_t size, bool quoted) { - if (quoted) { - return false; - } - return null_trie_.Find(util::string_view(reinterpret_cast(data), size)) >= - 0; -} +class ConcreteConverter : public Converter, public ConcreteConverterMixin { + public: + using Converter::Converter; + + protected: + Status Initialize() override { return InitializeNullTrie(options_); } +}; + +class ConcreteDictionaryConverter : public DictionaryConverter, + public ConcreteConverterMixin { + public: + using DictionaryConverter::DictionaryConverter; + + protected: + Status Initialize() override { return InitializeNullTrie(options_); } +}; ///////////////////////////////////////////////////////////////////////// // Concrete Converter for null values @@ -144,7 +158,7 @@ Status NullConverter::Convert(const BlockParser& parser, int32_t col_index, // Concrete Converter for var-sized binary strings template -class VarSizeBinaryConverter : public ConcreteConverter { +class BinaryConverter : public ConcreteConverter { public: using ConcreteConverter::ConcreteConverter; @@ -191,6 +205,61 @@ class VarSizeBinaryConverter : public ConcreteConverter { } }; +template +class DictionaryBinaryConverter : public ConcreteDictionaryConverter { + public: + using ConcreteDictionaryConverter::ConcreteDictionaryConverter; + + Status 
Convert(const BlockParser& parser, int32_t col_index, + std::shared_ptr* out) override { + // We use a fixed index width so that all column chunks get the same index type + using BuilderType = Dictionary32Builder; + BuilderType builder(type_, pool_); + + auto visit_non_null = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status { + if (CheckUTF8 && ARROW_PREDICT_FALSE(!util::ValidateUTF8(data, size))) { + return Status::Invalid("CSV conversion error to ", type_->ToString(), + ": invalid UTF8 data"); + } + RETURN_NOT_OK( + builder.Append(util::string_view(reinterpret_cast(data), size))); + if (ARROW_PREDICT_FALSE(builder.dictionary_length() > max_cardinality_)) { + return Status::IndexError("Dictionary length exceeded max cardinality"); + } + return Status::OK(); + }; + + RETURN_NOT_OK(builder.Resize(parser.num_rows())); + + if (options_.strings_can_be_null) { + auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status { + if (IsNull(data, size, false /* quoted */)) { + return builder.AppendNull(); + } else { + return visit_non_null(data, size, quoted); + } + }; + RETURN_NOT_OK(parser.VisitColumn(col_index, visit)); + } else { + RETURN_NOT_OK(parser.VisitColumn(col_index, visit_non_null)); + } + + RETURN_NOT_OK(builder.Finish(out)); + + return Status::OK(); + } + + void SetMaxCardinality(int32_t max_length) override { max_cardinality_ = max_length; } + + protected: + Status Initialize() override { + util::InitializeUTF8(); + return ConcreteDictionaryConverter::Initialize(); + } + + int32_t max_cardinality_ = std::numeric_limits::max(); +}; + ///////////////////////////////////////////////////////////////////////// // Concrete Converter for fixed-sized binary strings @@ -406,15 +475,15 @@ Converter::Converter(const std::shared_ptr& type, const ConvertOptions MemoryPool* pool) : options_(options), pool_(pool), type_(type) {} -Status Converter::Make(const std::shared_ptr& type, - const ConvertOptions& options, MemoryPool* pool, - 
std::shared_ptr* out) { - Converter* result; +Result> Converter::Make(const std::shared_ptr& type, + const ConvertOptions& options, + MemoryPool* pool) { + Converter* ptr; switch (type->id()) { -#define CONVERTER_CASE(TYPE_ID, CONVERTER_TYPE) \ - case TYPE_ID: \ - result = new CONVERTER_TYPE(type, options, pool); \ +#define CONVERTER_CASE(TYPE_ID, CONVERTER_TYPE) \ + case TYPE_ID: \ + ptr = new CONVERTER_TYPE(type, options, pool); \ break; CONVERTER_CASE(Type::NA, NullConverter) @@ -430,24 +499,24 @@ Status Converter::Make(const std::shared_ptr& type, CONVERTER_CASE(Type::DOUBLE, NumericConverter) CONVERTER_CASE(Type::BOOL, BooleanConverter) CONVERTER_CASE(Type::TIMESTAMP, TimestampConverter) - CONVERTER_CASE(Type::BINARY, (VarSizeBinaryConverter)) - CONVERTER_CASE(Type::LARGE_BINARY, (VarSizeBinaryConverter)) + CONVERTER_CASE(Type::BINARY, (BinaryConverter)) + CONVERTER_CASE(Type::LARGE_BINARY, (BinaryConverter)) CONVERTER_CASE(Type::FIXED_SIZE_BINARY, FixedSizeBinaryConverter) CONVERTER_CASE(Type::DECIMAL, DecimalConverter) case Type::STRING: if (options.check_utf8) { - result = new VarSizeBinaryConverter(type, options, pool); + ptr = new BinaryConverter(type, options, pool); } else { - result = new VarSizeBinaryConverter(type, options, pool); + ptr = new BinaryConverter(type, options, pool); } break; case Type::LARGE_STRING: if (options.check_utf8) { - result = new VarSizeBinaryConverter(type, options, pool); + ptr = new BinaryConverter(type, options, pool); } else { - result = new VarSizeBinaryConverter(type, options, pool); + ptr = new BinaryConverter(type, options, pool); } break; @@ -458,13 +527,42 @@ Status Converter::Make(const std::shared_ptr& type, #undef CONVERTER_CASE } - out->reset(result); - return result->Initialize(); + std::shared_ptr result(ptr); + RETURN_NOT_OK(result->Initialize()); + return result; } -Status Converter::Make(const std::shared_ptr& type, - const ConvertOptions& options, std::shared_ptr* out) { - return Make(type, options, 
default_memory_pool(), out); +Result> DictionaryConverter::Make( + const std::shared_ptr& type, const ConvertOptions& options, + MemoryPool* pool) { + DictionaryConverter* ptr; + + switch (type->id()) { +#define CONVERTER_CASE(TYPE_ID, CONVERTER_TYPE) \ + case TYPE_ID: \ + ptr = new CONVERTER_TYPE(type, options, pool); \ + break; + + CONVERTER_CASE(Type::BINARY, (DictionaryBinaryConverter)) + + case Type::STRING: + if (options.check_utf8) { + ptr = new DictionaryBinaryConverter(type, options, pool); + } else { + ptr = new DictionaryBinaryConverter(type, options, pool); + } + break; + + default: { + return Status::NotImplemented("CSV dictionary conversion to ", type->ToString(), + " is not supported"); + } + +#undef CONVERTER_CASE + } + std::shared_ptr result(ptr); + RETURN_NOT_OK(result->Initialize()); + return result; } } // namespace csv diff --git a/cpp/src/arrow/csv/converter.h b/cpp/src/arrow/csv/converter.h index fa2f0865a3f..eca9a95080a 100644 --- a/cpp/src/arrow/csv/converter.h +++ b/cpp/src/arrow/csv/converter.h @@ -22,16 +22,12 @@ #include #include "arrow/csv/options.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" namespace arrow { - -class Array; -class DataType; -class MemoryPool; -class Status; - namespace csv { class BlockParser; @@ -47,10 +43,9 @@ class ARROW_EXPORT Converter { std::shared_ptr type() const { return type_; } - static Status Make(const std::shared_ptr& type, const ConvertOptions& options, - std::shared_ptr* out); - static Status Make(const std::shared_ptr& type, const ConvertOptions& options, - MemoryPool* pool, std::shared_ptr* out); + static Result> Make( + const std::shared_ptr& type, const ConvertOptions& options, + MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT); protected: ARROW_DISALLOW_COPY_AND_ASSIGN(Converter); @@ -64,6 +59,19 @@ class ARROW_EXPORT Converter { std::shared_ptr type_; }; +class ARROW_EXPORT DictionaryConverter : public Converter { + 
public: + using Converter::Converter; + + // If the dictionary length goes above this value, conversion will fail + // with Status::IndexError. + virtual void SetMaxCardinality(int32_t max_length) = 0; + + static Result> Make( + const std::shared_ptr& value_type, const ConvertOptions& options, + MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT); +}; + } // namespace csv } // namespace arrow diff --git a/cpp/src/arrow/csv/converter_benchmark.cc b/cpp/src/arrow/csv/converter_benchmark.cc index abbc376a4f7..ed7f1adb8eb 100644 --- a/cpp/src/arrow/csv/converter_benchmark.cc +++ b/cpp/src/arrow/csv/converter_benchmark.cc @@ -76,8 +76,7 @@ static void BenchmarkConversion(benchmark::State& state, // NOLINT non-const re BlockParser& parser, const std::shared_ptr& type, ConvertOptions options) { - std::shared_ptr converter; - ABORT_NOT_OK(Converter::Make(type, options, &converter)); + std::shared_ptr converter = *Converter::Make(type, options); while (state.KeepRunning()) { std::shared_ptr result; diff --git a/cpp/src/arrow/csv/converter_test.cc b/cpp/src/arrow/csv/converter_test.cc index b121d753117..a597bd545bc 100644 --- a/cpp/src/arrow/csv/converter_test.cc +++ b/cpp/src/arrow/csv/converter_test.cc @@ -31,6 +31,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/type.h" #include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/decimal.h" #include "arrow/util/logging.h" @@ -55,7 +56,7 @@ void AssertConversion(const std::shared_ptr& type, std::shared_ptr converter; std::shared_ptr array, expected_array; - ASSERT_OK(Converter::Make(type, options, &converter)); + ASSERT_OK_AND_ASSIGN(converter, Converter::Make(type, options)); MakeCSVParser(csv_string, &parser); for (int32_t col_index = 0; col_index < static_cast(expected.size()); @@ -76,7 +77,7 @@ void AssertConversion(const std::shared_ptr& type, std::shared_ptr converter; std::shared_ptr array, expected_array; - ASSERT_OK(Converter::Make(type, options, &converter)); + 
ASSERT_OK_AND_ASSIGN(converter, Converter::Make(type, options)); MakeCSVParser(csv_string, &parser); for (int32_t col_index = 0; col_index < static_cast(expected.size()); @@ -88,6 +89,46 @@ void AssertConversion(const std::shared_ptr& type, } } +Result> DictConversion( + const std::shared_ptr& value_type, const std::string& csv_string, + int32_t max_cardinality = -1, ConvertOptions options = ConvertOptions::Defaults()) { + std::shared_ptr parser; + std::shared_ptr converter; + + ARROW_ASSIGN_OR_RAISE(converter, DictionaryConverter::Make(value_type, options)); + if (max_cardinality >= 0) { + converter->SetMaxCardinality(max_cardinality); + } + + ParseOptions parse_options; + parse_options.ignore_empty_lines = false; + MakeCSVParser({csv_string}, parse_options, &parser); + + const int32_t col_index = 0; + std::shared_ptr array; + RETURN_NOT_OK(converter->Convert(*parser, col_index, &array)); + return array; +} + +void AssertDictConversion(const std::string& csv_string, + const std::shared_ptr& expected_indices, + const std::shared_ptr& expected_dict, + int32_t max_cardinality = -1, + ConvertOptions options = ConvertOptions::Defaults()) { + std::shared_ptr parser; + std::shared_ptr converter; + std::shared_ptr array, expected_array; + std::shared_ptr expected_type; + + ASSERT_OK_AND_ASSIGN( + array, DictConversion(expected_dict->type(), csv_string, max_cardinality, options)); + expected_type = dictionary(expected_indices->type(), expected_dict->type()); + ASSERT_TRUE(array->type()->Equals(*expected_type)); + const auto& dict_array = internal::checked_cast(*array); + AssertArraysEqual(*dict_array.dictionary(), *expected_dict); + AssertArraysEqual(*dict_array.indices(), *expected_indices); +} + template void AssertConversionAllNulls(const std::shared_ptr& type) { std::vector nulls = AllNulls(); @@ -104,7 +145,7 @@ void AssertConversionError(const std::shared_ptr& type, std::shared_ptr converter; std::shared_ptr array; - ASSERT_OK(Converter::Make(type, options, 
&converter)); + ASSERT_OK_AND_ASSIGN(converter, Converter::Make(type, options)); MakeCSVParser(csv_string, &parser); for (int32_t i = 0; i < parser->num_cols(); ++i) { @@ -117,7 +158,7 @@ void AssertConversionError(const std::shared_ptr& type, } ////////////////////////////////////////////////////////////////////////// -// Test functions begin here +// Converter tests template static void TestBinaryConversionBasics() { @@ -197,7 +238,7 @@ TEST(NullConversion, Basics) { std::shared_ptr array; std::shared_ptr type = null(); - ASSERT_OK(Converter::Make(type, ConvertOptions::Defaults(), &converter)); + ASSERT_OK_AND_ASSIGN(converter, Converter::Make(type, ConvertOptions::Defaults())); MakeCSVParser({"NA,z\n", ",0\n"}, &parser); ASSERT_OK(converter->Convert(*parser, 0, &array)); @@ -380,5 +421,68 @@ TEST(DecimalConversion, OverflowFails) { AssertConversionError(decimal(5, 1), {"1.61\n"}, {0}); } +////////////////////////////////////////////////////////////////////////// +// DictionaryConverter tests + +template +class TestDictConverter : public ::testing::Test { + public: + std::shared_ptr type() const { return TypeTraits::type_singleton(); } + + bool is_utf8_type() const { + return T::type_id == Type::STRING || T::type_id == Type::LARGE_STRING; + } +}; + +using DictConversionTypes = ::testing::Types; + +TYPED_TEST_CASE(TestDictConverter, DictConversionTypes); + +TYPED_TEST(TestDictConverter, Basics) { + auto expected_dict = ArrayFromJSON(this->type(), R"(["ab", "cdé", ""])"); + auto expected_indices = ArrayFromJSON(int32(), "[0, 1, 2, 0]"); + + AssertDictConversion("ab\ncdé\n\nab\n", expected_indices, expected_dict); +} + +TYPED_TEST(TestDictConverter, Nulls) { + auto expected_dict = ArrayFromJSON(this->type(), R"(["ab", "N/A", ""])"); + auto expected_indices = ArrayFromJSON(int32(), "[0, 1, 2, 0]"); + + AssertDictConversion("ab\nN/A\n\nab\n", expected_indices, expected_dict); + + auto options = ConvertOptions::Defaults(); + options.strings_can_be_null = true; + 
expected_dict = ArrayFromJSON(this->type(), R"(["ab"])"); + expected_indices = ArrayFromJSON(int32(), "[0, null, null, 0]"); + AssertDictConversion("ab\nN/A\n\nab\n", expected_indices, expected_dict, -1, options); +} + +TYPED_TEST(TestDictConverter, NonUTF8) { + auto expected_indices = ArrayFromJSON(int32(), "[0, 1, 2, 0]"); + std::shared_ptr expected_dict; + ArrayFromVector({"ab", "cd\xff", ""}, &expected_dict); + std::string csv_string = "ab\ncd\xff\n\nab\n"; + + if (this->is_utf8_type()) { + ASSERT_RAISES(Invalid, DictConversion(this->type(), "ab\ncd\xff\n\nab\n")); + + auto options = ConvertOptions::Defaults(); + options.check_utf8 = false; + AssertDictConversion(csv_string, expected_indices, expected_dict, -1, options); + } else { + AssertDictConversion(csv_string, expected_indices, expected_dict); + } +} + +TYPED_TEST(TestDictConverter, MaxCardinality) { + auto expected_dict = ArrayFromJSON(this->type(), R"(["ab", "cd", "ef"])"); + auto expected_indices = ArrayFromJSON(int32(), "[0, 1, 2, 1]"); + std::string csv_string = "ab\ncd\nef\ncd\n"; + + AssertDictConversion(csv_string, expected_indices, expected_dict, 3); + ASSERT_RAISES(IndexError, DictConversion(this->type(), csv_string, 2)); +} + } // namespace csv } // namespace arrow diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 48af5a6b8ea..5af70620442 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -79,6 +79,15 @@ struct ARROW_EXPORT ConvertOptions { /// If false, then all strings are valid string values. bool strings_can_be_null = false; + /// Whether to try to automatically dict-encode string / binary data. + /// If true, then when type inference detects a string or binary column, + /// it is dict-encoded up to `auto_dict_max_cardinality` distinct values + /// (per chunk), after which it switches to regular encoding. + /// + /// This setting is ignored for non-inferred columns (those in `column_types`). 
+ bool auto_dict_encode = false; + int32_t auto_dict_max_cardinality = 50; + // XXX Should we have a separate FilterOptions? /// If non-empty, indicates the names of columns from the CSV file that should diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 4c69d91cc4d..e4397326bbf 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -27,6 +27,7 @@ #include "arrow/buffer.h" #include "arrow/io/memory.h" +#include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/util/logging.h" diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index ab74c0e1f4e..963de1453b7 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -23,8 +23,8 @@ #include "arrow/buffer.h" #include "arrow/io/memory.h" -#include "arrow/memory_pool.h" #include "arrow/status.h" +#include "arrow/type_fwd.h" namespace arrow { namespace cuda { diff --git a/cpp/src/arrow/io/buffered_test.cc b/cpp/src/arrow/io/buffered_test.cc index f979715ef7a..9563e9db753 100644 --- a/cpp/src/arrow/io/buffered_test.cc +++ b/cpp/src/arrow/io/buffered_test.cc @@ -39,6 +39,7 @@ #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/io/test_common.h" +#include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/string_view.h" diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 751eba1c575..783aaaa23b4 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -24,6 +24,7 @@ #include "arrow/buffer.h" #include "arrow/io/util_internal.h" +#include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 40c25a230aa..a293afe353f 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -22,17 +22,14 @@ #include #include -#include 
"arrow/buffer.h" #include "arrow/io/concurrency.h" #include "arrow/io/interfaces.h" -#include "arrow/memory_pool.h" +#include "arrow/type_fwd.h" #include "arrow/util/string_view.h" #include "arrow/util/visibility.h" namespace arrow { -class Buffer; -class ResizableBuffer; class Status; namespace io { diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index eca8d2349a2..8970de16c12 100644 --- a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -24,6 +24,7 @@ #include #include "arrow/status.h" +#include "arrow/type_fwd.h" #include "arrow/util/visibility.h" namespace arrow { @@ -149,9 +150,6 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool { std::unique_ptr impl_; }; -/// Return the process-wide default memory pool. -ARROW_EXPORT MemoryPool* default_memory_pool(); - /// Return a process-wide memory pool based on the system allocator. ARROW_EXPORT MemoryPool* system_memory_pool(); @@ -176,10 +174,6 @@ Status jemalloc_set_decay_ms(int ms); /// May return NotImplemented if mimalloc is not available. ARROW_EXPORT Status mimalloc_memory_pool(MemoryPool** out); -#ifndef ARROW_MEMORY_POOL_DEFAULT -#define ARROW_MEMORY_POOL_DEFAULT = default_memory_pool() -#endif - } // namespace arrow #endif // ARROW_MEMORY_POOL_H diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index 5422038b670..a6159acc6cb 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -48,7 +48,7 @@ using RecordBatchIterator = Iterator>; class Buffer; class MemoryPool; -class RecordBatch; +class ResizableBuffer; class Schema; class DictionaryType; @@ -251,6 +251,13 @@ std::shared_ptr ARROW_EXPORT date64(); /// @} +/// Return the process-wide default memory pool. 
+ARROW_EXPORT MemoryPool* default_memory_pool(); + +#ifndef ARROW_MEMORY_POOL_DEFAULT +#define ARROW_MEMORY_POOL_DEFAULT = default_memory_pool() +#endif + } // namespace arrow #endif // ARROW_TYPE_FWD_H diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 5dccad760f4..d1c6d3c6c64 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -309,6 +309,18 @@ cdef class ConvertOptions: If true, then strings in null_values are considered null for string columns. If false, then all strings are valid string values. + + auto_dict_encode: bool, optional (default False) + Whether to try to automatically dict-encode string / binary data. + If true, then when type inference detects a string or binary column, + it is dict-encoded up to `auto_dict_max_cardinality` distinct values + (per chunk), after which it switches to regular encoding. + This setting is ignored for non-inferred columns (those in + `column_types`). + auto_dict_max_cardinality: int, optional + The maximum dictionary cardinality for `auto_dict_encode`. + This value is per chunk. + include_columns: list, optional The names of columns to include in the Table. If empty, the Table will include all columns from the CSV file. 
@@ -330,7 +342,8 @@ cdef class ConvertOptions: def __init__(self, check_utf8=None, column_types=None, null_values=None, true_values=None, false_values=None, strings_can_be_null=None, include_columns=None, - include_missing_columns=None): + include_missing_columns=None, auto_dict_encode=None, + auto_dict_max_cardinality=None): self.options = CCSVConvertOptions.Defaults() if check_utf8 is not None: self.check_utf8 = check_utf8 @@ -348,6 +361,10 @@ cdef class ConvertOptions: self.include_columns = include_columns if include_missing_columns is not None: self.include_missing_columns = include_missing_columns + if auto_dict_encode is not None: + self.auto_dict_encode = auto_dict_encode + if auto_dict_max_cardinality is not None: + self.auto_dict_max_cardinality = auto_dict_max_cardinality @property def check_utf8(self): @@ -433,6 +450,30 @@ cdef class ConvertOptions: def false_values(self, value): self.options.false_values = [tobytes(x) for x in value] + @property + def auto_dict_encode(self): + """ + Whether to try to automatically dict-encode string / binary data. + """ + return self.options.auto_dict_encode + + @auto_dict_encode.setter + def auto_dict_encode(self, value): + self.options.auto_dict_encode = value + + @property + def auto_dict_max_cardinality(self): + """ + The maximum dictionary cardinality for `auto_dict_encode`. + + This value is per chunk. 
+ """ + return self.options.auto_dict_max_cardinality + + @auto_dict_max_cardinality.setter + def auto_dict_max_cardinality(self, value): + self.options.auto_dict_max_cardinality = value + @property def include_columns(self): """ diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 1c3f962c4c0..06ec558230b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1312,6 +1312,10 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: vector[c_string] true_values vector[c_string] false_values c_bool strings_can_be_null + + c_bool auto_dict_encode + int32_t auto_dict_max_cardinality + vector[c_string] include_columns c_bool include_missing_columns diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 1607f0c8b53..0fda62a9d21 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -68,99 +70,70 @@ def make_empty_csv(column_names): return csv.getvalue().encode() +def check_options_class(cls, **attr_values): + """ + Check setting and getting attributes of an *Options class. 
+ """ + opts = cls() + + for name, values in attr_values.items(): + assert getattr(opts, name) == values[0], \ + "incorrect default value for " + name + for v in values: + setattr(opts, name, v) + assert getattr(opts, name) == v, "failed setting value" + + with pytest.raises(AttributeError): + opts.zzz_non_existent = True + + # Check constructor named arguments + non_defaults = {name: values[1] for name, values in attr_values.items()} + opts = cls(**non_defaults) + for name, value in non_defaults.items(): + assert getattr(opts, name) == value + + def test_read_options(): cls = ReadOptions opts = cls() + check_options_class(cls, use_threads=[True, False], + skip_rows=[0, 3], + column_names=[[], ["ab", "cd"]], + autogenerate_column_names=[False, True]) + assert opts.block_size > 0 opts.block_size = 12345 assert opts.block_size == 12345 - assert opts.use_threads is True - opts.use_threads = False - assert opts.use_threads is False - - assert opts.skip_rows == 0 - opts.skip_rows = 3 - assert opts.skip_rows == 3 - - assert opts.column_names == [] - opts.column_names = ["ab", "cd"] - assert opts.column_names == ["ab", "cd"] - - assert opts.autogenerate_column_names is False - opts.autogenerate_column_names = True - assert opts.autogenerate_column_names is True - - opts = cls(block_size=1234, use_threads=False, skip_rows=42, - column_names=["a", "b", "c"]) + opts = cls(block_size=1234) assert opts.block_size == 1234 - assert opts.use_threads is False - assert opts.skip_rows == 42 - assert opts.column_names == ["a", "b", "c"] - assert opts.autogenerate_column_names is False - - opts = cls(autogenerate_column_names=True) - assert opts.use_threads is True - assert opts.skip_rows == 0 - assert opts.column_names == [] - assert opts.autogenerate_column_names is True def test_parse_options(): cls = ParseOptions - opts = cls() - assert opts.delimiter == ',' - assert opts.quote_char == '"' - assert opts.double_quote is True - assert opts.escape_char is False - assert 
opts.newlines_in_values is False - assert opts.ignore_empty_lines is True - - opts.delimiter = 'x' - assert opts.delimiter == 'x' - assert opts.quote_char == '"' - - opts.escape_char = 'z' - assert opts.escape_char == 'z' - assert opts.quote_char == '"' - - opts.quote_char = False - assert opts.quote_char is False - assert opts.escape_char == 'z' - - opts.escape_char = False - assert opts.escape_char is False - assert opts.quote_char is False - - opts.newlines_in_values = True - assert opts.newlines_in_values is True - - opts.ignore_empty_lines = False - assert opts.ignore_empty_lines is False - - opts = cls(delimiter=';', quote_char='%', double_quote=False, - escape_char='\\', newlines_in_values=True, - ignore_empty_lines=False) - assert opts.delimiter == ';' - assert opts.quote_char == '%' - assert opts.double_quote is False - assert opts.escape_char == '\\' - assert opts.newlines_in_values is True - assert opts.ignore_empty_lines is False + + check_options_class(cls, delimiter=[',', 'x'], + escape_char=[False, 'y'], + quote_char=['"', 'z', False], + double_quote=[True, False], + newlines_in_values=[False, True], + ignore_empty_lines=[True, False]) def test_convert_options(): cls = ConvertOptions opts = cls() - assert opts.check_utf8 is True - opts.check_utf8 = False - assert opts.check_utf8 is False + check_options_class(cls, check_utf8=[True, False], + strings_can_be_null=[False, True], + include_columns=[[], ['def', 'abc']], + include_missing_columns=[False, True], + auto_dict_encode=[False, True]) - assert opts.strings_can_be_null is False - opts.strings_can_be_null = True - assert opts.strings_can_be_null is True + assert opts.auto_dict_max_cardinality > 0 + opts.auto_dict_max_cardinality = 99999 + assert opts.auto_dict_max_cardinality == 99999 assert opts.column_types == {} # Pass column_types as mapping @@ -195,27 +168,14 @@ def test_convert_options(): opts.false_values = ['xxx', 'yyy'] assert opts.false_values == ['xxx', 'yyy'] - assert 
opts.include_columns == [] - opts.include_columns = ['def', 'abc'] - assert opts.include_columns == ['def', 'abc'] - - assert opts.include_missing_columns is False - opts.include_missing_columns = True - assert opts.include_missing_columns is True - - opts = cls(check_utf8=False, column_types={'a': pa.null()}, + opts = cls(column_types={'a': pa.null()}, null_values=['N', 'nn'], true_values=['T', 'tt'], - false_values=['F', 'ff'], strings_can_be_null=True, - include_columns=['abc', 'def'], - include_missing_columns=True) - assert opts.check_utf8 is False + false_values=['F', 'ff'], auto_dict_max_cardinality=999) assert opts.column_types == {'a': pa.null()} assert opts.null_values == ['N', 'nn'] assert opts.false_values == ['F', 'ff'] assert opts.true_values == ['T', 'tt'] - assert opts.strings_can_be_null is True - assert opts.include_columns == ['abc', 'def'] - assert opts.include_missing_columns is True + assert opts.auto_dict_max_cardinality == 999 class BaseTestCSVRead: @@ -501,6 +461,54 @@ def test_simple_timestamps(self): 'b': [datetime(1970, 1, 1), datetime(1989, 7, 14)], } + def test_auto_dict_encode(self): + opts = ConvertOptions(auto_dict_encode=True) + rows = u"a,b\nab,1\ncdé,2\ncdé,3\nab,4".encode('utf8') + table = self.read_bytes(rows, convert_options=opts) + schema = pa.schema([('a', pa.dictionary(pa.int32(), pa.string())), + ('b', pa.int64())]) + expected = { + 'a': [u"ab", u"cdé", u"cdé", u"ab"], + 'b': [1, 2, 3, 4], + } + assert table.schema == schema + assert table.to_pydict() == expected + + opts.auto_dict_max_cardinality = 2 + table = self.read_bytes(rows, convert_options=opts) + assert table.schema == schema + assert table.to_pydict() == expected + + # Cardinality above max => plain-encoded + opts.auto_dict_max_cardinality = 1 + table = self.read_bytes(rows, convert_options=opts) + assert table.schema == pa.schema([('a', pa.string()), + ('b', pa.int64())]) + assert table.to_pydict() == expected + + # With invalid UTF8, not checked + 
opts.auto_dict_max_cardinality = 50 + opts.check_utf8 = False + rows = b"a,b\nab,1\ncd\xff,2\nab,3" + table = self.read_bytes(rows, convert_options=opts) + assert table.schema == schema + dict_values = table['a'].chunk(0).dictionary + assert len(dict_values) == 2 + assert dict_values[0] == u"ab" + assert dict_values[1].as_buffer() == b"cd\xff" + + # With invalid UTF8, checked + opts.check_utf8 = True + table = self.read_bytes(rows, convert_options=opts) + schema = pa.schema([('a', pa.dictionary(pa.int32(), pa.binary())), + ('b', pa.int64())]) + expected = { + 'a': [b"ab", b"cd\xff", b"ab"], + 'b': [1, 2, 3], + } + assert table.schema == schema + assert table.to_pydict() == expected + def test_custom_nulls(self): # Infer nulls with custom values opts = ConvertOptions(null_values=['Xxx', 'Zzz'])