67 changes: 67 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
@@ -793,6 +793,73 @@ TEST(ExecPlanExecution, SourceGroupedSum) {
}
}

TEST(ExecPlanExecution, NestedSourceFilter) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

auto input = MakeNestedBatches();
auto empty = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([])");
auto expected = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([
[{"i32": 5, "bool": null}],
[{"i32": 6, "bool": false}],
[{"i32": 7, "bool": false}]
])");

ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
AsyncGenerator<util::optional<ExecBatch>> sink_gen;

ASSERT_OK(Declaration::Sequence(
{
{"source", SourceNodeOptions{input.schema,
input.gen(parallel, /*slow=*/false)}},
{"filter", FilterNodeOptions{greater_equal(
field_ref(FieldRef("struct", "i32")), literal(5))}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
Finishes(ResultWith(UnorderedElementsAreArray({empty, expected}))));
}
}

TEST(ExecPlanExecution, NestedSourceProjectGroupedSum) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

auto input = MakeNestedBatches();
auto expected = ExecBatchFromJSON({int64(), boolean()}, R"([
[null, true],
[17, false],
[5, null]
])");

ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
AsyncGenerator<util::optional<ExecBatch>> sink_gen;

ASSERT_OK(
Declaration::Sequence(
{
{"source",
SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
{"project", ProjectNodeOptions{{
field_ref(FieldRef("struct", "i32")),
field_ref(FieldRef("struct", "bool")),
},
{"i32", "bool"}}},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
/*targets=*/{"i32"},
/*names=*/{"sum(i32)"},
/*keys=*/{"bool"}}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
Finishes(ResultWith(UnorderedElementsAreArray({expected}))));
}
}

TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
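Review note: the two new tests exercise multi-name FieldRefs, which address a child inside a struct column by a path of names. Below is a minimal standalone sketch (not part of this diff; it uses only Arrow's public FieldRef/Schema API) of how the reference from these tests resolves:

#include <iostream>

#include "arrow/api.h"

int main() {
  // Same shape as MakeNestedBatches(): one top-level column "struct"
  // with children "i32" and "bool".
  auto ty = arrow::struct_(
      {arrow::field("i32", arrow::int32()), arrow::field("bool", arrow::boolean())});
  auto schema = arrow::schema({arrow::field("struct", ty)});

  // The multi-name reference used by the filter/project nodes above.
  arrow::FieldRef ref("struct", "i32");
  auto path = ref.FindOneOrNone(*schema).ValueOrDie();
  std::cout << path.ToString() << std::endl;  // prints the resolved path, e.g. FieldPath(0 0)
  return 0;
}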
14 changes: 14 additions & 0 deletions cpp/src/arrow/compute/exec/test_util.cc
@@ -190,6 +190,20 @@ BatchesWithSchema MakeBasicBatches() {
return out;
}

BatchesWithSchema MakeNestedBatches() {
auto ty = struct_({field("i32", int32()), field("bool", boolean())});
BatchesWithSchema out;
out.batches = {
ExecBatchFromJSON(
{ty},
R"([[{"i32": null, "bool": true}], [{"i32": 4, "bool": false}], [null]])"),
ExecBatchFromJSON(
{ty},
R"([[{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}]])")};
out.schema = schema({field("struct", ty)});
return out;
}

BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema,
int num_batches, int batch_size) {
BatchesWithSchema out;
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec/test_util.h
@@ -87,6 +87,9 @@ Future<std::vector<ExecBatch>> StartAndCollect(
ARROW_TESTING_EXPORT
BatchesWithSchema MakeBasicBatches();

ARROW_TESTING_EXPORT
BatchesWithSchema MakeNestedBatches();

ARROW_TESTING_EXPORT
BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema,
int num_batches = 10, int batch_size = 4);
25 changes: 22 additions & 3 deletions cpp/src/arrow/dataset/file_csv.cc
@@ -116,9 +116,28 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(

if (!scan_options) return convert_options;

auto materialized = scan_options->MaterializedFields();
std::unordered_set<std::string> materialized_fields(materialized.begin(),
materialized.end());
auto field_refs = scan_options->MaterializedFields();
std::unordered_set<std::string> materialized_fields;
materialized_fields.reserve(field_refs.size());
// Preprocess the field refs. We try to avoid the FieldRef::Get* helpers here
// since a per-ref lookup makes this quadratic overall (significant overhead
// with 1000+ columns)
for (const auto& ref : field_refs) {
if (const std::string* name = ref.name()) {
// Common case
materialized_fields.emplace(*name);
continue;
}
// Currently the CSV reader doesn't support reading any nested types, so this
// path shouldn't be hit. However, implement it the same way as IPC/ORC:
// load the entire top-level field if a nested field is selected.
ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOneOrNone(*scan_options->dataset_schema));
if (column_names.find(field->name()) == column_names.end()) continue;
// Only read the requested columns
convert_options.include_columns.push_back(field->name());
// Properly set conversion types
convert_options.column_types[field->name()] = field->type();
}

for (auto field : scan_options->dataset_schema->fields()) {
if (materialized_fields.find(field->name()) == materialized_fields.end()) continue;
// Ignore virtual columns.
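Review note: the core of this change is the split above: take the O(1) FieldRef::name() fast path, and fall back to schema resolution only for nested refs. A condensed sketch of that pattern follows; the helper name CollectTopLevelNames and its parameters are hypothetical, for illustration only:

#include <string>
#include <unordered_set>
#include <vector>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"

// Hypothetical helper showing the fast-path/slow-path split used in
// GetConvertOptions above.
arrow::Status CollectTopLevelNames(const std::vector<arrow::FieldRef>& refs,
                                   const arrow::Schema& dataset_schema,
                                   std::unordered_set<std::string>* out) {
  for (const auto& ref : refs) {
    if (const std::string* name = ref.name()) {
      // Common case: a plain top-level column name, no schema lookup needed.
      out->emplace(*name);
      continue;
    }
    // Nested reference: resolve against the schema (a linear scan per ref;
    // for CSV this path is currently never hit).
    ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOneOrNone(dataset_schema));
    if (field != nullptr) out->emplace(field->name());
  }
  return arrow::Status::OK();
}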
13 changes: 10 additions & 3 deletions cpp/src/arrow/dataset/file_csv_test.cc
@@ -384,13 +384,20 @@ class TestCsvFileFormatScan : public FileFormatScanMixin<CsvFormatHelper> {};

TEST_P(TestCsvFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestCsvFileFormatScan, ScanBatchSize) { TestScanBatchSize(); }
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
// NOTE(ARROW-14658): TestScanProjectedNested is ignored since CSV
// doesn't have any nested types for us to work with
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
// The CSV reader rejects duplicate columns, so skip
// ScanRecordBatchReaderWithDuplicateColumn
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
TestScanWithDuplicateColumnError();
}

INSTANTIATE_TEST_SUITE_P(TestScan, TestCsvFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_ipc.cc
@@ -75,10 +75,10 @@ static inline Future<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReaderAsyn
}

static inline Result<std::vector<int>> GetIncludedFields(
const Schema& schema, const std::vector<std::string>& materialized_fields) {
const Schema& schema, const std::vector<FieldRef>& materialized_fields) {
std::vector<int> included_fields;

for (FieldRef ref : materialized_fields) {
for (const auto& ref : materialized_fields) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(schema));
if (match.indices().empty()) continue;

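Review note: GetIncludedFields now consumes the FieldRefs directly instead of round-tripping through strings. The property it relies on (the ORC reader below relies on it too): FindOneOrNone() reports a missing field as an empty FieldPath rather than an error, so absent columns are skipped here and padded with nulls by the scanner later. A minimal sketch of that behavior:

#include <cassert>
#include <vector>

#include "arrow/api.h"

int main() {
  auto schema = arrow::schema({arrow::field("a", arrow::int64())});

  // Present field: the resolved path holds its column index.
  auto found = arrow::FieldRef("a").FindOneOrNone(*schema).ValueOrDie();
  assert(found.indices() == std::vector<int>{0});

  // Absent field: an empty path, not an error.
  auto missing = arrow::FieldRef("b").FindOneOrNone(*schema).ValueOrDie();
  assert(missing.indices().empty());
  return 0;
}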
15 changes: 12 additions & 3 deletions cpp/src/arrow/dataset/file_ipc_test.cc
@@ -135,13 +135,22 @@ class TestIpcFileFormatScan : public FileFormatScanMixin<IpcFormatHelper> {};

TEST_P(TestIpcFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestIpcFileFormatScan, ScanBatchSize) { TestScanBatchSize(); }
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjectedNested) {
TestScanProjectedNested();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
TestScanWithDuplicateColumn();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
TestScanWithDuplicateColumnError();
}
TEST_P(TestIpcFileFormatScan, FragmentScanOptions) {
auto reader = GetRecordBatchReader(
// ARROW-12077: on Windows/mimalloc/release, nullable list column leads to crash
5 changes: 2 additions & 3 deletions cpp/src/arrow/dataset/file_orc.cc
@@ -79,12 +79,11 @@ class OrcScanTask {
// filter out virtual columns
std::vector<std::string> included_fields;
ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema());
for (auto name : materialized_fields) {
FieldRef ref(name);
for (const auto& ref : materialized_fields) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema));
if (match.indices().empty()) continue;

included_fields.push_back(name);
included_fields.push_back(schema->field(match.indices()[0])->name());
}

return RecordBatchIterator(
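Review note: match.indices()[0] is the first step of the resolved FieldPath, i.e. the index of the top-level column, so a nested reference now pulls in its entire top-level field, matching the behavior the CSV comment above describes. Pushing the schema's own spelling of that name (instead of the original string) is what makes this work for refs built from a nested path, which have no single name.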
16 changes: 14 additions & 2 deletions cpp/src/arrow/dataset/file_orc_test.cc
@@ -69,13 +69,25 @@ TEST_F(TestOrcFileFormat, CountRows) { TestCountRows(); }
class TestOrcFileFormatScan : public FileFormatScanMixin<OrcFormatHelper> {};

TEST_P(TestOrcFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
TEST_P(TestOrcFileFormatScan, ScanBatchSize) {
// TODO(ARROW-14153): TestScanBatchSize();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedNested) {
TestScanProjectedNested();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
TestScanWithDuplicateColumn();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
TestScanWithDuplicateColumnError();
}
INSTANTIATE_TEST_SUITE_P(TestScan, TestOrcFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
TestFormatParams::ToTestNameString);
121 changes: 102 additions & 19 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -155,30 +155,112 @@ void AddColumnIndices(const SchemaField& schema_field,
}
}

// Compute the column projection out of an optional arrow::Schema
std::vector<int> InferColumnProjection(const parquet::arrow::FileReader& reader,
const ScanOptions& options) {
Status ResolveOneFieldRef(
const SchemaManifest& manifest, const FieldRef& field_ref,
const std::unordered_map<std::string, const SchemaField*>& field_lookup,
const std::unordered_set<std::string>& duplicate_fields,
std::vector<int>* columns_selection) {
if (const std::string* name = field_ref.name()) {
auto it = field_lookup.find(*name);
if (it != field_lookup.end()) {
AddColumnIndices(*it->second, columns_selection);
} else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
// We shouldn't generally get here because SetProjection will reject such references
return Status::Invalid("Ambiguous reference to column '", *name,
"' which occurs more than once");
}
// "Virtual" column: field is not in file but is in the ScanOptions.
// Ignore it here, as projection will pad the batch with a null column.
return Status::OK();
}

const SchemaField* toplevel = nullptr;
const SchemaField* field = nullptr;
if (const std::vector<FieldRef>* refs = field_ref.nested_refs()) {
// Only supports a sequence of names
for (const auto& ref : *refs) {
if (const std::string* name = ref.name()) {
if (!field) {
// First lookup, top-level field
auto it = field_lookup.find(*name);
if (it != field_lookup.end()) {
field = it->second;
toplevel = field;
} else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
return Status::Invalid("Ambiguous reference to column '", *name,
"' which occurs more than once");
} else {
// Virtual column
return Status::OK();
}
} else {
const SchemaField* result = nullptr;
for (const auto& child : field->children) {
if (child.field->name() == *name) {
if (!result) {
result = &child;
} else {
return Status::Invalid("Ambiguous nested reference to column '", *name,
"' which occurs more than once in field ",
field->field->ToString());
}
}
}
if (!result) {
// Virtual column
return Status::OK();
}
field = result;
}
continue;
}
return Status::NotImplemented("Inferring column projection from FieldRef ",
field_ref.ToString());
}
} else {
return Status::NotImplemented("Inferring column projection from FieldRef ",
field_ref.ToString());
}

if (field) {
// TODO(ARROW-1888): support fine-grained column projection. We should be
// able to materialize only the child fields requested, and not the entire
// top-level field.
// Right now, if enabled, projection/filtering will fail when they cast the
// physical schema to the dataset schema.
AddColumnIndices(*toplevel, columns_selection);
}
return Status::OK();
}

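Review note: ResolveOneFieldRef branches on the two representations a FieldRef can carry: a single name, or a sequence of nested names. A quick sketch of what the accessors return (assuming only the public FieldRef API):

#include <cassert>

#include "arrow/type.h"

int main() {
  arrow::FieldRef top("struct");            // plain top-level reference
  arrow::FieldRef nested("struct", "i32");  // nested reference: a path of names

  assert(top.name() != nullptr);            // takes the fast path above
  assert(nested.name() == nullptr);         // falls through to the nested walk
  assert(nested.nested_refs() != nullptr);
  assert(nested.nested_refs()->size() == 2);
  return 0;
}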
// Compute the column projection based on the scan options
Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
const ScanOptions& options) {
auto manifest = reader.manifest();
// Checks if the field is needed in either the projection or the filter.
auto field_names = options.MaterializedFields();
std::unordered_set<std::string> materialized_fields{field_names.cbegin(),
field_names.cend()};
auto should_materialize_column = [&materialized_fields](const std::string& f) {
return materialized_fields.find(f) != materialized_fields.end();
};

std::vector<int> columns_selection;
// Note that the loop is using the file's schema to iterate instead of the
// materialized fields of the ScanOptions. This ensures that missing
// fields in the file (but present in the ScanOptions) will be ignored. The
// scanner's projector will take care of padding the column with the proper
// values.
auto field_refs = options.MaterializedFields();

// Build a lookup table from top-level field name to field metadata.
// This avoids quadratic-time mapping of projected fields to column
// indices in the common case of selecting top-level columns. For
// nested fields we pay the cost of a linear scan for now, assuming
// they are relatively rare; this can be optimized later. (We also
// don't want to pay the cost of building all the lookup tables up
// front if they're rarely used.)
std::unordered_map<std::string, const SchemaField*> field_lookup;
std::unordered_set<std::string> duplicate_fields;
for (const auto& schema_field : manifest.schema_fields) {
if (should_materialize_column(schema_field.field->name())) {
AddColumnIndices(schema_field, &columns_selection);
const auto it = field_lookup.emplace(schema_field.field->name(), &schema_field);
if (!it.second) {
duplicate_fields.emplace(schema_field.field->name());
}
}

std::vector<int> columns_selection;
for (const auto& ref : field_refs) {
RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
&columns_selection));
}
return columns_selection;
}

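Review note: the lookup table trades one O(n) pass over the manifest for O(1) per-ref resolution, and duplicate top-level names fall out of emplace() for free. The idiom in isolation, with illustrative values:

#include <cassert>
#include <string>
#include <unordered_map>
#include <unordered_set>

int main() {
  std::unordered_map<std::string, int> lookup;  // name -> metadata (int stands in)
  std::unordered_set<std::string> duplicates;

  int index = 0;
  for (const std::string name : {"a", "b", "a"}) {
    // emplace() returns {iterator, inserted}: a failed insert means the name
    // was already seen, so referring to it by name alone would be ambiguous.
    if (!lookup.emplace(name, index++).second) duplicates.emplace(name);
  }
  assert(duplicates.count("a") == 1);
  assert(duplicates.count("b") == 0);
  return 0;
}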
@@ -351,7 +433,8 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
parquet_fragment->FilterRowGroups(options->filter));
if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
}
auto column_projection = InferColumnProjection(*reader, *options);
ARROW_ASSIGN_OR_RAISE(auto column_projection,
InferColumnProjection(*reader, *options));
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(
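Review note: since InferColumnProjection now returns Result<std::vector<int>>, the call site unwraps it with ARROW_ASSIGN_OR_RAISE. A self-contained sketch of the control flow the macro provides (MaybeProjection is a hypothetical stand-in):

#include <iostream>
#include <vector>

#include "arrow/result.h"
#include "arrow/status.h"

// Hypothetical stand-in for the new InferColumnProjection signature.
arrow::Result<std::vector<int>> MaybeProjection(bool ok) {
  if (!ok) return arrow::Status::Invalid("cannot infer projection");
  return std::vector<int>{0, 1};
}

arrow::Status Demo() {
  // On success, unwraps the Result into the declared variable; on failure,
  // early-returns the Status, the same flow added to ScanBatchesAsync.
  ARROW_ASSIGN_OR_RAISE(auto column_projection, MaybeProjection(true));
  std::cout << column_projection.size() << std::endl;
  return arrow::Status::OK();
}

int main() { return Demo().ok() ? 0 : 1; }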