-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-3246: [C++][Python][Parquet] Direct writing of DictionaryArray to Parquet columns, automatic decoding to Arrow #5077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c871ea9
c4d7dc2
245f445
882e434
1264e10
57a45e0
edc9f84
d21ebd8
5aaf281
8cc1bcf
0a293ee
1380531
de9d0a5
28268d6
580a0ca
7705fdb
7d663d5
f26d7da
3425da4
5dc00b1
8f4cd44
3f45fef
91555b6
92cf4e0
494b954
7f3a2a8
ad3bad3
6b1769c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -343,9 +343,11 @@ void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_s | |
| const std::shared_ptr<ArrowWriterProperties>& arrow_properties, | ||
| std::shared_ptr<Buffer>* out) { | ||
| auto sink = CreateOutputStream(); | ||
|
|
||
| auto write_props = WriterProperties::Builder().write_batch_size(100)->build(); | ||
|
|
||
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink, | ||
| row_group_size, default_writer_properties(), | ||
| arrow_properties)); | ||
| row_group_size, write_props, arrow_properties)); | ||
| ASSERT_OK_NO_THROW(sink->Finish(out)); | ||
| } | ||
|
|
||
|
|
@@ -368,37 +370,39 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual | |
| } | ||
| } | ||
|
|
||
| void DoConfiguredRoundtrip( | ||
| const std::shared_ptr<Table>& table, int64_t row_group_size, | ||
| std::shared_ptr<Table>* out, | ||
| const std::shared_ptr<::parquet::WriterProperties>& parquet_properties = | ||
| ::parquet::default_writer_properties(), | ||
| const std::shared_ptr<ArrowWriterProperties>& arrow_properties = | ||
| default_arrow_writer_properties()) { | ||
| void DoRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size, | ||
| std::shared_ptr<Table>* out, | ||
| const std::shared_ptr<::parquet::WriterProperties>& writer_properties = | ||
| ::parquet::default_writer_properties(), | ||
| const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties = | ||
| default_arrow_writer_properties(), | ||
| const ArrowReaderProperties& arrow_reader_properties = | ||
| default_arrow_reader_properties()) { | ||
| std::shared_ptr<Buffer> buffer; | ||
|
|
||
| auto sink = CreateOutputStream(); | ||
| ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink, | ||
| row_group_size, parquet_properties, arrow_properties)); | ||
| row_group_size, writer_properties, | ||
| arrow_writer_properties)); | ||
| ASSERT_OK_NO_THROW(sink->Finish(&buffer)); | ||
|
|
||
| std::unique_ptr<FileReader> reader; | ||
| ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), | ||
| ::arrow::default_memory_pool(), &reader)); | ||
| FileReaderBuilder builder; | ||
| ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer))); | ||
| ASSERT_OK(builder.properties(arrow_reader_properties)->Build(&reader)); | ||
| ASSERT_OK_NO_THROW(reader->ReadTable(out)); | ||
| } | ||
|
|
||
| void CheckConfiguredRoundtrip( | ||
| const std::shared_ptr<Table>& input_table, | ||
| const std::shared_ptr<Table>& expected_table = nullptr, | ||
| const std::shared_ptr<::parquet::WriterProperties>& parquet_properties = | ||
| const std::shared_ptr<::parquet::WriterProperties>& writer_properties = | ||
| ::parquet::default_writer_properties(), | ||
| const std::shared_ptr<ArrowWriterProperties>& arrow_properties = | ||
| const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties = | ||
| default_arrow_writer_properties()) { | ||
| std::shared_ptr<Table> actual_table; | ||
| ASSERT_NO_FATAL_FAILURE(DoConfiguredRoundtrip(input_table, input_table->num_rows(), | ||
| &actual_table, parquet_properties, | ||
| arrow_properties)); | ||
| ASSERT_NO_FATAL_FAILURE(DoRoundtrip(input_table, input_table->num_rows(), &actual_table, | ||
| writer_properties, arrow_writer_properties)); | ||
| if (expected_table) { | ||
| ASSERT_NO_FATAL_FAILURE( | ||
| ::arrow::AssertSchemaEqual(*actual_table->schema(), *expected_table->schema())); | ||
|
|
@@ -439,9 +443,8 @@ void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group | |
| std::shared_ptr<Table> result; | ||
| DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result, | ||
| arrow_properties); | ||
| ASSERT_NO_FATAL_FAILURE( | ||
| ::arrow::AssertSchemaEqual(*table->schema(), *result->schema())); | ||
| ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result, false)); | ||
| ::arrow::AssertSchemaEqual(*table->schema(), *result->schema()); | ||
| ::arrow::AssertTablesEqual(*table, *result, false); | ||
| } | ||
|
|
||
| static std::shared_ptr<GroupNode> MakeSimpleSchema(const DataType& type, | ||
|
|
@@ -751,8 +754,8 @@ TYPED_TEST(TestParquetIO, SingleEmptyListsColumnReadWrite) { | |
|
|
||
| TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) { | ||
| std::shared_ptr<Table> table; | ||
| ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, true, true, 10, &table)); | ||
| ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table)); | ||
| this->PrepareListTable(SMALL_SIZE, true, true, 10, &table); | ||
| this->CheckRoundTrip(table); | ||
| } | ||
|
|
||
| TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) { | ||
|
|
@@ -1169,8 +1172,12 @@ TEST_F(TestNullParquetIO, NullListColumn) { | |
| } | ||
|
|
||
| TEST_F(TestNullParquetIO, NullDictionaryColumn) { | ||
| std::shared_ptr<Buffer> null_bitmap; | ||
| ASSERT_OK(::arrow::AllocateEmptyBitmap(::arrow::default_memory_pool(), SMALL_SIZE, | ||
| &null_bitmap)); | ||
|
|
||
| std::shared_ptr<Array> indices = | ||
| std::make_shared<::arrow::Int8Array>(SMALL_SIZE, nullptr, nullptr, SMALL_SIZE); | ||
| std::make_shared<::arrow::Int8Array>(SMALL_SIZE, nullptr, null_bitmap, SMALL_SIZE); | ||
| std::shared_ptr<::arrow::DictionaryType> dict_type = | ||
| std::make_shared<::arrow::DictionaryType>(::arrow::int8(), ::arrow::null()); | ||
|
|
||
|
|
@@ -2803,7 +2810,7 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> { | |
| ::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false); | ||
| } | ||
|
|
||
| static std::vector<double> null_probabilites() { return {0.0, 0.5, 1}; } | ||
| static std::vector<double> null_probabilities() { return {0.0, 0.5, 1}; } | ||
|
|
||
| protected: | ||
| std::shared_ptr<Array> dense_values_; | ||
|
|
@@ -2813,7 +2820,7 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> { | |
| ArrowReaderProperties properties_; | ||
| }; | ||
|
|
||
| void AsDictionaryEncoded(const Array& arr, std::shared_ptr<Array>* out) { | ||
| void AsDictionary32Encoded(const Array& arr, std::shared_ptr<Array>* out) { | ||
| ::arrow::StringDictionary32Builder builder(default_memory_pool()); | ||
| const auto& string_array = static_cast<const ::arrow::StringArray&>(arr); | ||
| ASSERT_OK(builder.AppendArray(string_array)); | ||
|
|
@@ -2826,7 +2833,7 @@ TEST_P(TestArrowReadDictionary, ReadWholeFileDict) { | |
| std::vector<std::shared_ptr<Array>> chunks(kNumRowGroups); | ||
| const int64_t chunk_size = expected_dense_->num_rows() / kNumRowGroups; | ||
| for (int i = 0; i < kNumRowGroups; ++i) { | ||
| AsDictionaryEncoded(*dense_values_->Slice(chunk_size * i, chunk_size), &chunks[i]); | ||
| AsDictionary32Encoded(*dense_values_->Slice(chunk_size * i, chunk_size), &chunks[i]); | ||
| } | ||
| auto ex_table = MakeSimpleTable(std::make_shared<ChunkedArray>(chunks), | ||
| /*nullable=*/true); | ||
|
|
@@ -2840,8 +2847,88 @@ TEST_P(TestArrowReadDictionary, ReadWholeFileDense) { | |
|
|
||
| INSTANTIATE_TEST_CASE_P( | ||
| ReadDictionary, TestArrowReadDictionary, | ||
| ::testing::ValuesIn(TestArrowReadDictionary::null_probabilites())); | ||
| ::testing::ValuesIn(TestArrowReadDictionary::null_probabilities())); | ||
|
|
||
| TEST(TestArrowWriteDictionaries, ChangingDictionaries) { | ||
| constexpr int num_unique = 50; | ||
| constexpr int repeat = 10000; | ||
| constexpr int64_t min_length = 2; | ||
| constexpr int64_t max_length = 20; | ||
| ::arrow::random::RandomArrayGenerator rag(0); | ||
| auto values = rag.StringWithRepeats(repeat * num_unique, num_unique, min_length, | ||
| max_length, /*null_probability=*/0.1); | ||
| auto expected = MakeSimpleTable(values, /*nullable=*/true); | ||
|
|
||
| const int num_chunks = 10; | ||
| std::vector<std::shared_ptr<Array>> chunks(num_chunks); | ||
| const int64_t chunk_size = values->length() / num_chunks; | ||
| for (int i = 0; i < num_chunks; ++i) { | ||
| AsDictionary32Encoded(*values->Slice(chunk_size * i, chunk_size), &chunks[i]); | ||
| } | ||
|
|
||
| auto dict_table = MakeSimpleTable(std::make_shared<ChunkedArray>(chunks), | ||
| /*nullable=*/true); | ||
|
|
||
| std::shared_ptr<Table> actual; | ||
| DoRoundtrip(dict_table, /*row_group_size=*/values->length() / 2, &actual); | ||
| ::arrow::AssertTablesEqual(*expected, *actual, /*same_chunk_layout=*/false); | ||
| } | ||
|
|
||
| TEST(TestArrowWriteDictionaries, AutoReadAsDictionary) { | ||
| constexpr int num_unique = 50; | ||
| constexpr int repeat = 100; | ||
| constexpr int64_t min_length = 2; | ||
| constexpr int64_t max_length = 20; | ||
| ::arrow::random::RandomArrayGenerator rag(0); | ||
| auto values = rag.StringWithRepeats(repeat * num_unique, num_unique, min_length, | ||
| max_length, /*null_probability=*/0.1); | ||
| std::shared_ptr<Array> dict_values; | ||
| AsDictionary32Encoded(*values, &dict_values); | ||
|
|
||
| } // namespace arrow | ||
| auto expected = MakeSimpleTable(dict_values, /*nullable=*/true); | ||
| auto expected_dense = MakeSimpleTable(values, /*nullable=*/true); | ||
|
|
||
| auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); | ||
| std::shared_ptr<Table> actual, actual_dense; | ||
|
|
||
| DoRoundtrip(expected, values->length(), &actual, default_writer_properties(), | ||
| props_store_schema); | ||
| ::arrow::AssertTablesEqual(*expected, *actual); | ||
|
|
||
| auto props_no_store_schema = ArrowWriterProperties::Builder().build(); | ||
| DoRoundtrip(expected, values->length(), &actual_dense, default_writer_properties(), | ||
| props_no_store_schema); | ||
| ::arrow::AssertTablesEqual(*expected_dense, *actual_dense); | ||
| } | ||
|
|
||
| TEST(TestArrowWriteDictionaries, NestedSubfield) { | ||
| // ARROW-3246: Automatic decoding of dictionary subfields left as followup | ||
| // work | ||
| auto offsets = ::arrow::ArrayFromJSON(::arrow::int32(), "[0, 0, 2, 3]"); | ||
| auto indices = ::arrow::ArrayFromJSON(::arrow::int32(), "[0, 0, 0]"); | ||
| auto dict = ::arrow::ArrayFromJSON(::arrow::utf8(), "[\"foo\"]"); | ||
|
|
||
| std::shared_ptr<Array> dict_values, values; | ||
| auto dict_ty = ::arrow::dictionary(::arrow::int32(), ::arrow::utf8()); | ||
| ASSERT_OK(::arrow::DictionaryArray::FromArrays(dict_ty, indices, dict, &dict_values)); | ||
| ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *dict_values, | ||
| ::arrow::default_memory_pool(), &values)); | ||
|
|
||
| auto dense_ty = ::arrow::list(::arrow::utf8()); | ||
| auto dense_values = | ||
| ::arrow::ArrayFromJSON(dense_ty, "[[], [\"foo\", \"foo\"], [\"foo\"]]"); | ||
|
|
||
| auto table = MakeSimpleTable(values, /*nullable=*/true); | ||
| auto expected_table = MakeSimpleTable(dense_values, /*nullable=*/true); | ||
|
|
||
| auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); | ||
| std::shared_ptr<Table> actual; | ||
| DoRoundtrip(table, values->length(), &actual, default_writer_properties(), | ||
| props_store_schema); | ||
|
|
||
| // The nested subfield is not automatically decoded to dictionary | ||
| ::arrow::AssertTablesEqual(*expected_table, *actual); | ||
| } | ||
|
||
|
|
||
| } // namespace arrow | ||
| } // namespace parquet | ||
Uh oh!
There was an error while loading. Please reload this page.