diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 745c6b256eb..258238dbb81 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -793,6 +793,73 @@ TEST(ExecPlanExecution, SourceGroupedSum) { } } +TEST(ExecPlanExecution, NestedSourceFilter) { + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + + auto input = MakeNestedBatches(); + auto empty = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([])"); + auto expected = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([ + [{"i32": 5, "bool": null}], + [{"i32": 6, "bool": false}], + [{"i32": 7, "bool": false}] +])"); + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + AsyncGenerator> sink_gen; + + ASSERT_OK(Declaration::Sequence( + { + {"source", SourceNodeOptions{input.schema, + input.gen(parallel, /*slow=*/false)}}, + {"filter", FilterNodeOptions{greater_equal( + field_ref(FieldRef("struct", "i32")), literal(5))}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); + + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray({empty, expected})))); + } +} + +TEST(ExecPlanExecution, NestedSourceProjectGroupedSum) { + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + + auto input = MakeNestedBatches(); + auto expected = ExecBatchFromJSON({int64(), boolean()}, R"([ + [null, true], + [17, false], + [5, null] +])"); + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + AsyncGenerator> sink_gen; + + ASSERT_OK( + Declaration::Sequence( + { + {"source", + SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, + {"project", ProjectNodeOptions{{ + field_ref(FieldRef("struct", "i32")), + field_ref(FieldRef("struct", "bool")), + }, + {"i32", "bool"}}}, + {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}}, + /*targets=*/{"i32"}, + /*names=*/{"sum(i32)"}, + /*keys=*/{"bool"}}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); + + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray({expected})))); + } +} + TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) { for (bool parallel : {false, true}) { SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 64f3ec997c9..7733ea46084 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -190,6 +190,20 @@ BatchesWithSchema MakeBasicBatches() { return out; } +BatchesWithSchema MakeNestedBatches() { + auto ty = struct_({field("i32", int32()), field("bool", boolean())}); + BatchesWithSchema out; + out.batches = { + ExecBatchFromJSON( + {ty}, + R"([[{"i32": null, "bool": true}], [{"i32": 4, "bool": false}], [null]])"), + ExecBatchFromJSON( + {ty}, + R"([[{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}]])")}; + out.schema = schema({field("struct", ty)}); + return out; +} + BatchesWithSchema MakeRandomBatches(const std::shared_ptr& schema, int num_batches, int batch_size) { BatchesWithSchema out; diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h index a05d3b664ee..4a46ca95839 100644 --- a/cpp/src/arrow/compute/exec/test_util.h +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -87,6 +87,9 @@ Future> StartAndCollect( ARROW_TESTING_EXPORT BatchesWithSchema MakeBasicBatches(); +ARROW_TESTING_EXPORT +BatchesWithSchema MakeNestedBatches(); + ARROW_TESTING_EXPORT BatchesWithSchema MakeRandomBatches(const std::shared_ptr& schema, int num_batches = 10, int batch_size = 4); diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 554046ce603..1cc7957083f 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -116,9 +116,28 @@ static inline Result GetConvertOptions( if (!scan_options) return convert_options; - auto materialized = scan_options->MaterializedFields(); - std::unordered_set materialized_fields(materialized.begin(), - materialized.end()); + auto field_refs = scan_options->MaterializedFields(); + std::unordered_set materialized_fields; + materialized_fields.reserve(field_refs.size()); + // Preprocess field refs. We try to avoid FieldRef::GetFoo here since that's + // quadratic (and this is significant overhead with 1000+ columns) + for (const auto& ref : field_refs) { + if (const std::string* name = ref.name()) { + // Common case + materialized_fields.emplace(*name); + continue; + } + // Currently CSV reader doesn't support reading any nested types, so this + // path shouldn't be hit. However, implement it in the same way as IPC/ORC: + // load the entire top-level field if a nested field is selected. + ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOneOrNone(*scan_options->dataset_schema)); + if (column_names.find(field->name()) == column_names.end()) continue; + // Only read the requested columns + convert_options.include_columns.push_back(field->name()); + // Properly set conversion types + convert_options.column_types[field->name()] = field->type(); + } + for (auto field : scan_options->dataset_schema->fields()) { if (materialized_fields.find(field->name()) == materialized_fields.end()) continue; // Ignore virtual columns. diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 04224236957..e9088859005 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -384,13 +384,20 @@ class TestCsvFileFormatScan : public FileFormatScanMixin {}; TEST_P(TestCsvFileFormatScan, ScanRecordBatchReader) { TestScan(); } TEST_P(TestCsvFileFormatScan, ScanBatchSize) { TestScanBatchSize(); } -TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) { - TestScanWithVirtualColumn(); -} TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); } +// NOTE(ARROW-14658): TestScanProjectedNested is ignored since CSV +// doesn't have any nested types for us to work with TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) { TestScanProjectedMissingCols(); } +TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) { + TestScanWithVirtualColumn(); +} +// The CSV reader rejects duplicate columns, so skip +// ScanRecordBatchReaderWithDuplicateColumn +TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) { + TestScanWithDuplicateColumnError(); +} INSTANTIATE_TEST_SUITE_P(TestScan, TestCsvFileFormatScan, ::testing::ValuesIn(TestFormatParams::Values()), diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index e8e4a90cf06..e386c7dea8d 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -75,10 +75,10 @@ static inline Future> OpenReaderAsyn } static inline Result> GetIncludedFields( - const Schema& schema, const std::vector& materialized_fields) { + const Schema& schema, const std::vector& materialized_fields) { std::vector included_fields; - for (FieldRef ref : materialized_fields) { + for (const auto& ref : materialized_fields) { ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(schema)); if (match.indices().empty()) continue; diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 6a064f539d9..7422ee9f527 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -135,13 +135,22 @@ class TestIpcFileFormatScan : public FileFormatScanMixin {}; TEST_P(TestIpcFileFormatScan, ScanRecordBatchReader) { TestScan(); } TEST_P(TestIpcFileFormatScan, ScanBatchSize) { TestScanBatchSize(); } -TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) { - TestScanWithVirtualColumn(); -} TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); } +TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjectedNested) { + TestScanProjectedNested(); +} TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) { TestScanProjectedMissingCols(); } +TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) { + TestScanWithVirtualColumn(); +} +TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) { + TestScanWithDuplicateColumn(); +} +TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) { + TestScanWithDuplicateColumnError(); +} TEST_P(TestIpcFileFormatScan, FragmentScanOptions) { auto reader = GetRecordBatchReader( // ARROW-12077: on Windows/mimalloc/release, nullable list column leads to crash diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index d316e1e0fd5..632e41eff1f 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -79,12 +79,11 @@ class OrcScanTask { // filter out virtual columns std::vector included_fields; ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema()); - for (auto name : materialized_fields) { - FieldRef ref(name); + for (const auto& ref : materialized_fields) { ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema)); if (match.indices().empty()) continue; - included_fields.push_back(name); + included_fields.push_back(schema->field(match.indices()[0])->name()); } return RecordBatchIterator( diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc index 546879bff42..4d871602f79 100644 --- a/cpp/src/arrow/dataset/file_orc_test.cc +++ b/cpp/src/arrow/dataset/file_orc_test.cc @@ -69,13 +69,25 @@ TEST_F(TestOrcFileFormat, CountRows) { TestCountRows(); } class TestOrcFileFormatScan : public FileFormatScanMixin {}; TEST_P(TestOrcFileFormatScan, ScanRecordBatchReader) { TestScan(); } -TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) { - TestScanWithVirtualColumn(); +TEST_P(TestOrcFileFormatScan, ScanBatchSize) { + // TODO(ARROW-14153): TestScanBatchSize(); } TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); } +TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedNested) { + TestScanProjectedNested(); +} TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) { TestScanProjectedMissingCols(); } +TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) { + TestScanWithVirtualColumn(); +} +TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) { + TestScanWithDuplicateColumn(); +} +TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) { + TestScanWithDuplicateColumnError(); +} INSTANTIATE_TEST_SUITE_P(TestScan, TestOrcFileFormatScan, ::testing::ValuesIn(TestFormatParams::Values()), TestFormatParams::ToTestNameString); diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index cb6a7484fdb..cdee1f00684 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -155,30 +155,112 @@ void AddColumnIndices(const SchemaField& schema_field, } } -// Compute the column projection out of an optional arrow::Schema -std::vector InferColumnProjection(const parquet::arrow::FileReader& reader, - const ScanOptions& options) { +Status ResolveOneFieldRef( + const SchemaManifest& manifest, const FieldRef& field_ref, + const std::unordered_map& field_lookup, + const std::unordered_set& duplicate_fields, + std::vector* columns_selection) { + if (const std::string* name = field_ref.name()) { + auto it = field_lookup.find(*name); + if (it != field_lookup.end()) { + AddColumnIndices(*it->second, columns_selection); + } else if (duplicate_fields.find(*name) != duplicate_fields.end()) { + // We shouldn't generally get here because SetProjection will reject such references + return Status::Invalid("Ambiguous reference to column '", *name, + "' which occurs more than once"); + } + // "Virtual" column: field is not in file but is in the ScanOptions. + // Ignore it here, as projection will pad the batch with a null column. + return Status::OK(); + } + + const SchemaField* toplevel = nullptr; + const SchemaField* field = nullptr; + if (const std::vector* refs = field_ref.nested_refs()) { + // Only supports a sequence of names + for (const auto& ref : *refs) { + if (const std::string* name = ref.name()) { + if (!field) { + // First lookup, top-level field + auto it = field_lookup.find(*name); + if (it != field_lookup.end()) { + field = it->second; + toplevel = field; + } else if (duplicate_fields.find(*name) != duplicate_fields.end()) { + return Status::Invalid("Ambiguous reference to column '", *name, + "' which occurs more than once"); + } else { + // Virtual column + return Status::OK(); + } + } else { + const SchemaField* result = nullptr; + for (const auto& child : field->children) { + if (child.field->name() == *name) { + if (!result) { + result = &child; + } else { + return Status::Invalid("Ambiguous nested reference to column '", *name, + "' which occurs more than once in field ", + field->field->ToString()); + } + } + } + if (!result) { + // Virtual column + return Status::OK(); + } + field = result; + } + continue; + } + return Status::NotImplemented("Inferring column projection from FieldRef ", + field_ref.ToString()); + } + } else { + return Status::NotImplemented("Inferring column projection from FieldRef ", + field_ref.ToString()); + } + + if (field) { + // TODO(ARROW-1888): support fine-grained column projection. We should be + // able to materialize only the child fields requested, and not the entire + // top-level field. + // Right now, if enabled, projection/filtering will fail when they cast the + // physical schema to the dataset schema. + AddColumnIndices(*toplevel, columns_selection); + } + return Status::OK(); +} + +// Compute the column projection based on the scan options +Result> InferColumnProjection(const parquet::arrow::FileReader& reader, + const ScanOptions& options) { auto manifest = reader.manifest(); // Checks if the field is needed in either the projection or the filter. - auto field_names = options.MaterializedFields(); - std::unordered_set materialized_fields{field_names.cbegin(), - field_names.cend()}; - auto should_materialize_column = [&materialized_fields](const std::string& f) { - return materialized_fields.find(f) != materialized_fields.end(); - }; - - std::vector columns_selection; - // Note that the loop is using the file's schema to iterate instead of the - // materialized fields of the ScanOptions. This ensures that missing - // fields in the file (but present in the ScanOptions) will be ignored. The - // scanner's projector will take care of padding the column with the proper - // values. + auto field_refs = options.MaterializedFields(); + + // Build a lookup table from top level field name to field metadata. + // This is to avoid quadratic-time mapping of projected fields to + // column indices, in the common case of selecting top level + // columns. For nested fields, we will pay the cost of a linear scan + // assuming for now that this is relatively rare, but this can be + // optimized. (Also, we don't want to pay the cost of building all + // the lookup tables up front if they're rarely used.) + std::unordered_map field_lookup; + std::unordered_set duplicate_fields; for (const auto& schema_field : manifest.schema_fields) { - if (should_materialize_column(schema_field.field->name())) { - AddColumnIndices(schema_field, &columns_selection); + const auto it = field_lookup.emplace(schema_field.field->name(), &schema_field); + if (!it.second) { + duplicate_fields.emplace(schema_field.field->name()); } } + std::vector columns_selection; + for (const auto& ref : field_refs) { + RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields, + &columns_selection)); + } return columns_selection; } @@ -351,7 +433,8 @@ Result ParquetFileFormat::ScanBatchesAsync( parquet_fragment->FilterRowGroups(options->filter)); if (row_groups.empty()) return MakeEmptyGenerator>(); } - auto column_projection = InferColumnProjection(*reader, *options); + ARROW_ASSIGN_OR_RAISE(auto column_projection, + InferColumnProjection(*reader, *options)); ARROW_ASSIGN_OR_RAISE( auto parquet_scan_options, GetFragmentScanOptions( diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 090737ad062..d5c7a0b9850 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -377,9 +377,22 @@ class TestParquetFileFormatScan : public FileFormatScanMixin>()>; -std::vector ScanOptions::MaterializedFields() const { - std::vector fields; +std::vector ScanOptions::MaterializedFields() const { + std::vector fields; for (const compute::Expression* expr : {&filter, &projection}) { - for (const FieldRef& ref : FieldsInExpression(*expr)) { - DCHECK(ref.name()); - fields.push_back(*ref.name()); - } + auto refs = FieldsInExpression(*expr); + fields.insert(fields.end(), std::make_move_iterator(refs.begin()), + std::make_move_iterator(refs.end())); } return fields; @@ -591,13 +590,6 @@ Result> AsyncScanner::ToRecordBatchReader() { } const std::shared_ptr& AsyncScanner::dataset() const { return dataset_; } - -Status NestedFieldRefsNotImplemented() { - // TODO(ARROW-11259) Several functions (for example, IpcScanTask::Make) assume that - // only top level fields will be materialized. - return Status::NotImplemented("Nested field references in scans."); -} - } // namespace Result ProjectionDescr::FromStructExpression( @@ -623,8 +615,6 @@ Result ProjectionDescr::FromExpressions( for (size_t i = 0; i < exprs.size(); ++i) { if (auto ref = exprs[i].field_ref()) { - if (!ref->name()) return NestedFieldRefsNotImplemented(); - // set metadata and nullability for plain field references ARROW_ASSIGN_OR_RAISE(auto field, ref->GetOne(dataset_schema)); project_options.field_nullability[i] = field->nullable(); @@ -708,8 +698,6 @@ Status ScannerBuilder::Project(std::vector exprs, Status ScannerBuilder::Filter(const compute::Expression& filter) { for (const auto& ref : FieldsInExpression(filter)) { - if (!ref.name()) return NestedFieldRefsNotImplemented(); - RETURN_NOT_OK(ref.FindOne(*scan_options_->dataset_schema)); } ARROW_ASSIGN_OR_RAISE(scan_options_->filter, diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 738915a8129..013ba092b0c 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -118,21 +118,21 @@ struct ARROW_DS_EXPORT ScanOptions { /// Fragment-specific scan options. std::shared_ptr fragment_scan_options; - // Return a vector of fields that requires materialization. - // - // This is usually the union of the fields referenced in the projection and the - // filter expression. Examples: - // - // - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"] - // - `SELECT a + b < 3 WHERE a > 1` => ["a", "b"] - // - // This is needed for expression where a field may not be directly - // used in the final projection but is still required to evaluate the - // expression. - // - // This is used by Fragment implementations to apply the column - // sub-selection optimization. - std::vector MaterializedFields() const; + /// Return a vector of FieldRefs that require materialization. + /// + /// This is usually the union of the fields referenced in the projection and the + /// filter expression. Examples: + /// + /// - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"] + /// - `SELECT a + b < 3 WHERE a > 1` => ["a", "b"] + /// + /// This is needed for expression where a field may not be directly + /// used in the final projection but is still required to evaluate the + /// expression. + /// + /// This is used by Fragment implementations to apply the column + /// sub-selection optimization. + std::vector MaterializedFields() const; }; /// \brief Describes a projection diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index c44849c3402..e62c626d9f8 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -197,6 +197,46 @@ TEST_P(TestScanner, FilteredScan) { AssertScanBatchesEqualRepetitionsOf(MakeScanner(batch), filtered_batch); } +TEST_P(TestScanner, FilteredScanNested) { + auto struct_ty = struct_({field("f64", float64())}); + SetSchema({field("struct", struct_ty)}); + + double value = 0.5; + ASSERT_OK_AND_ASSIGN(auto f64, + ArrayFromBuilderVisitor(float64(), GetParam().items_per_batch, + GetParam().items_per_batch / 2, + [&](DoubleBuilder* builder) { + builder->UnsafeAppend(value); + builder->UnsafeAppend(-value); + value += 1.0; + })); + + SetFilter(greater(field_ref(FieldRef("struct", "f64")), literal(0.0))); + + auto batch = RecordBatch::Make( + schema_, f64->length(), + { + std::make_shared(struct_ty, f64->length(), ArrayVector{f64}), + }); + + value = 0.5; + ASSERT_OK_AND_ASSIGN(auto f64_filtered, + ArrayFromBuilderVisitor(float64(), GetParam().items_per_batch / 2, + [&](DoubleBuilder* builder) { + builder->UnsafeAppend(value); + value += 1.0; + })); + + auto filtered_batch = RecordBatch::Make( + schema_, f64_filtered->length(), + { + std::make_shared(struct_ty, f64_filtered->length(), + ArrayVector{f64_filtered}), + }); + + AssertScanBatchesEqualRepetitionsOf(MakeScanner(batch), filtered_batch); +} + TEST_P(TestScanner, ProjectedScan) { SetSchema({field("i32", int32()), field("f64", float64())}); SetProjectedColumns({"i32"}); @@ -206,6 +246,25 @@ TEST_P(TestScanner, ProjectedScan) { AssertScanBatchesUnorderedEqualRepetitionsOf(MakeScanner(batch_in), batch_out); } +TEST_P(TestScanner, ProjectedScanNested) { + SetSchema({ + field("struct", struct_({field("i32", int32()), field("f64", float64())})), + field("nested", struct_({field("left", int32()), + field("right", struct_({field("i32", int32()), + field("f64", float64())}))})), + }); + ASSERT_OK_AND_ASSIGN(auto descr, ProjectionDescr::FromExpressions( + {field_ref(FieldRef("struct", "i32")), + field_ref(FieldRef("nested", "right", "f64"))}, + {"i32", "f64"}, *options_->dataset_schema)) + SetProjection(options_.get(), std::move(descr)); + auto batch_in = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_); + auto batch_out = ConstantArrayGenerator::Zeroes( + GetParam().items_per_batch, + schema({field("i32", int32()), field("f64", float64())})); + AssertScanBatchesUnorderedEqualRepetitionsOf(MakeScanner(batch_in), batch_out); +} + TEST_P(TestScanner, MaterializeMissingColumn) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch_missing_f64 = ConstantArrayGenerator::Zeroes( @@ -1062,6 +1121,7 @@ class TestScannerBuilder : public ::testing::Test { field("i16", int16()), field("i32", int32()), field("i64", int64()), + field("nested", struct_({field("str", utf8())})), }); ASSERT_OK_AND_ASSIGN(dataset_, UnionDataset::Make(schema_, sources)); @@ -1084,6 +1144,7 @@ TEST_F(TestScannerBuilder, TestProject) { ASSERT_OK(builder.Project( {field_ref("i16"), call("multiply", {field_ref("i16"), literal(2)})}, {"i16 renamed", "i16 * 2"})); + ASSERT_OK(builder.Project({field_ref(FieldRef("nested", "str"))}, {".nested.str"})); ASSERT_RAISES(Invalid, builder.Project({"not_found_column"})); ASSERT_RAISES(Invalid, builder.Project({"i8", "not_found_column"})); @@ -1092,8 +1153,8 @@ TEST_F(TestScannerBuilder, TestProject) { call("multiply", {field_ref("i16"), literal(2)})}, {"i16 renamed", "i16 * 2"})); - ASSERT_RAISES(NotImplemented, builder.Project({field_ref(FieldRef("nested", "column"))}, - {"nested column"})); + ASSERT_RAISES(Invalid, builder.Project({field_ref(FieldRef("nested", "not_a_column"))}, + {"nested column"})); // provided more field names than column exprs or vice versa ASSERT_RAISES(Invalid, builder.Project({}, {"i16 renamed", "i16 * 2"})); @@ -1107,14 +1168,15 @@ TEST_F(TestScannerBuilder, TestFilter) { ASSERT_OK(builder.Filter(equal(field_ref("i64"), literal(10)))); ASSERT_OK(builder.Filter(or_(equal(field_ref("i64"), literal(10)), equal(field_ref("b"), literal(true))))); + ASSERT_OK(builder.Filter(equal(field_ref(FieldRef("nested", "str")), literal("")))); ASSERT_OK(builder.Filter(equal(field_ref("i64"), literal(10)))); ASSERT_RAISES(Invalid, builder.Filter(equal(field_ref("not_a_column"), literal(true)))); - ASSERT_RAISES( - NotImplemented, - builder.Filter(equal(field_ref(FieldRef("nested", "column")), literal(true)))); + ASSERT_RAISES(Invalid, + builder.Filter( + equal(field_ref(FieldRef("nested", "not_a_column")), literal(true)))); ASSERT_RAISES(Invalid, builder.Filter(or_(equal(field_ref("i64"), literal(10)), @@ -1143,7 +1205,7 @@ TEST(ScanOptions, TestMaterializedFields) { // project nothing, filter on i32 = materialize i32 opts->filter = equal(field_ref("i32"), literal(10)); - EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32")); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("i32"))); // project i32 & i64, filter nothing = materialize i32 & i64 opts->filter = literal(true); @@ -1165,15 +1227,50 @@ TEST(ScanOptions, TestMaterializedFields) { // project i32, filter on i32 = materialize i32 (reported twice) opts->filter = equal(field_ref("i32"), literal(10)); - EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i32")); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("i32"), FieldRef("i32"))); // project i32, filter on i32 & i64 = materialize i64, i32 (reported twice) opts->filter = less(field_ref("i32"), field_ref("i64")); - EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64", "i32")); + EXPECT_THAT(opts->MaterializedFields(), + ElementsAre(FieldRef("i32"), FieldRef("i64"), FieldRef("i32"))); // project i32, filter on i64 = materialize i32 & i64 opts->filter = equal(field_ref("i64"), literal(10)); - EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i64", "i32")); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("i64"), FieldRef("i32"))); + + auto nested = field("nested", struct_({i32, i64})); + opts->dataset_schema = schema({nested}); + + // project top-level field, filter nothing + opts->filter = literal(true); + ASSERT_OK_AND_ASSIGN(projection, + ProjectionDescr::FromNames({"nested"}, *opts->dataset_schema)); + SetProjection(opts.get(), std::move(projection)); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested"))); + + // project child field, filter nothing + opts->filter = literal(true); + ASSERT_OK_AND_ASSIGN(projection, ProjectionDescr::FromExpressions( + {field_ref(FieldRef("nested", "i64"))}, + {"nested.i64"}, *opts->dataset_schema)); + SetProjection(opts.get(), std::move(projection)); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested", "i64"))); + + // project nothing, filter child field + opts->filter = equal(field_ref(FieldRef("nested", "i64")), literal(10)); + ASSERT_OK_AND_ASSIGN(projection, + ProjectionDescr::FromExpressions({}, {}, *opts->dataset_schema)); + SetProjection(opts.get(), std::move(projection)); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested", "i64"))); + + // project child field, filter child field + opts->filter = equal(field_ref(FieldRef("nested", "i64")), literal(10)); + ASSERT_OK_AND_ASSIGN(projection, ProjectionDescr::FromExpressions( + {field_ref(FieldRef("nested", "i32"))}, + {"nested.i32"}, *opts->dataset_schema)); + SetProjection(opts.get(), std::move(projection)); + EXPECT_THAT(opts->MaterializedFields(), + ElementsAre(FieldRef("nested", "i64"), FieldRef("nested", "i32"))); } namespace { @@ -1209,66 +1306,137 @@ struct DatasetAndBatches { std::vector batches; }; -DatasetAndBatches MakeBasicDataset() { - const auto dataset_schema = ::arrow::schema({ - field("a", int32()), - field("b", boolean()), - field("c", int32()), - }); - - const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"}); - - RecordBatchVector record_batches{ - RecordBatchFromJSON(physical_schema, R"([{"a": 1, "b": null}, - {"a": 2, "b": true}])"), - RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true}, - {"a": 3, "b": false}])"), - RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true}, - {"a": 4, "b": false}])"), - RecordBatchFromJSON(physical_schema, R"([{"a": 5, "b": null}, - {"a": 6, "b": false}, - {"a": 7, "b": false}])"), - }; - - auto dataset = std::make_shared( - dataset_schema, - FragmentVector{ - std::make_shared( - physical_schema, RecordBatchVector{record_batches[0], record_batches[1]}, - equal(field_ref("c"), literal(23))), - std::make_shared( - physical_schema, RecordBatchVector{record_batches[2], record_batches[3]}, - equal(field_ref("c"), literal(47))), - }); +DatasetAndBatches DatasetAndBatchesFromJSON( + const std::shared_ptr& dataset_schema, + const std::shared_ptr& physical_schema, + const std::vector>& fragment_batch_strs, + const std::vector& guarantees, + std::function make_exec_batch = {}) { + if (!guarantees.empty()) { + EXPECT_EQ(fragment_batch_strs.size(), guarantees.size()); + } + RecordBatchVector record_batches; + FragmentVector fragments; + fragments.reserve(fragment_batch_strs.size()); + for (size_t i = 0; i < fragment_batch_strs.size(); i++) { + const auto& batch_strs = fragment_batch_strs[i]; + RecordBatchVector fragment_batches; + fragment_batches.reserve(batch_strs.size()); + for (const auto& batch_str : batch_strs) { + fragment_batches.push_back(RecordBatchFromJSON(physical_schema, batch_str)); + } + record_batches.insert(record_batches.end(), fragment_batches.begin(), + fragment_batches.end()); + fragments.push_back(std::make_shared( + physical_schema, std::move(fragment_batches), + guarantees.empty() ? literal(true) : guarantees[i])); + } std::vector batches; - auto batch_it = record_batches.begin(); - for (int fragment_index = 0; fragment_index < 2; ++fragment_index) { - for (int batch_index = 0; batch_index < 2; ++batch_index) { + for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size(); + ++fragment_index) { + for (size_t batch_index = 0; batch_index < fragment_batch_strs[fragment_index].size(); + ++batch_index) { const auto& batch = *batch_it++; // the scanned ExecBatches will begin with physical columns batches.emplace_back(*batch); - // a placeholder will be inserted for partition field "c" - batches.back().values.emplace_back(std::make_shared()); + // allow customizing the ExecBatch (e.g. to fill in placeholders for partition + // fields) + if (make_exec_batch) { + make_exec_batch(&batches.back(), *batch); + } // scanned batches will be augmented with fragment and batch indices - batches.back().values.emplace_back(fragment_index); - batches.back().values.emplace_back(batch_index); + batches.back().values.emplace_back(static_cast(fragment_index)); + batches.back().values.emplace_back(static_cast(batch_index)); // ... and with the last-in-fragment flag - batches.back().values.emplace_back(batch_index == 1); + batches.back().values.emplace_back(batch_index == + fragment_batch_strs[fragment_index].size() - 1); - // each batch carries a guarantee inherited from its Fragment's partition - // expression - batches.back().guarantee = - equal(field_ref("c"), literal(fragment_index == 0 ? 23 : 47)); + // each batch carries a guarantee inherited from its Fragment's partition expression + batches.back().guarantee = fragments[fragment_index]->partition_expression(); } } - return {dataset, batches}; + auto dataset = std::make_shared(dataset_schema, std::move(fragments)); + return {std::move(dataset), std::move(batches)}; +} + +DatasetAndBatches MakeBasicDataset() { + const auto dataset_schema = ::arrow::schema({ + field("a", int32()), + field("b", boolean()), + field("c", int32()), + }); + + const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"}); + + return DatasetAndBatchesFromJSON( + dataset_schema, physical_schema, + { + { + R"([{"a": 1, "b": null}, + {"a": 2, "b": true}])", + R"([{"a": null, "b": true}, + {"a": 3, "b": false}])", + }, + { + R"([{"a": null, "b": true}, + {"a": 4, "b": false}])", + R"([{"a": 5, "b": null}, + {"a": 6, "b": false}, + {"a": 7, "b": false}])", + }, + }, + { + equal(field_ref("c"), literal(23)), + equal(field_ref("c"), literal(47)), + }, + [](compute::ExecBatch* batch, const RecordBatch&) { + // a placeholder will be inserted for partition field "c" + batch->values.emplace_back(std::make_shared()); + }); +} + +DatasetAndBatches MakeNestedDataset() { + const auto dataset_schema = ::arrow::schema({ + field("a", int32()), + field("b", boolean()), + field("c", struct_({ + field("d", int64()), + field("e", float64()), + })), + }); + const auto physical_schema = ::arrow::schema({ + field("a", int32()), + field("b", boolean()), + field("c", struct_({ + field("e", int64()), + })), + }); + + return DatasetAndBatchesFromJSON(dataset_schema, physical_schema, + { + { + R"([{"a": 1, "b": null, "c": {"e": 0}}, + {"a": 2, "b": true, "c": {"e": 1}}])", + R"([{"a": null, "b": true, "c": {"e": 2}}, + {"a": 3, "b": false, "c": {"e": null}}])", + R"([{"a": null, "b": null, "c": null}])", + }, + { + R"([{"a": null, "b": true, "c": {"e": 4}}, + {"a": 4, "b": false, "c": null}])", + R"([{"a": 5, "b": null, "c": {"e": 6}}, + {"a": 6, "b": false, "c": {"e": 7}}, + {"a": 7, "b": false, "c": {"e": null}}])", + }, + }, + /*guarantees=*/{}); } compute::Expression Materialize(std::vector names, @@ -1438,6 +1606,31 @@ TEST(ScanNode, MaterializationOfVirtualColumn) { ASSERT_THAT(plan.Run(), Finishes(ResultWith(UnorderedElementsAreArray(expected)))); } +TEST(ScanNode, MaterializationOfNestedVirtualColumn) { + TestPlan plan; + + auto basic = MakeNestedDataset(); + + auto options = std::make_shared(); + options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); + + ASSERT_OK(compute::Declaration::Sequence( + { + {"scan", ScanNodeOptions{basic.dataset, options}}, + {"augmented_project", + compute::ProjectNodeOptions{ + {field_ref("a"), field_ref("b"), field_ref("c")}}}, + {"sink", compute::SinkNodeOptions{&plan.sink_gen}}, + }) + .AddToPlan(plan.get())); + + // TODO(ARROW-1888): allow scanner to "patch up" structs with casts + EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( + NotImplemented, + ::testing::HasSubstr("Unsupported cast from struct to struct"), + plan.Run()); +} + TEST(ScanNode, MinimalEndToEnd) { // NB: This test is here for didactic purposes diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 73171618ac1..5128021bf65 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -45,6 +45,7 @@ #include "arrow/testing/future_util.h" #include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" #include "arrow/testing/random.h" #include "arrow/util/async_generator.h" #include "arrow/util/io_util.h" @@ -410,6 +411,18 @@ class FileFormatFixtureMixin : public ::testing::Test { SetProjection(opts_.get(), std::move(projection)); } + void ProjectNested(std::vector names) { + std::vector exprs; + for (const auto& name : names) { + ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(name)); + exprs.push_back(field_ref(ref)); + } + ASSERT_OK_AND_ASSIGN( + auto descr, ProjectionDescr::FromExpressions(std::move(exprs), std::move(names), + *opts_->dataset_schema)); + SetProjection(opts_.get(), std::move(descr)); + } + // Shared test cases void AssertInspectFailure(const std::string& contents, StatusCode code, const std::string& format_name) { @@ -630,12 +643,110 @@ class FileFormatScanMixin : public FileFormatFixtureMixin, for (auto maybe_batch : PhysicalBatches(fragment)) { ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); row_count += batch->num_rows(); - AssertSchemaEqual(*batch->schema(), *expected_schema, - /*check_metadata=*/false); + ASSERT_THAT( + batch->schema()->fields(), + ::testing::UnorderedPointwise(PointeesEquals(), expected_schema->fields())) + << "EXPECTED:\n" + << expected_schema->ToString() << "\nACTUAL:\n" + << batch->schema()->ToString(); } ASSERT_EQ(row_count, expected_rows()); } + void TestScanProjectedNested(bool fine_grained_selection = false) { + auto f32 = field("f32", float32()); + auto f64 = field("f64", float64()); + auto i32 = field("i32", int32()); + auto i64 = field("i64", int64()); + auto struct1 = field("struct1", struct_({f32, i32})); + auto struct2 = field("struct2", struct_({f64, i64, struct1})); + this->SetSchema({struct1, struct2, f32, f64, i32, i64}); + this->ProjectNested({".struct1.f32", ".struct2.struct1", ".struct2.struct1.f32"}); + this->SetFilter(greater_equal(field_ref(FieldRef("struct2", "i64")), literal(0))); + + std::shared_ptr physical_schema; + if (fine_grained_selection) { + // Some formats, like Parquet, let you pluck only a part of a complex type + physical_schema = schema( + {field("struct1", struct_({f32})), field("struct2", struct_({i64, struct1}))}); + } else { + // Otherwise, the entire top-level field is returned + physical_schema = schema({struct1, struct2}); + } + std::shared_ptr projected_schema = schema({ + field(".struct1.f32", float32()), + field(".struct2.struct1", struct1->type()), + field(".struct2.struct1.f32", float32()), + }); + + { + auto reader = this->GetRecordBatchReader(opts_->dataset_schema); + auto source = this->GetFileSource(reader.get()); + auto fragment = this->MakeFragment(*source); + + int64_t row_count = 0; + for (auto maybe_batch : PhysicalBatches(fragment)) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + row_count += batch->num_rows(); + ASSERT_THAT( + batch->schema()->fields(), + ::testing::UnorderedPointwise(PointeesEquals(), physical_schema->fields())) + << "EXPECTED:\n" + << physical_schema->ToString() << "\nACTUAL:\n" + << batch->schema()->ToString(); + } + ASSERT_EQ(row_count, expected_rows()); + } + { + auto reader = this->GetRecordBatchReader(opts_->dataset_schema); + auto source = this->GetFileSource(reader.get()); + auto fragment = this->MakeFragment(*source); + + int64_t row_count = 0; + for (auto maybe_batch : Batches(fragment)) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + row_count += batch->num_rows(); + AssertSchemaEqual(*batch->schema(), *projected_schema, /*check_metadata=*/false); + } + ASSERT_LE(row_count, expected_rows()); + ASSERT_GT(row_count, 0); + } + { + // File includes a duplicated name in struct2 + auto struct2_physical = field("struct2", struct_({f64, i64, struct1, i64})); + auto reader = this->GetRecordBatchReader( + schema({struct1, struct2_physical, f32, f64, i32, i64})); + auto source = this->GetFileSource(reader.get()); + auto fragment = this->MakeFragment(*source); + + auto iterator = PhysicalBatches(fragment); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("i64"), + iterator.Next().status()); + } + { + // File is missing a child in struct1 + auto struct1_physical = field("struct1", struct_({i32})); + auto reader = this->GetRecordBatchReader( + schema({struct1_physical, struct2, f32, f64, i32, i64})); + auto source = this->GetFileSource(reader.get()); + auto fragment = this->MakeFragment(*source); + + physical_schema = schema({physical_schema->field(1)}); + + int64_t row_count = 0; + for (auto maybe_batch : PhysicalBatches(fragment)) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + row_count += batch->num_rows(); + ASSERT_THAT( + batch->schema()->fields(), + ::testing::UnorderedPointwise(PointeesEquals(), physical_schema->fields())) + << "EXPECTED:\n" + << physical_schema->ToString() << "\nACTUAL:\n" + << batch->schema()->ToString(); + } + ASSERT_EQ(row_count, expected_rows()); + } + } void TestScanProjectedMissingCols() { auto f32 = field("f32", float32()); auto f64 = field("f64", float64()); @@ -674,8 +785,12 @@ class FileFormatScanMixin : public FileFormatFixtureMixin, for (auto maybe_batch : PhysicalBatches(fragment)) { ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); row_count += batch->num_rows(); - AssertSchemaEqual(*batch->schema(), *expected_schema, - /*check_metadata=*/false); + ASSERT_THAT( + batch->schema()->fields(), + ::testing::UnorderedPointwise(PointeesEquals(), expected_schema->fields())) + << "EXPECTED:\n" + << expected_schema->ToString() << "\nACTUAL:\n" + << batch->schema()->ToString(); } ASSERT_EQ(row_count, expected_rows()); } @@ -708,6 +823,36 @@ class FileFormatScanMixin : public FileFormatFixtureMixin, ASSERT_EQ(row_count, expected_rows()); } } + void TestScanWithDuplicateColumn() { + // A duplicate column is ignored if not requested. + auto i32 = field("i32", int32()); + auto i64 = field("i64", int64()); + this->opts_->dataset_schema = schema({i32, i32, i64}); + this->Project({"i64"}); + auto expected_schema = schema({i64}); + auto reader = this->GetRecordBatchReader(opts_->dataset_schema); + auto source = this->GetFileSource(reader.get()); + auto fragment = this->MakeFragment(*source); + + int64_t row_count = 0; + + for (auto maybe_batch : PhysicalBatches(fragment)) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + row_count += batch->num_rows(); + AssertSchemaEqual(*batch->schema(), *expected_schema, + /*check_metadata=*/false); + } + + ASSERT_EQ(row_count, expected_rows()); + } + void TestScanWithDuplicateColumnError() { + // A duplicate column leads to an error if requested. + auto i32 = field("i32", int32()); + auto i64 = field("i64", int64()); + this->opts_->dataset_schema = schema({i32, i32, i64}); + ASSERT_RAISES(Invalid, + ProjectionDescr::FromNames({"i32"}, *this->opts_->dataset_schema)); + } protected: using FileFormatFixtureMixin::opts_; diff --git a/cpp/src/arrow/testing/generator.cc b/cpp/src/arrow/testing/generator.cc index 8f05cc6817d..8e411e5fe05 100644 --- a/cpp/src/arrow/testing/generator.cc +++ b/cpp/src/arrow/testing/generator.cc @@ -140,7 +140,16 @@ std::shared_ptr ConstantArrayGenerator::Zeroes( return Float64(size); case Type::STRING: return String(size); + case Type::STRUCT: { + ArrayVector children; + children.reserve(type->num_fields()); + for (const auto& field : type->fields()) { + children.push_back(Zeroes(size, field->type())); + } + return std::make_shared(type, size, children); + } default: + ADD_FAILURE() << "ConstantArrayGenerator::Zeroes is not implemented for " << *type; return nullptr; } } diff --git a/cpp/src/arrow/testing/matchers.h b/cpp/src/arrow/testing/matchers.h index 381b3b0cb9d..1ffc2e1d9fa 100644 --- a/cpp/src/arrow/testing/matchers.h +++ b/cpp/src/arrow/testing/matchers.h @@ -28,6 +28,10 @@ namespace arrow { +// A matcher that checks that the values pointed to are Equals(). +// Useful in conjunction with other googletest matchers. +MATCHER(PointeesEquals, "") { return std::get<0>(arg)->Equals(*std::get<1>(arg)); } + template class FutureMatcher { public: diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 463636b0537..4c439841ba2 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -1670,6 +1670,11 @@ class ARROW_EXPORT FieldRef { const std::string* name() const { return IsName() ? &util::get(impl_) : NULLPTR; } + const std::vector* nested_refs() const { + return util::holds_alternative>(impl_) + ? &util::get>(impl_) + : NULLPTR; + } /// \brief Retrieve FieldPath of every child field which matches this FieldRef. std::vector FindAll(const Schema& schema) const;