Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,49 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) {
});
}

TEST_F(TestFileSystemDataset, WriteProjected) {
// Regression test for ARROW-12620
auto format = std::make_shared<IpcFileFormat>();
auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = format->DefaultWriteOptions();
write_options.filesystem = fs;
write_options.base_dir = "root";
write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
write_options.basename_template = "{i}.feather";

auto dataset_schema = schema({field("a", int64())});
RecordBatchVector batches{
ConstantArrayGenerator::Zeroes(kRowsPerBatch, dataset_schema)};
ASSERT_EQ(0, batches[0]->column(0)->null_count());
auto dataset = std::make_shared<InMemoryDataset>(dataset_schema, batches);
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK(scanner_builder->Project(
{compute::call("add", {compute::field_ref("a"), compute::literal(1)})},
{"a_plus_one"}));
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

ASSERT_OK(FileSystemDataset::Write(write_options, scanner));

ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
fs, {"root/0.feather"}, format, {}));
ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{}));
auto expected_schema = schema({field("a_plus_one", int64())});
AssertSchemaEqual(*expected_schema, *written_dataset->schema());
ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
auto col = table->column(0);
ASSERT_EQ(0, col->null_count());
for (auto chunk : col->chunks()) {
auto arr = std::dynamic_pointer_cast<Int64Array>(chunk);
for (auto val : *arr) {
ASSERT_TRUE(val.has_value());
ASSERT_EQ(1, *val);
}
}
}

// Tests of subtree pruning

struct TestPathTree {
Expand Down
6 changes: 1 addition & 5 deletions cpp/src/arrow/dataset/scanner_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,7 @@ class FilterAndProjectScanTask : public ScanTask {
ARROW_ASSIGN_OR_RAISE(auto projected, FilterAndProjectBatch(batch));
return visitor(projected);
};
return task_->SafeExecute(executor).Then(
[this, filter_and_project_visitor](const RecordBatchVector& rbs) -> Status {
ARROW_ASSIGN_OR_RAISE(auto projected_it, ToFilteredAndProjectedIterator(rbs));
return projected_it.Visit(filter_and_project_visitor);
});
return task_->SafeVisit(executor, filter_and_project_visitor);
}

private:
Expand Down