From b12fb6e505ab36cfd1df215818a7bb09df14a799 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 10 Nov 2021 09:38:19 -0500
Subject: [PATCH 1/2] ARROW-14658: [C++] Enable nested field refs in scanning
---
cpp/src/arrow/compute/exec/plan_test.cc | 67 +++++
cpp/src/arrow/compute/exec/test_util.cc | 14 +
cpp/src/arrow/compute/exec/test_util.h | 3 +
cpp/src/arrow/dataset/file_csv.cc | 25 +-
cpp/src/arrow/dataset/file_csv_test.cc | 14 +-
cpp/src/arrow/dataset/file_ipc.cc | 4 +-
cpp/src/arrow/dataset/file_ipc_test.cc | 15 +-
cpp/src/arrow/dataset/file_orc.cc | 5 +-
cpp/src/arrow/dataset/file_orc_test.cc | 16 +-
cpp/src/arrow/dataset/file_parquet.cc | 121 +++++++--
cpp/src/arrow/dataset/file_parquet_test.cc | 13 +
cpp/src/arrow/dataset/scanner.cc | 22 +-
cpp/src/arrow/dataset/scanner.h | 30 +--
cpp/src/arrow/dataset/scanner_test.cc | 299 +++++++++++++++++----
cpp/src/arrow/dataset/test_util.h | 153 ++++++++++-
cpp/src/arrow/testing/generator.cc | 9 +
cpp/src/arrow/testing/matchers.h | 4 +
cpp/src/arrow/type.h | 5 +
18 files changed, 695 insertions(+), 124 deletions(-)
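For context, the end state these patches enable: nested field references work in both scan filters and scan projections. A minimal usage sketch (not part of the patch) against the existing ScannerBuilder API, assuming a dataset whose schema has a column "struct" of type struct<i32: int32, bool: boolean>:

    #include "arrow/compute/api.h"
    #include "arrow/dataset/scanner.h"

    arrow::Result<std::shared_ptr<arrow::Table>> ScanNested(
        std::shared_ptr<arrow::dataset::Dataset> dataset) {
      using namespace arrow::compute;
      arrow::dataset::ScannerBuilder builder(std::move(dataset));
      // Both of these returned NotImplemented for nested refs before this patch.
      ARROW_RETURN_NOT_OK(builder.Filter(
          greater_equal(field_ref(arrow::FieldRef("struct", "i32")), literal(5))));
      ARROW_RETURN_NOT_OK(
          builder.Project({field_ref(arrow::FieldRef("struct", "bool"))}, {"bool"}));
      ARROW_ASSIGN_OR_RAISE(auto scanner, builder.Finish());
      return scanner->ToTable();
    }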
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 745c6b256eb..258238dbb81 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -793,6 +793,73 @@ TEST(ExecPlanExecution, SourceGroupedSum) {
}
}
+TEST(ExecPlanExecution, NestedSourceFilter) {
+ for (bool parallel : {false, true}) {
+ SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+ auto input = MakeNestedBatches();
+ auto empty = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([])");
+ auto expected = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([
+ [{"i32": 5, "bool": null}],
+ [{"i32": 6, "bool": false}],
+ [{"i32": 7, "bool": false}]
+])");
+
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+ ASSERT_OK(Declaration::Sequence(
+ {
+ {"source", SourceNodeOptions{input.schema,
+ input.gen(parallel, /*slow=*/false)}},
+ {"filter", FilterNodeOptions{greater_equal(
+ field_ref(FieldRef("struct", "i32")), literal(5))}},
+ {"sink", SinkNodeOptions{&sink_gen}},
+ })
+ .AddToPlan(plan.get()));
+
+ ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+ Finishes(ResultWith(UnorderedElementsAreArray({empty, expected}))));
+ }
+}
+
+TEST(ExecPlanExecution, NestedSourceProjectGroupedSum) {
+ for (bool parallel : {false, true}) {
+ SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+ auto input = MakeNestedBatches();
+ auto expected = ExecBatchFromJSON({int64(), boolean()}, R"([
+ [null, true],
+ [17, false],
+ [5, null]
+])");
+
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+ ASSERT_OK(
+ Declaration::Sequence(
+ {
+ {"source",
+ SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
+ {"project", ProjectNodeOptions{{
+ field_ref(FieldRef("struct", "i32")),
+ field_ref(FieldRef("struct", "bool")),
+ },
+ {"i32", "bool"}}},
+ {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
+ /*targets=*/{"i32"},
+ /*names=*/{"sum(i32)"},
+ /*keys=*/{"bool"}}},
+ {"sink", SinkNodeOptions{&sink_gen}},
+ })
+ .AddToPlan(plan.get()));
+
+ ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+ Finishes(ResultWith(UnorderedElementsAreArray({expected}))));
+ }
+}
+
TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
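The filter above is an ordinary compute expression; the only new ingredient is the multi-component FieldRef. A sketch of the assumed-equivalent spellings (the dot-path form appears in the dataset tests later in this patch):

    auto by_parts = greater_equal(field_ref(FieldRef("struct", "i32")), literal(5));
    ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(".struct.i32"));
    auto by_path = greater_equal(field_ref(ref), literal(5));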
diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc
index 64f3ec997c9..7733ea46084 100644
--- a/cpp/src/arrow/compute/exec/test_util.cc
+++ b/cpp/src/arrow/compute/exec/test_util.cc
@@ -190,6 +190,20 @@ BatchesWithSchema MakeBasicBatches() {
return out;
}
+BatchesWithSchema MakeNestedBatches() {
+ auto ty = struct_({field("i32", int32()), field("bool", boolean())});
+ BatchesWithSchema out;
+ out.batches = {
+ ExecBatchFromJSON(
+ {ty},
+ R"([[{"i32": null, "bool": true}], [{"i32": 4, "bool": false}], [null]])"),
+ ExecBatchFromJSON(
+ {ty},
+ R"([[{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}]])")};
+ out.schema = schema({field("struct", ty)});
+ return out;
+}
+
BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema,
int num_batches, int batch_size) {
BatchesWithSchema out;
diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h
index a05d3b664ee..4a46ca95839 100644
--- a/cpp/src/arrow/compute/exec/test_util.h
+++ b/cpp/src/arrow/compute/exec/test_util.h
@@ -87,6 +87,9 @@ Future<std::vector<ExecBatch>> StartAndCollect(
ARROW_TESTING_EXPORT
BatchesWithSchema MakeBasicBatches();
+ARROW_TESTING_EXPORT
+BatchesWithSchema MakeNestedBatches();
+
ARROW_TESTING_EXPORT
BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema,
int num_batches = 10, int batch_size = 4);
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index 554046ce603..1cc7957083f 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -116,9 +116,28 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(
if (!scan_options) return convert_options;
- auto materialized = scan_options->MaterializedFields();
-  std::unordered_set<std::string> materialized_fields(materialized.begin(),
-                                                      materialized.end());
+ auto field_refs = scan_options->MaterializedFields();
+  std::unordered_set<std::string> materialized_fields;
+ materialized_fields.reserve(field_refs.size());
+ // Preprocess field refs. We try to avoid FieldRef::GetFoo here since that's
+ // quadratic (and this is significant overhead with 1000+ columns)
+ for (const auto& ref : field_refs) {
+ if (const std::string* name = ref.name()) {
+ // Common case
+ materialized_fields.emplace(*name);
+ continue;
+ }
+  // Currently the CSV reader doesn't support reading any nested types, so this
+ // path shouldn't be hit. However, implement it in the same way as IPC/ORC:
+ // load the entire top-level field if a nested field is selected.
+ ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOneOrNone(*scan_options->dataset_schema));
+ if (column_names.find(field->name()) == column_names.end()) continue;
+ // Only read the requested columns
+ convert_options.include_columns.push_back(field->name());
+ // Properly set conversion types
+ convert_options.column_types[field->name()] = field->type();
+ }
+
for (auto field : scan_options->dataset_schema->fields()) {
if (materialized_fields.find(field->name()) == materialized_fields.end()) continue;
// Ignore virtual columns.
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index 04224236957..1ab88e48b35 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -384,13 +384,21 @@ class TestCsvFileFormatScan : public FileFormatScanMixin<CsvFormatHelper> {};
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestCsvFileFormatScan, ScanBatchSize) { TestScanBatchSize(); }
-TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
- TestScanWithVirtualColumn();
-}
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
+// NOTE(ARROW-14658): TestScanProjectedNested is ignored since CSV
+// doesn't have any nested types for us to work with
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
+TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
+ TestScanWithVirtualColumn();
+}
+TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
+ // The CSV reader rejects duplicate columns
+}
+TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
+ TestScanWithDuplicateColumnError();
+}
INSTANTIATE_TEST_SUITE_P(TestScan, TestCsvFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index e8e4a90cf06..e386c7dea8d 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -75,10 +75,10 @@ static inline Future<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReaderAsync(
}
static inline Result<std::vector<std::string>> GetIncludedFields(
-    const Schema& schema, const std::vector<std::string>& materialized_fields) {
+    const Schema& schema, const std::vector<FieldRef>& materialized_fields) {
  std::vector<std::string> included_fields;
- for (FieldRef ref : materialized_fields) {
+ for (const auto& ref : materialized_fields) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(schema));
if (match.indices().empty()) continue;
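GetIncludedFields leans on FindOneOrNone resolving a nested ref to a FieldPath whose first index names the top-level column, which is the only granularity the IPC reader can select. A sketch of the resolution, assuming a schema like the nested test data:

    auto s = schema({field("a", int32()),
                     field("c", struct_({field("e", int64())}))});
    ASSERT_OK_AND_ASSIGN(auto match, FieldRef("c", "e").FindOneOrNone(*s));
    // match.indices() == {1, 0}: top-level field 1 ("c"), then its child 0 ("e").
    // Selecting by match.indices()[0] loads the whole "c" column, as the ORC
    // hunk below does explicitly.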
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 6a064f539d9..7422ee9f527 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -135,13 +135,22 @@ class TestIpcFileFormatScan : public FileFormatScanMixin<IpcFormatHelper> {};
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestIpcFileFormatScan, ScanBatchSize) { TestScanBatchSize(); }
-TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
- TestScanWithVirtualColumn();
-}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
+TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjectedNested) {
+ TestScanProjectedNested();
+}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
+TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
+ TestScanWithVirtualColumn();
+}
+TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
+ TestScanWithDuplicateColumn();
+}
+TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
+ TestScanWithDuplicateColumnError();
+}
TEST_P(TestIpcFileFormatScan, FragmentScanOptions) {
auto reader = GetRecordBatchReader(
// ARROW-12077: on Windows/mimalloc/release, nullable list column leads to crash
diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc
index d316e1e0fd5..632e41eff1f 100644
--- a/cpp/src/arrow/dataset/file_orc.cc
+++ b/cpp/src/arrow/dataset/file_orc.cc
@@ -79,12 +79,11 @@ class OrcScanTask {
// filter out virtual columns
std::vector included_fields;
ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema());
- for (auto name : materialized_fields) {
- FieldRef ref(name);
+ for (const auto& ref : materialized_fields) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema));
if (match.indices().empty()) continue;
- included_fields.push_back(name);
+ included_fields.push_back(schema->field(match.indices()[0])->name());
}
return RecordBatchIterator(
diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc
index 546879bff42..4d871602f79 100644
--- a/cpp/src/arrow/dataset/file_orc_test.cc
+++ b/cpp/src/arrow/dataset/file_orc_test.cc
@@ -69,13 +69,25 @@ TEST_F(TestOrcFileFormat, CountRows) { TestCountRows(); }
class TestOrcFileFormatScan : public FileFormatScanMixin<OrcFormatHelper> {};
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReader) { TestScan(); }
-TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
- TestScanWithVirtualColumn();
+TEST_P(TestOrcFileFormatScan, ScanBatchSize) {
+ // TODO(ARROW-14153): TestScanBatchSize();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
+TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedNested) {
+ TestScanProjectedNested();
+}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
+TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
+ TestScanWithVirtualColumn();
+}
+TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
+ TestScanWithDuplicateColumn();
+}
+TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
+ TestScanWithDuplicateColumnError();
+}
INSTANTIATE_TEST_SUITE_P(TestScan, TestOrcFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
TestFormatParams::ToTestNameString);
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index cb6a7484fdb..cdee1f00684 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -155,30 +155,112 @@ void AddColumnIndices(const SchemaField& schema_field,
}
}
-// Compute the column projection out of an optional arrow::Schema
-std::vector<int> InferColumnProjection(const parquet::arrow::FileReader& reader,
-                                       const ScanOptions& options) {
+Status ResolveOneFieldRef(
+    const SchemaManifest& manifest, const FieldRef& field_ref,
+    const std::unordered_map<std::string, const SchemaField*>& field_lookup,
+    const std::unordered_set<std::string>& duplicate_fields,
+    std::vector<int>* columns_selection) {
+ if (const std::string* name = field_ref.name()) {
+ auto it = field_lookup.find(*name);
+ if (it != field_lookup.end()) {
+ AddColumnIndices(*it->second, columns_selection);
+ } else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
+ // We shouldn't generally get here because SetProjection will reject such references
+ return Status::Invalid("Ambiguous reference to column '", *name,
+ "' which occurs more than once");
+ }
+ // "Virtual" column: field is not in file but is in the ScanOptions.
+ // Ignore it here, as projection will pad the batch with a null column.
+ return Status::OK();
+ }
+
+ const SchemaField* toplevel = nullptr;
+ const SchemaField* field = nullptr;
+  if (const std::vector<FieldRef>* refs = field_ref.nested_refs()) {
+ // Only supports a sequence of names
+ for (const auto& ref : *refs) {
+ if (const std::string* name = ref.name()) {
+ if (!field) {
+ // First lookup, top-level field
+ auto it = field_lookup.find(*name);
+ if (it != field_lookup.end()) {
+ field = it->second;
+ toplevel = field;
+ } else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
+ return Status::Invalid("Ambiguous reference to column '", *name,
+ "' which occurs more than once");
+ } else {
+ // Virtual column
+ return Status::OK();
+ }
+ } else {
+ const SchemaField* result = nullptr;
+ for (const auto& child : field->children) {
+ if (child.field->name() == *name) {
+ if (!result) {
+ result = &child;
+ } else {
+ return Status::Invalid("Ambiguous nested reference to column '", *name,
+ "' which occurs more than once in field ",
+ field->field->ToString());
+ }
+ }
+ }
+ if (!result) {
+ // Virtual column
+ return Status::OK();
+ }
+ field = result;
+ }
+ continue;
+ }
+ return Status::NotImplemented("Inferring column projection from FieldRef ",
+ field_ref.ToString());
+ }
+ } else {
+ return Status::NotImplemented("Inferring column projection from FieldRef ",
+ field_ref.ToString());
+ }
+
+ if (field) {
+ // TODO(ARROW-1888): support fine-grained column projection. We should be
+ // able to materialize only the child fields requested, and not the entire
+ // top-level field.
+ // Right now, if enabled, projection/filtering will fail when they cast the
+ // physical schema to the dataset schema.
+ AddColumnIndices(*toplevel, columns_selection);
+ }
+ return Status::OK();
+}
+
+// Compute the column projection based on the scan options
+Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
+                                               const ScanOptions& options) {
auto manifest = reader.manifest();
// Checks if the field is needed in either the projection or the filter.
- auto field_names = options.MaterializedFields();
-  std::unordered_set<std::string> materialized_fields{field_names.cbegin(),
-                                                      field_names.cend()};
- auto should_materialize_column = [&materialized_fields](const std::string& f) {
- return materialized_fields.find(f) != materialized_fields.end();
- };
-
-  std::vector<int> columns_selection;
- // Note that the loop is using the file's schema to iterate instead of the
- // materialized fields of the ScanOptions. This ensures that missing
- // fields in the file (but present in the ScanOptions) will be ignored. The
- // scanner's projector will take care of padding the column with the proper
- // values.
+ auto field_refs = options.MaterializedFields();
+
+ // Build a lookup table from top level field name to field metadata.
+ // This is to avoid quadratic-time mapping of projected fields to
+ // column indices, in the common case of selecting top level
+ // columns. For nested fields, we will pay the cost of a linear scan
+ // assuming for now that this is relatively rare, but this can be
+ // optimized. (Also, we don't want to pay the cost of building all
+ // the lookup tables up front if they're rarely used.)
+  std::unordered_map<std::string, const SchemaField*> field_lookup;
+  std::unordered_set<std::string> duplicate_fields;
for (const auto& schema_field : manifest.schema_fields) {
- if (should_materialize_column(schema_field.field->name())) {
- AddColumnIndices(schema_field, &columns_selection);
+ const auto it = field_lookup.emplace(schema_field.field->name(), &schema_field);
+ if (!it.second) {
+ duplicate_fields.emplace(schema_field.field->name());
}
}
+  std::vector<int> columns_selection;
+ for (const auto& ref : field_refs) {
+ RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
+ &columns_selection));
+ }
return columns_selection;
}
@@ -351,7 +433,8 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
parquet_fragment->FilterRowGroups(options->filter));
    if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
}
- auto column_projection = InferColumnProjection(*reader, *options);
+ ARROW_ASSIGN_OR_RAISE(auto column_projection,
+ InferColumnProjection(*reader, *options));
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions(
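The lookup table in InferColumnProjection is a general pattern worth noting: a single linear pass records the first occurrence of each top-level name and flags repeats, so ResolveOneFieldRef gets O(1) lookups plus reliable ambiguity detection. A standalone sketch of the same idea, with hypothetical types:

    #include <string>
    #include <unordered_map>
    #include <unordered_set>
    #include <vector>

    struct Column {
      std::string name;
    };

    // First occurrence wins the table slot; a repeat marks the name ambiguous
    // so later lookups can raise Invalid instead of silently picking one.
    void BuildLookup(const std::vector<Column>& columns,
                     std::unordered_map<std::string, const Column*>* lookup,
                     std::unordered_set<std::string>* duplicates) {
      for (const auto& col : columns) {
        if (!lookup->emplace(col.name, &col).second) {
          duplicates->insert(col.name);
        }
      }
    }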
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 090737ad062..d5c7a0b9850 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -377,9 +377,22 @@ class TestParquetFileFormatScan : public FileFormatScanMixin<ParquetFormatHelper> {};
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
-std::vector<std::string> ScanOptions::MaterializedFields() const {
-  std::vector<std::string> fields;
+std::vector<FieldRef> ScanOptions::MaterializedFields() const {
+  std::vector<FieldRef> fields;
for (const compute::Expression* expr : {&filter, &projection}) {
- for (const FieldRef& ref : FieldsInExpression(*expr)) {
- DCHECK(ref.name());
- fields.push_back(*ref.name());
- }
+ auto refs = FieldsInExpression(*expr);
+ fields.insert(fields.end(), std::make_move_iterator(refs.begin()),
+ std::make_move_iterator(refs.end()));
}
return fields;
@@ -591,13 +590,6 @@ Result<std::shared_ptr<RecordBatchReader>> AsyncScanner::ToRecordBatchReader() {
}
const std::shared_ptr& AsyncScanner::dataset() const { return dataset_; }
-
-Status NestedFieldRefsNotImplemented() {
- // TODO(ARROW-11259) Several functions (for example, IpcScanTask::Make) assume that
- // only top level fields will be materialized.
- return Status::NotImplemented("Nested field references in scans.");
-}
-
} // namespace
Result<ProjectionDescr> ProjectionDescr::FromStructExpression(
@@ -623,8 +615,6 @@ Result<ProjectionDescr> ProjectionDescr::FromExpressions(
for (size_t i = 0; i < exprs.size(); ++i) {
if (auto ref = exprs[i].field_ref()) {
- if (!ref->name()) return NestedFieldRefsNotImplemented();
-
// set metadata and nullability for plain field references
ARROW_ASSIGN_OR_RAISE(auto field, ref->GetOne(dataset_schema));
project_options.field_nullability[i] = field->nullable();
@@ -708,8 +698,6 @@ Status ScannerBuilder::Project(std::vector<compute::Expression> exprs,
Status ScannerBuilder::Filter(const compute::Expression& filter) {
for (const auto& ref : FieldsInExpression(filter)) {
- if (!ref.name()) return NestedFieldRefsNotImplemented();
-
RETURN_NOT_OK(ref.FindOne(*scan_options_->dataset_schema));
}
ARROW_ASSIGN_OR_RAISE(scan_options_->filter,
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 738915a8129..013ba092b0c 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -118,21 +118,21 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Fragment-specific scan options.
  std::shared_ptr<FragmentScanOptions> fragment_scan_options;
- // Return a vector of fields that requires materialization.
- //
- // This is usually the union of the fields referenced in the projection and the
- // filter expression. Examples:
- //
- // - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
- // - `SELECT a + b < 3 WHERE a > 1` => ["a", "b"]
- //
- // This is needed for expression where a field may not be directly
- // used in the final projection but is still required to evaluate the
- // expression.
- //
- // This is used by Fragment implementations to apply the column
- // sub-selection optimization.
-  std::vector<std::string> MaterializedFields() const;
+ /// Return a vector of FieldRefs that require materialization.
+ ///
+ /// This is usually the union of the fields referenced in the projection and the
+ /// filter expression. Examples:
+ ///
+ /// - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
+ /// - `SELECT a + b < 3 WHERE a > 1` => ["a", "b"]
+ ///
+  /// This is needed for expressions where a field may not be directly
+ /// used in the final projection but is still required to evaluate the
+ /// expression.
+ ///
+ /// This is used by Fragment implementations to apply the column
+ /// sub-selection optimization.
+  std::vector<FieldRef> MaterializedFields() const;
};
/// \brief Describes a projection
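A sketch of the doc comment's first example in code, assuming the compute expression helpers; duplicates are preserved, and per the scanner.cc implementation the filter's refs are collected before the projection's:

    ScanOptions opts;
    // SELECT a, b WHERE a < 2 && c > 1
    opts.filter = and_(less(field_ref("a"), literal(2)),
                       greater(field_ref("c"), literal(1)));
    opts.projection = project({field_ref("a"), field_ref("b")}, {"a", "b"});
    // opts.MaterializedFields() yields refs to a, c (filter), then a, b
    // (projection), duplicates included.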
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index c44849c3402..e62c626d9f8 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -197,6 +197,46 @@ TEST_P(TestScanner, FilteredScan) {
AssertScanBatchesEqualRepetitionsOf(MakeScanner(batch), filtered_batch);
}
+TEST_P(TestScanner, FilteredScanNested) {
+ auto struct_ty = struct_({field("f64", float64())});
+ SetSchema({field("struct", struct_ty)});
+
+ double value = 0.5;
+ ASSERT_OK_AND_ASSIGN(auto f64,
+ ArrayFromBuilderVisitor(float64(), GetParam().items_per_batch,
+ GetParam().items_per_batch / 2,
+ [&](DoubleBuilder* builder) {
+ builder->UnsafeAppend(value);
+ builder->UnsafeAppend(-value);
+ value += 1.0;
+ }));
+
+ SetFilter(greater(field_ref(FieldRef("struct", "f64")), literal(0.0)));
+
+ auto batch = RecordBatch::Make(
+ schema_, f64->length(),
+ {
+          std::make_shared<StructArray>(struct_ty, f64->length(), ArrayVector{f64}),
+ });
+
+ value = 0.5;
+ ASSERT_OK_AND_ASSIGN(auto f64_filtered,
+ ArrayFromBuilderVisitor(float64(), GetParam().items_per_batch / 2,
+ [&](DoubleBuilder* builder) {
+ builder->UnsafeAppend(value);
+ value += 1.0;
+ }));
+
+ auto filtered_batch = RecordBatch::Make(
+ schema_, f64_filtered->length(),
+ {
+          std::make_shared<StructArray>(struct_ty, f64_filtered->length(),
+ ArrayVector{f64_filtered}),
+ });
+
+ AssertScanBatchesEqualRepetitionsOf(MakeScanner(batch), filtered_batch);
+}
+
TEST_P(TestScanner, ProjectedScan) {
SetSchema({field("i32", int32()), field("f64", float64())});
SetProjectedColumns({"i32"});
@@ -206,6 +246,25 @@ TEST_P(TestScanner, ProjectedScan) {
AssertScanBatchesUnorderedEqualRepetitionsOf(MakeScanner(batch_in), batch_out);
}
+TEST_P(TestScanner, ProjectedScanNested) {
+ SetSchema({
+ field("struct", struct_({field("i32", int32()), field("f64", float64())})),
+ field("nested", struct_({field("left", int32()),
+ field("right", struct_({field("i32", int32()),
+ field("f64", float64())}))})),
+ });
+  ASSERT_OK_AND_ASSIGN(auto descr, ProjectionDescr::FromExpressions(
+                                       {field_ref(FieldRef("struct", "i32")),
+                                        field_ref(FieldRef("nested", "right", "f64"))},
+                                       {"i32", "f64"}, *options_->dataset_schema));
+ SetProjection(options_.get(), std::move(descr));
+ auto batch_in = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_);
+ auto batch_out = ConstantArrayGenerator::Zeroes(
+ GetParam().items_per_batch,
+ schema({field("i32", int32()), field("f64", float64())}));
+ AssertScanBatchesUnorderedEqualRepetitionsOf(MakeScanner(batch_in), batch_out);
+}
+
TEST_P(TestScanner, MaterializeMissingColumn) {
SetSchema({field("i32", int32()), field("f64", float64())});
auto batch_missing_f64 = ConstantArrayGenerator::Zeroes(
@@ -1062,6 +1121,7 @@ class TestScannerBuilder : public ::testing::Test {
field("i16", int16()),
field("i32", int32()),
field("i64", int64()),
+ field("nested", struct_({field("str", utf8())})),
});
ASSERT_OK_AND_ASSIGN(dataset_, UnionDataset::Make(schema_, sources));
@@ -1084,6 +1144,7 @@ TEST_F(TestScannerBuilder, TestProject) {
ASSERT_OK(builder.Project(
{field_ref("i16"), call("multiply", {field_ref("i16"), literal(2)})},
{"i16 renamed", "i16 * 2"}));
+ ASSERT_OK(builder.Project({field_ref(FieldRef("nested", "str"))}, {".nested.str"}));
ASSERT_RAISES(Invalid, builder.Project({"not_found_column"}));
ASSERT_RAISES(Invalid, builder.Project({"i8", "not_found_column"}));
@@ -1092,8 +1153,8 @@ TEST_F(TestScannerBuilder, TestProject) {
call("multiply", {field_ref("i16"), literal(2)})},
{"i16 renamed", "i16 * 2"}));
- ASSERT_RAISES(NotImplemented, builder.Project({field_ref(FieldRef("nested", "column"))},
- {"nested column"}));
+ ASSERT_RAISES(Invalid, builder.Project({field_ref(FieldRef("nested", "not_a_column"))},
+ {"nested column"}));
// provided more field names than column exprs or vice versa
ASSERT_RAISES(Invalid, builder.Project({}, {"i16 renamed", "i16 * 2"}));
@@ -1107,14 +1168,15 @@ TEST_F(TestScannerBuilder, TestFilter) {
ASSERT_OK(builder.Filter(equal(field_ref("i64"), literal(10))));
ASSERT_OK(builder.Filter(or_(equal(field_ref("i64"), literal(10)),
equal(field_ref("b"), literal(true)))));
+ ASSERT_OK(builder.Filter(equal(field_ref(FieldRef("nested", "str")), literal(""))));
ASSERT_OK(builder.Filter(equal(field_ref("i64"), literal(10))));
ASSERT_RAISES(Invalid, builder.Filter(equal(field_ref("not_a_column"), literal(true))));
- ASSERT_RAISES(
- NotImplemented,
- builder.Filter(equal(field_ref(FieldRef("nested", "column")), literal(true))));
+ ASSERT_RAISES(Invalid,
+ builder.Filter(
+ equal(field_ref(FieldRef("nested", "not_a_column")), literal(true))));
ASSERT_RAISES(Invalid,
builder.Filter(or_(equal(field_ref("i64"), literal(10)),
@@ -1143,7 +1205,7 @@ TEST(ScanOptions, TestMaterializedFields) {
// project nothing, filter on i32 = materialize i32
opts->filter = equal(field_ref("i32"), literal(10));
- EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32"));
+ EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("i32")));
// project i32 & i64, filter nothing = materialize i32 & i64
opts->filter = literal(true);
@@ -1165,15 +1227,50 @@ TEST(ScanOptions, TestMaterializedFields) {
// project i32, filter on i32 = materialize i32 (reported twice)
opts->filter = equal(field_ref("i32"), literal(10));
- EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i32"));
+ EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("i32"), FieldRef("i32")));
// project i32, filter on i32 & i64 = materialize i64, i32 (reported twice)
opts->filter = less(field_ref("i32"), field_ref("i64"));
- EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64", "i32"));
+ EXPECT_THAT(opts->MaterializedFields(),
+ ElementsAre(FieldRef("i32"), FieldRef("i64"), FieldRef("i32")));
// project i32, filter on i64 = materialize i32 & i64
opts->filter = equal(field_ref("i64"), literal(10));
- EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i64", "i32"));
+ EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("i64"), FieldRef("i32")));
+
+ auto nested = field("nested", struct_({i32, i64}));
+ opts->dataset_schema = schema({nested});
+
+ // project top-level field, filter nothing
+ opts->filter = literal(true);
+ ASSERT_OK_AND_ASSIGN(projection,
+ ProjectionDescr::FromNames({"nested"}, *opts->dataset_schema));
+ SetProjection(opts.get(), std::move(projection));
+ EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested")));
+
+ // project child field, filter nothing
+ opts->filter = literal(true);
+ ASSERT_OK_AND_ASSIGN(projection, ProjectionDescr::FromExpressions(
+ {field_ref(FieldRef("nested", "i64"))},
+ {"nested.i64"}, *opts->dataset_schema));
+ SetProjection(opts.get(), std::move(projection));
+ EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested", "i64")));
+
+ // project nothing, filter child field
+ opts->filter = equal(field_ref(FieldRef("nested", "i64")), literal(10));
+ ASSERT_OK_AND_ASSIGN(projection,
+ ProjectionDescr::FromExpressions({}, {}, *opts->dataset_schema));
+ SetProjection(opts.get(), std::move(projection));
+ EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested", "i64")));
+
+ // project child field, filter child field
+ opts->filter = equal(field_ref(FieldRef("nested", "i64")), literal(10));
+ ASSERT_OK_AND_ASSIGN(projection, ProjectionDescr::FromExpressions(
+ {field_ref(FieldRef("nested", "i32"))},
+ {"nested.i32"}, *opts->dataset_schema));
+ SetProjection(opts.get(), std::move(projection));
+ EXPECT_THAT(opts->MaterializedFields(),
+ ElementsAre(FieldRef("nested", "i64"), FieldRef("nested", "i32")));
}
namespace {
@@ -1209,66 +1306,137 @@ struct DatasetAndBatches {
  std::vector<compute::ExecBatch> batches;
};
-DatasetAndBatches MakeBasicDataset() {
- const auto dataset_schema = ::arrow::schema({
- field("a", int32()),
- field("b", boolean()),
- field("c", int32()),
- });
-
- const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"});
-
- RecordBatchVector record_batches{
- RecordBatchFromJSON(physical_schema, R"([{"a": 1, "b": null},
- {"a": 2, "b": true}])"),
- RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true},
- {"a": 3, "b": false}])"),
- RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true},
- {"a": 4, "b": false}])"),
- RecordBatchFromJSON(physical_schema, R"([{"a": 5, "b": null},
- {"a": 6, "b": false},
- {"a": 7, "b": false}])"),
- };
-
-  auto dataset = std::make_shared<FragmentDataset>(
-      dataset_schema,
-      FragmentVector{
-          std::make_shared<InMemoryFragment>(
-              physical_schema, RecordBatchVector{record_batches[0], record_batches[1]},
-              equal(field_ref("c"), literal(23))),
-          std::make_shared<InMemoryFragment>(
-              physical_schema, RecordBatchVector{record_batches[2], record_batches[3]},
-              equal(field_ref("c"), literal(47))),
-      });
+DatasetAndBatches DatasetAndBatchesFromJSON(
+    const std::shared_ptr<Schema>& dataset_schema,
+    const std::shared_ptr<Schema>& physical_schema,
+    const std::vector<std::vector<std::string>>& fragment_batch_strs,
+    const std::vector<compute::Expression>& guarantees,
+    std::function<void(compute::ExecBatch*, const RecordBatch&)> make_exec_batch = {}) {
+ if (!guarantees.empty()) {
+ EXPECT_EQ(fragment_batch_strs.size(), guarantees.size());
+ }
+ RecordBatchVector record_batches;
+ FragmentVector fragments;
+ fragments.reserve(fragment_batch_strs.size());
+ for (size_t i = 0; i < fragment_batch_strs.size(); i++) {
+ const auto& batch_strs = fragment_batch_strs[i];
+ RecordBatchVector fragment_batches;
+ fragment_batches.reserve(batch_strs.size());
+ for (const auto& batch_str : batch_strs) {
+ fragment_batches.push_back(RecordBatchFromJSON(physical_schema, batch_str));
+ }
+ record_batches.insert(record_batches.end(), fragment_batches.begin(),
+ fragment_batches.end());
+    fragments.push_back(std::make_shared<InMemoryFragment>(
+ physical_schema, std::move(fragment_batches),
+ guarantees.empty() ? literal(true) : guarantees[i]));
+ }
  std::vector<compute::ExecBatch> batches;
-
auto batch_it = record_batches.begin();
- for (int fragment_index = 0; fragment_index < 2; ++fragment_index) {
- for (int batch_index = 0; batch_index < 2; ++batch_index) {
+ for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size();
+ ++fragment_index) {
+ for (size_t batch_index = 0; batch_index < fragment_batch_strs[fragment_index].size();
+ ++batch_index) {
const auto& batch = *batch_it++;
// the scanned ExecBatches will begin with physical columns
batches.emplace_back(*batch);
- // a placeholder will be inserted for partition field "c"
-      batches.back().values.emplace_back(std::make_shared<Int32Scalar>());
+ // allow customizing the ExecBatch (e.g. to fill in placeholders for partition
+ // fields)
+ if (make_exec_batch) {
+ make_exec_batch(&batches.back(), *batch);
+ }
// scanned batches will be augmented with fragment and batch indices
- batches.back().values.emplace_back(fragment_index);
- batches.back().values.emplace_back(batch_index);
+      batches.back().values.emplace_back(static_cast<int>(fragment_index));
+      batches.back().values.emplace_back(static_cast<int>(batch_index));
// ... and with the last-in-fragment flag
- batches.back().values.emplace_back(batch_index == 1);
+ batches.back().values.emplace_back(batch_index ==
+ fragment_batch_strs[fragment_index].size() - 1);
- // each batch carries a guarantee inherited from its Fragment's partition
- // expression
- batches.back().guarantee =
- equal(field_ref("c"), literal(fragment_index == 0 ? 23 : 47));
+ // each batch carries a guarantee inherited from its Fragment's partition expression
+ batches.back().guarantee = fragments[fragment_index]->partition_expression();
}
}
- return {dataset, batches};
+  auto dataset = std::make_shared<FragmentDataset>(dataset_schema, std::move(fragments));
+ return {std::move(dataset), std::move(batches)};
+}
+
+DatasetAndBatches MakeBasicDataset() {
+ const auto dataset_schema = ::arrow::schema({
+ field("a", int32()),
+ field("b", boolean()),
+ field("c", int32()),
+ });
+
+ const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"});
+
+ return DatasetAndBatchesFromJSON(
+ dataset_schema, physical_schema,
+ {
+ {
+ R"([{"a": 1, "b": null},
+ {"a": 2, "b": true}])",
+ R"([{"a": null, "b": true},
+ {"a": 3, "b": false}])",
+ },
+ {
+ R"([{"a": null, "b": true},
+ {"a": 4, "b": false}])",
+ R"([{"a": 5, "b": null},
+ {"a": 6, "b": false},
+ {"a": 7, "b": false}])",
+ },
+ },
+ {
+ equal(field_ref("c"), literal(23)),
+ equal(field_ref("c"), literal(47)),
+ },
+ [](compute::ExecBatch* batch, const RecordBatch&) {
+ // a placeholder will be inserted for partition field "c"
+        batch->values.emplace_back(std::make_shared<Int32Scalar>());
+ });
+}
+
+DatasetAndBatches MakeNestedDataset() {
+ const auto dataset_schema = ::arrow::schema({
+ field("a", int32()),
+ field("b", boolean()),
+ field("c", struct_({
+ field("d", int64()),
+ field("e", float64()),
+ })),
+ });
+ const auto physical_schema = ::arrow::schema({
+ field("a", int32()),
+ field("b", boolean()),
+ field("c", struct_({
+ field("e", int64()),
+ })),
+ });
+
+ return DatasetAndBatchesFromJSON(dataset_schema, physical_schema,
+ {
+ {
+ R"([{"a": 1, "b": null, "c": {"e": 0}},
+ {"a": 2, "b": true, "c": {"e": 1}}])",
+ R"([{"a": null, "b": true, "c": {"e": 2}},
+ {"a": 3, "b": false, "c": {"e": null}}])",
+ R"([{"a": null, "b": null, "c": null}])",
+ },
+ {
+ R"([{"a": null, "b": true, "c": {"e": 4}},
+ {"a": 4, "b": false, "c": null}])",
+ R"([{"a": 5, "b": null, "c": {"e": 6}},
+ {"a": 6, "b": false, "c": {"e": 7}},
+ {"a": 7, "b": false, "c": {"e": null}}])",
+ },
+ },
+ /*guarantees=*/{});
}
compute::Expression Materialize(std::vector<std::string> names,
@@ -1438,6 +1606,31 @@ TEST(ScanNode, MaterializationOfVirtualColumn) {
ASSERT_THAT(plan.Run(), Finishes(ResultWith(UnorderedElementsAreArray(expected))));
}
+TEST(ScanNode, MaterializationOfNestedVirtualColumn) {
+ TestPlan plan;
+
+ auto basic = MakeNestedDataset();
+
+  auto options = std::make_shared<ScanOptions>();
+ options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true);
+
+ ASSERT_OK(compute::Declaration::Sequence(
+ {
+ {"scan", ScanNodeOptions{basic.dataset, options}},
+ {"augmented_project",
+ compute::ProjectNodeOptions{
+ {field_ref("a"), field_ref("b"), field_ref("c")}}},
+ {"sink", compute::SinkNodeOptions{&plan.sink_gen}},
+ })
+ .AddToPlan(plan.get()));
+
+ // TODO(ARROW-1888): allow scanner to "patch up" structs with casts
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+ NotImplemented,
+ ::testing::HasSubstr("Unsupported cast from struct to struct"),
+ plan.Run());
+}
+
TEST(ScanNode, MinimalEndToEnd) {
// NB: This test is here for didactic purposes
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 73171618ac1..5128021bf65 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -45,6 +45,7 @@
#include "arrow/testing/future_util.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/io_util.h"
@@ -410,6 +411,18 @@ class FileFormatFixtureMixin : public ::testing::Test {
SetProjection(opts_.get(), std::move(projection));
}
+  void ProjectNested(std::vector<std::string> names) {
+    std::vector<compute::Expression> exprs;
+ for (const auto& name : names) {
+ ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(name));
+ exprs.push_back(field_ref(ref));
+ }
+ ASSERT_OK_AND_ASSIGN(
+ auto descr, ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
+ *opts_->dataset_schema));
+ SetProjection(opts_.get(), std::move(descr));
+ }
+
// Shared test cases
void AssertInspectFailure(const std::string& contents, StatusCode code,
const std::string& format_name) {
@@ -630,12 +643,110 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
for (auto maybe_batch : PhysicalBatches(fragment)) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
row_count += batch->num_rows();
- AssertSchemaEqual(*batch->schema(), *expected_schema,
- /*check_metadata=*/false);
+ ASSERT_THAT(
+ batch->schema()->fields(),
+ ::testing::UnorderedPointwise(PointeesEquals(), expected_schema->fields()))
+ << "EXPECTED:\n"
+ << expected_schema->ToString() << "\nACTUAL:\n"
+ << batch->schema()->ToString();
}
ASSERT_EQ(row_count, expected_rows());
}
+ void TestScanProjectedNested(bool fine_grained_selection = false) {
+ auto f32 = field("f32", float32());
+ auto f64 = field("f64", float64());
+ auto i32 = field("i32", int32());
+ auto i64 = field("i64", int64());
+ auto struct1 = field("struct1", struct_({f32, i32}));
+ auto struct2 = field("struct2", struct_({f64, i64, struct1}));
+ this->SetSchema({struct1, struct2, f32, f64, i32, i64});
+ this->ProjectNested({".struct1.f32", ".struct2.struct1", ".struct2.struct1.f32"});
+ this->SetFilter(greater_equal(field_ref(FieldRef("struct2", "i64")), literal(0)));
+
+    std::shared_ptr<Schema> physical_schema;
+ if (fine_grained_selection) {
+ // Some formats, like Parquet, let you pluck only a part of a complex type
+ physical_schema = schema(
+ {field("struct1", struct_({f32})), field("struct2", struct_({i64, struct1}))});
+ } else {
+ // Otherwise, the entire top-level field is returned
+ physical_schema = schema({struct1, struct2});
+ }
+    std::shared_ptr<Schema> projected_schema = schema({
+ field(".struct1.f32", float32()),
+ field(".struct2.struct1", struct1->type()),
+ field(".struct2.struct1.f32", float32()),
+ });
+
+ {
+ auto reader = this->GetRecordBatchReader(opts_->dataset_schema);
+ auto source = this->GetFileSource(reader.get());
+ auto fragment = this->MakeFragment(*source);
+
+ int64_t row_count = 0;
+ for (auto maybe_batch : PhysicalBatches(fragment)) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ row_count += batch->num_rows();
+ ASSERT_THAT(
+ batch->schema()->fields(),
+ ::testing::UnorderedPointwise(PointeesEquals(), physical_schema->fields()))
+ << "EXPECTED:\n"
+ << physical_schema->ToString() << "\nACTUAL:\n"
+ << batch->schema()->ToString();
+ }
+ ASSERT_EQ(row_count, expected_rows());
+ }
+ {
+ auto reader = this->GetRecordBatchReader(opts_->dataset_schema);
+ auto source = this->GetFileSource(reader.get());
+ auto fragment = this->MakeFragment(*source);
+
+ int64_t row_count = 0;
+ for (auto maybe_batch : Batches(fragment)) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ row_count += batch->num_rows();
+ AssertSchemaEqual(*batch->schema(), *projected_schema, /*check_metadata=*/false);
+ }
+ ASSERT_LE(row_count, expected_rows());
+ ASSERT_GT(row_count, 0);
+ }
+ {
+ // File includes a duplicated name in struct2
+ auto struct2_physical = field("struct2", struct_({f64, i64, struct1, i64}));
+ auto reader = this->GetRecordBatchReader(
+ schema({struct1, struct2_physical, f32, f64, i32, i64}));
+ auto source = this->GetFileSource(reader.get());
+ auto fragment = this->MakeFragment(*source);
+
+ auto iterator = PhysicalBatches(fragment);
+ EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("i64"),
+ iterator.Next().status());
+ }
+ {
+ // File is missing a child in struct1
+ auto struct1_physical = field("struct1", struct_({i32}));
+ auto reader = this->GetRecordBatchReader(
+ schema({struct1_physical, struct2, f32, f64, i32, i64}));
+ auto source = this->GetFileSource(reader.get());
+ auto fragment = this->MakeFragment(*source);
+
+ physical_schema = schema({physical_schema->field(1)});
+
+ int64_t row_count = 0;
+ for (auto maybe_batch : PhysicalBatches(fragment)) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ row_count += batch->num_rows();
+ ASSERT_THAT(
+ batch->schema()->fields(),
+ ::testing::UnorderedPointwise(PointeesEquals(), physical_schema->fields()))
+ << "EXPECTED:\n"
+ << physical_schema->ToString() << "\nACTUAL:\n"
+ << batch->schema()->ToString();
+ }
+ ASSERT_EQ(row_count, expected_rows());
+ }
+ }
void TestScanProjectedMissingCols() {
auto f32 = field("f32", float32());
auto f64 = field("f64", float64());
@@ -674,8 +785,12 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
for (auto maybe_batch : PhysicalBatches(fragment)) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
row_count += batch->num_rows();
- AssertSchemaEqual(*batch->schema(), *expected_schema,
- /*check_metadata=*/false);
+ ASSERT_THAT(
+ batch->schema()->fields(),
+ ::testing::UnorderedPointwise(PointeesEquals(), expected_schema->fields()))
+ << "EXPECTED:\n"
+ << expected_schema->ToString() << "\nACTUAL:\n"
+ << batch->schema()->ToString();
}
ASSERT_EQ(row_count, expected_rows());
}
@@ -708,6 +823,36 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
ASSERT_EQ(row_count, expected_rows());
}
}
+ void TestScanWithDuplicateColumn() {
+ // A duplicate column is ignored if not requested.
+ auto i32 = field("i32", int32());
+ auto i64 = field("i64", int64());
+ this->opts_->dataset_schema = schema({i32, i32, i64});
+ this->Project({"i64"});
+ auto expected_schema = schema({i64});
+ auto reader = this->GetRecordBatchReader(opts_->dataset_schema);
+ auto source = this->GetFileSource(reader.get());
+ auto fragment = this->MakeFragment(*source);
+
+ int64_t row_count = 0;
+
+ for (auto maybe_batch : PhysicalBatches(fragment)) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ row_count += batch->num_rows();
+ AssertSchemaEqual(*batch->schema(), *expected_schema,
+ /*check_metadata=*/false);
+ }
+
+ ASSERT_EQ(row_count, expected_rows());
+ }
+ void TestScanWithDuplicateColumnError() {
+ // A duplicate column leads to an error if requested.
+ auto i32 = field("i32", int32());
+ auto i64 = field("i64", int64());
+ this->opts_->dataset_schema = schema({i32, i32, i64});
+ ASSERT_RAISES(Invalid,
+ ProjectionDescr::FromNames({"i32"}, *this->opts_->dataset_schema));
+ }
protected:
using FileFormatFixtureMixin::opts_;
diff --git a/cpp/src/arrow/testing/generator.cc b/cpp/src/arrow/testing/generator.cc
index 8f05cc6817d..8e411e5fe05 100644
--- a/cpp/src/arrow/testing/generator.cc
+++ b/cpp/src/arrow/testing/generator.cc
@@ -140,7 +140,16 @@ std::shared_ptr<Array> ConstantArrayGenerator::Zeroes(
return Float64(size);
case Type::STRING:
return String(size);
+ case Type::STRUCT: {
+ ArrayVector children;
+ children.reserve(type->num_fields());
+ for (const auto& field : type->fields()) {
+ children.push_back(Zeroes(size, field->type()));
+ }
+      return std::make_shared<StructArray>(type, size, children);
+ }
default:
+ ADD_FAILURE() << "ConstantArrayGenerator::Zeroes is not implemented for " << *type;
return nullptr;
}
}
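A sketch of what the new STRUCT case yields when called directly:

    auto ty = struct_({field("i32", int32()), field("f64", float64())});
    std::shared_ptr<Array> arr = ConstantArrayGenerator::Zeroes(4, ty);
    // A StructArray of length 4 with zero-filled children and no nulls:
    // [{"i32": 0, "f64": 0}, {"i32": 0, "f64": 0}, ...]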
diff --git a/cpp/src/arrow/testing/matchers.h b/cpp/src/arrow/testing/matchers.h
index 381b3b0cb9d..1ffc2e1d9fa 100644
--- a/cpp/src/arrow/testing/matchers.h
+++ b/cpp/src/arrow/testing/matchers.h
@@ -28,6 +28,10 @@
namespace arrow {
+// A matcher that checks that the values pointed to are Equals().
+// Useful in conjunction with other googletest matchers.
+MATCHER(PointeesEquals, "") { return std::get<0>(arg)->Equals(*std::get<1>(arg)); }
+
template <typename ResultMatcher>
class FutureMatcher {
public:
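PointeesEquals is built for gMock's Pointwise/UnorderedPointwise, which pass it a 2-tuple of corresponding elements; the dataset test_util.h hunks above use it to compare schemas field by field while ignoring field order:

    EXPECT_THAT(batch->schema()->fields(),
                ::testing::UnorderedPointwise(PointeesEquals(),
                                              expected_schema->fields()));
    // Each tuple element is dereferenced and compared with Field::Equals.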
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 463636b0537..4c439841ba2 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -1670,6 +1670,11 @@ class ARROW_EXPORT FieldRef {
const std::string* name() const {
    return IsName() ? &util::get<std::string>(impl_) : NULLPTR;
}
+  const std::vector<FieldRef>* nested_refs() const {
+    return util::holds_alternative<std::vector<FieldRef>>(impl_)
+               ? &util::get<std::vector<FieldRef>>(impl_)
+               : NULLPTR;
+  }
/// \brief Retrieve FieldPath of every child field which matches this FieldRef.
  std::vector<FieldPath> FindAll(const Schema& schema) const;
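A sketch of the accessor contract across FieldRef's states, assuming the variadic constructor used in the tests above:

    FieldRef by_name("a");
    // by_name.name() != NULLPTR; by_name.nested_refs() == NULLPTR

    FieldRef nested("struct2", "struct1", "f32");
    // nested.name() == NULLPTR; nested.nested_refs() points at
    // {FieldRef("struct2"), FieldRef("struct1"), FieldRef("f32")}

    // The dot-path spelling parses to the same nested representation:
    ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(".struct2.struct1.f32"));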
From 677c6e0abee4355fd72ee37351c74e34ce9c3ecc Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 14 Jan 2022 16:54:45 -0500
Subject: [PATCH 2/2] ARROW-14658: [C++] Remove unnecessary test
---
cpp/src/arrow/dataset/file_csv_test.cc | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index 1ab88e48b35..e9088859005 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -393,9 +393,8 @@ TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
-TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
- // The CSV reader rejects duplicate columns
-}
+// The CSV reader rejects duplicate columns, so skip
+// ScanRecordBatchReaderWithDuplicateColumn
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
TestScanWithDuplicateColumnError();
}