From f7448f2086d34cc501469f82f158ac66ac7a47f3 Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 3 May 2021 11:07:06 -0400
Subject: [PATCH] ARROW-12620: [C++][Dataset] Fix projection during writing
---
cpp/src/arrow/dataset/file_test.cc | 43 ++++++++++++++++++++++++
cpp/src/arrow/dataset/scanner_internal.h | 6 +---
2 files changed, 44 insertions(+), 5 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc
index 0c8954e6b7b..839b48a0e64 100644
--- a/cpp/src/arrow/dataset/file_test.cc
+++ b/cpp/src/arrow/dataset/file_test.cc
@@ -295,6 +295,49 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) {
});
}
+TEST_F(TestFileSystemDataset, WriteProjected) {
+ // Regression test for ARROW-12620
+ auto format = std::make_shared();
+ auto fs = std::make_shared(fs::kNoTime);
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.filesystem = fs;
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ auto dataset_schema = schema({field("a", int64())});
+ RecordBatchVector batches{
+ ConstantArrayGenerator::Zeroes(kRowsPerBatch, dataset_schema)};
+ ASSERT_EQ(0, batches[0]->column(0)->null_count());
+ auto dataset = std::make_shared(dataset_schema, batches);
+ ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+ ASSERT_OK(scanner_builder->Project(
+ {compute::call("add", {compute::field_ref("a"), compute::literal(1)})},
+ {"a_plus_one"}));
+ ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+ ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
+
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"}, format, {}));
+ ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{}));
+ auto expected_schema = schema({field("a_plus_one", int64())});
+ AssertSchemaEqual(*expected_schema, *written_dataset->schema());
+ ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+ ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
+ auto col = table->column(0);
+ ASSERT_EQ(0, col->null_count());
+ for (auto chunk : col->chunks()) {
+ auto arr = std::dynamic_pointer_cast(chunk);
+ for (auto val : *arr) {
+ ASSERT_TRUE(val.has_value());
+ ASSERT_EQ(1, *val);
+ }
+ }
+}
+
// Tests of subtree pruning
struct TestPathTree {
diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h
index 56065d9983e..30fb4e07cef 100644
--- a/cpp/src/arrow/dataset/scanner_internal.h
+++ b/cpp/src/arrow/dataset/scanner_internal.h
@@ -169,11 +169,7 @@ class FilterAndProjectScanTask : public ScanTask {
ARROW_ASSIGN_OR_RAISE(auto projected, FilterAndProjectBatch(batch));
return visitor(projected);
};
- return task_->SafeExecute(executor).Then(
- [this, filter_and_project_visitor](const RecordBatchVector& rbs) -> Status {
- ARROW_ASSIGN_OR_RAISE(auto projected_it, ToFilteredAndProjectedIterator(rbs));
- return projected_it.Visit(filter_and_project_visitor);
- });
+ return task_->SafeVisit(executor, filter_and_project_visitor);
}
private: