diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index ec974787cae..ec4a28c8a0e 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -145,8 +145,8 @@ Result KeyValuePartitioning::ConvertKey(const Key& key) const {
     DictionaryScalar::ValueType value;
     value.dictionary = dictionaries_[field_index];
 
-    if (!value.dictionary->type()->Equals(
-            checked_cast<const DictionaryType&>(*field->type()).value_type())) {
+    const auto& dictionary_type = checked_cast<const DictionaryType&>(*field->type());
+    if (!value.dictionary->type()->Equals(dictionary_type.value_type())) {
       return Status::TypeError("Dictionary supplied for field ", field->ToString(),
                                " had incorrect type ",
                                value.dictionary->type()->ToString());
@@ -155,6 +155,8 @@ Result KeyValuePartitioning::ConvertKey(const Key& key) const {
     // look up the partition value in the dictionary
     ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(value.dictionary->type(), *key.value));
     ARROW_ASSIGN_OR_RAISE(auto index, compute::IndexIn(converted, value.dictionary));
+    auto to_index_type = compute::CastOptions::Safe(dictionary_type.index_type());
+    ARROW_ASSIGN_OR_RAISE(index, compute::Cast(index, to_index_type));
     value.index = index.scalar();
     if (!value.index->is_valid) {
       return Status::Invalid("Dictionary supplied for field ", field->ToString(),
@@ -300,10 +302,18 @@ class KeyValuePartitioningFactory : public PartitioningFactory {
     return repr_memos_[index]->GetOrInsert(repr, &dummy);
   }
 
-  Result<std::shared_ptr<Schema>> DoInpsect() {
+  Result<std::shared_ptr<Schema>> DoInspect() {
     dictionaries_.assign(name_to_index_.size(), nullptr);
 
     std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
+    if (options_.schema) {
+      const auto requested_size = options_.schema->fields().size();
+      const auto inferred_size = fields.size();
+      if (inferred_size != requested_size) {
+        return Status::Invalid("Requested schema has ", requested_size,
+                               " fields, but only ", inferred_size, " were detected");
+      }
+    }
 
     for (const auto& name_index : name_to_index_) {
       const auto& name = name_index.first;
@@ -317,15 +327,34 @@ class KeyValuePartitioningFactory : public PartitioningFactory {
                                "'; couldn't infer type");
       }
 
-      // try casting to int32, otherwise bail and just use the string reprs
-      auto dict = compute::Cast(reprs, int32()).ValueOr(reprs).make_array();
-      auto type = dict->type();
-      if (options_.infer_dictionary) {
-        // wrap the inferred type in dictionary()
-        type = dictionary(int32(), std::move(type));
+      std::shared_ptr<Field> current_field;
+      std::shared_ptr<Array> dict;
+      if (options_.schema) {
+        // if we have a schema, use the schema type.
+        current_field = options_.schema->field(index);
+        auto cast_target = current_field->type();
+        if (is_dictionary(cast_target->id())) {
+          cast_target = checked_pointer_cast<DictionaryType>(cast_target)->value_type();
+        }
+        auto maybe_dict = compute::Cast(reprs, cast_target);
+        if (!maybe_dict.ok()) {
+          return Status::Invalid("Could not cast segments for partition field ",
+                                 current_field->name(), " to requested type ",
+                                 current_field->type()->ToString(),
+                                 " because: ", maybe_dict.status());
+        }
+        dict = maybe_dict.ValueOrDie().make_array();
+      } else {
+        // try casting to int32, otherwise bail and just use the string reprs
+        dict = compute::Cast(reprs, int32()).ValueOr(reprs).make_array();
+        auto type = dict->type();
+        if (options_.infer_dictionary) {
+          // wrap the inferred type in dictionary()
+          type = dictionary(int32(), std::move(type));
+        }
+        current_field = field(name, std::move(type));
       }
-
-      fields[index] = field(name, std::move(type));
+      fields[index] = std::move(current_field);
       dictionaries_[index] = std::move(dict);
     }
 
@@ -379,7 +408,7 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
       }
     }
 
-    return DoInpsect();
+    return DoInspect();
   }
 
   Result<std::shared_ptr<Partitioning>> Finish(
@@ -480,7 +509,7 @@ class HivePartitioningFactory : public KeyValuePartitioningFactory {
     }
 
     field_names_ = FieldNames();
-    return DoInpsect();
+    return DoInspect();
   }
 
   Result<std::shared_ptr<Partitioning>> Finish(
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index eff1f2609e8..c49ac5e923e 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -90,6 +90,10 @@ struct PartitioningFactoryOptions {
   /// columns, and Expressions parsed by the finished Partitioning will include
   /// dictionaries of all unique inspected values for each field.
   bool infer_dictionary = false;
+  /// Optionally, an expected schema can be provided, in which case inference
+  /// will only check discovered fields against the schema and update internal
+  /// state (such as dictionaries).
+  std::shared_ptr<Schema> schema;
 };
 
 struct HivePartitioningFactoryOptions : PartitioningFactoryOptions {
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index cf97507deac..456b2852311 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -225,6 +225,19 @@ TEST_F(TestPartitioning, DirectoryPartitioningFormatDictionary) {
   AssertFormat(equal(field_ref("alpha"), literal(dict_hello)), "hello");
 }
 
+TEST_F(TestPartitioning, DirectoryPartitioningFormatDictionaryCustomIndex) {
+  // Make sure a non-int32 index type is properly cast to, else we fail a CHECK when
+  // we construct a dictionary array with the wrong index type
+  auto dict_type = dictionary(int8(), utf8());
+  auto dictionary = ArrayFromJSON(utf8(), R"(["hello", "world"])");
+  partitioning_ = std::make_shared<DirectoryPartitioning>(
+      schema({field("alpha", dict_type)}), ArrayVector{dictionary});
+  written_schema_ = partitioning_->schema();
+
+  ASSERT_OK_AND_ASSIGN(auto dict_hello, MakeScalar("hello")->CastTo(dict_type));
+  AssertFormat(equal(field_ref("alpha"), literal(dict_hello)), "hello");
+}
+
 TEST_F(TestPartitioning, DirectoryPartitioningWithTemporal) {
   for (auto temporal : {timestamp(TimeUnit::SECOND), date32()}) {
     partitioning_ = std::make_shared<DirectoryPartitioning>(
@@ -464,6 +477,67 @@ TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) {
   AssertParseError("/alpha=yosemite");  // not in inspected dictionary
 }
 
+TEST_F(TestPartitioning, ExistingSchemaDirectory) {
+  // Infer dictionary values but with a given schema
+  auto dict_type = dictionary(int8(), utf8());
+  PartitioningFactoryOptions options;
+  options.schema = schema({field("alpha", int64()), field("beta", dict_type)});
+  factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"}, options);
+
+  AssertInspect({"/0/1"}, options.schema->fields());
+  AssertInspect({"/0/1/what"}, options.schema->fields());
+
+  // fail if any segment is not parseable as schema type
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("Failed to parse string"),
+                                  factory_->Inspect({"/0/1", "/hello/1"}));
+  factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"}, options);
+
+  // Now we don't fail since our type is large enough
+  AssertInspect({"/3760212050/1"}, options.schema->fields());
+  // If there are still too many digits, fail
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("Failed to parse string"),
+                                  factory_->Inspect({"/1038581385102940193760212050/1"}));
+  factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"}, options);
+
+  AssertInspect({"/0/1", "/2"}, options.schema->fields());
+}
+
+TEST_F(TestPartitioning, ExistingSchemaHive) {
+  // Infer dictionary values but with a given schema
+  auto dict_type = dictionary(int8(), utf8());
+  HivePartitioningFactoryOptions options;
+  options.schema = schema({field("a", int64()), field("b", dict_type)});
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  AssertInspect({"/a=0/b=1"}, options.schema->fields());
+  AssertInspect({"/a=0/b=1/what"}, options.schema->fields());
+  AssertInspect({"/a=0", "/b=1"}, options.schema->fields());
+
+  // fail if any segment for field alpha is not parseable as schema type
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr(
+          "Could not cast segments for partition field a to requested type int64"),
+      factory_->Inspect({"/a=0/b=1", "/a=hello/b=1"}));
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid,
+      ::testing::HasSubstr("Requested schema has 2 fields, but only 1 were detected"),
+      factory_->Inspect({"/a=0", "/a=hello"}));
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  // Now we don't fail since our type is large enough
+  AssertInspect({"/a=3760212050/b=1"}, options.schema->fields());
+  // If there are still too many digits, fail
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("Failed to parse string"),
+      factory_->Inspect({"/a=1038581385102940193760212050/b=1"}));
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  AssertInspect({"/a=0/b=1", "/b=2"}, options.schema->fields());
+}
+
 TEST_F(TestPartitioning, EtlThenHive) {
   FieldVector etl_fields{field("year", int16()), field("month", int8()),
                          field("day", int8()), field("hour", int8())};
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 1c4e5d302c5..f1e5168c7be 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1528,8 +1528,9 @@ cdef class DirectoryPartitioning(Partitioning):
         self.directory_partitioning = sp.get()
 
     @staticmethod
-    def discover(field_names, infer_dictionary=False,
-                 max_partition_dictionary_size=0):
+    def discover(field_names=None, infer_dictionary=False,
+                 max_partition_dictionary_size=0,
+                 schema=None):
         """
         Discover a DirectoryPartitioning.
 
@@ -1537,6 +1538,7 @@ cdef class DirectoryPartitioning(Partitioning):
         ----------
         field_names : list of str
             The names to associate with the values from the subdirectory names.
+            If schema is given, will be populated from the schema.
         infer_dictionary : bool, default False
             When inferring a schema for partition fields, yield dictionary
             encoded types instead of plain types. This can be more efficient
@@ -1547,6 +1549,10 @@ cdef class DirectoryPartitioning(Partitioning):
            Synonymous with infer_dictionary for backwards compatibility with
            1.0: setting this to -1 or None is equivalent to passing
            infer_dictionary=True.
+        schema : Schema, default None
+            Use this schema instead of inferring a schema from partition
+            values. Partition values will be validated against this schema
+            before accumulation into the Partitioning's dictionary.
 
         Returns
         -------
@@ -1566,7 +1572,15 @@ cdef class DirectoryPartitioning(Partitioning):
         if infer_dictionary:
             c_options.infer_dictionary = True
 
-        c_field_names = [tobytes(s) for s in field_names]
+        if schema:
+            c_options.schema = pyarrow_unwrap_schema(schema)
+            c_field_names = [tobytes(f.name) for f in schema]
+        elif not field_names:
+            raise ValueError(
+                "Neither field_names nor schema was passed; "
+                "cannot infer field_names")
+        else:
+            c_field_names = [tobytes(s) for s in field_names]
 
         return PartitioningFactory.wrap(
             CDirectoryPartitioning.MakeFactory(c_field_names, c_options))
@@ -1637,7 +1651,8 @@ cdef class HivePartitioning(Partitioning):
     @staticmethod
     def discover(infer_dictionary=False,
                  max_partition_dictionary_size=0,
-                 null_fallback="__HIVE_DEFAULT_PARTITION__"):
+                 null_fallback="__HIVE_DEFAULT_PARTITION__",
+                 schema=None):
         """
         Discover a HivePartitioning.
 
@@ -1657,6 +1672,10 @@ cdef class HivePartitioning(Partitioning):
            When inferring a schema for partition fields this value will be
            replaced by null. The default is set to
            __HIVE_DEFAULT_PARTITION__ for compatibility with Spark
+        schema : Schema, default None
+            Use this schema instead of inferring a schema from partition
+            values. Partition values will be validated against this schema
+            before accumulation into the Partitioning's dictionary.
 
         Returns
         -------
@@ -1677,6 +1696,9 @@ cdef class HivePartitioning(Partitioning):
 
         c_options.null_fallback = tobytes(null_fallback)
 
+        if schema:
+            c_options.schema = pyarrow_unwrap_schema(schema)
+
         return PartitioningFactory.wrap(
             CHivePartitioning.MakeFactory(c_options))
 
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 2042a522918..a2cb87a1f7a 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -122,11 +122,13 @@ def partitioning(schema=None, field_names=None, flavor=None,
     flavor : str, default None
         The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
        a HivePartitioning.
-    dictionaries : List[Array]
+    dictionaries : Dict[str, Array]
        If the type of any field of `schema` is a dictionary type, the
        corresponding entry of `dictionaries` must be an array containing
        every value which may be taken by the corresponding column or an
-        error will be raised in parsing.
+        error will be raised in parsing. Alternatively, pass `infer` to have
+        Arrow discover the dictionary values, in which case a
+        PartitioningFactory is returned.
 
     Returns
     -------
@@ -146,6 +148,27 @@ def partitioning(schema=None, field_names=None, flavor=None,
     For paths like "/2009/June", the year will be inferred as int32 while month
     will be inferred as string.
 
+    Specify a Schema with dictionary encoding, providing dictionary values:
+
+    >>> partitioning(
+    ...     pa.schema([
+    ...         ("year", pa.int16()),
+    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
+    ...     ]),
+    ...     dictionaries={
+    ...         "month": pa.array(["January", "February", "March"]),
+    ...     })
+
+    Alternatively, specify a Schema with dictionary encoding, but have Arrow
+    infer the dictionary values:
+
+    >>> partitioning(
+    ...     pa.schema([
+    ...         ("year", pa.int16()),
+    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
+    ...     ]),
+    ...     dictionaries="infer")
+
     Create a Hive scheme for a path like "/year=2009/month=11":
 
     >>> partitioning(
@@ -164,6 +187,8 @@ def partitioning(schema=None, field_names=None, flavor=None,
             if field_names is not None:
                 raise ValueError(
                     "Cannot specify both 'schema' and 'field_names'")
+            if dictionaries == 'infer':
+                return DirectoryPartitioning.discover(schema=schema)
             return DirectoryPartitioning(schema, dictionaries)
         elif field_names is not None:
             if isinstance(field_names, list):
@@ -181,6 +206,8 @@ def partitioning(schema=None, field_names=None, flavor=None,
             raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
         elif schema is not None:
             if isinstance(schema, pa.Schema):
+                if dictionaries == 'infer':
+                    return HivePartitioning.discover(schema=schema)
                 return HivePartitioning(schema, dictionaries)
             else:
                 raise ValueError(
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index bbe545cf794..8eb2d0f1e7a 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -268,11 +268,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CPartitioningFactoryOptions \
             "arrow::dataset::PartitioningFactoryOptions":
         c_bool infer_dictionary
+        shared_ptr[CSchema] schema
 
     cdef cppclass CHivePartitioningFactoryOptions \
             "arrow::dataset::HivePartitioningFactoryOptions":
         c_bool infer_dictionary,
         c_string null_fallback
+        shared_ptr[CSchema] schema
 
     cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory":
         pass
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 09cad5d917b..7c544af075c 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1313,6 +1313,8 @@ def test_partitioning_function():
     # default DirectoryPartitioning
     part = ds.partitioning(schema)
     assert isinstance(part, ds.DirectoryPartitioning)
+    part = ds.partitioning(schema, dictionaries="infer")
+    assert isinstance(part, ds.PartitioningFactory)
     part = ds.partitioning(field_names=names)
     assert isinstance(part, ds.PartitioningFactory)
     # needs schema or list of names
@@ -1326,6 +1328,8 @@ def test_partitioning_function():
     # Hive partitioning
     part = ds.partitioning(schema, flavor="hive")
     assert isinstance(part, ds.HivePartitioning)
+    part = ds.partitioning(schema, dictionaries="infer", flavor="hive")
+    assert isinstance(part, ds.PartitioningFactory)
     part = ds.partitioning(flavor="hive")
     assert isinstance(part, ds.PartitioningFactory)
     # cannot pass list of names
@@ -1339,6 +1343,52 @@ def test_partitioning_function():
         ds.partitioning(schema, flavor="unsupported")
 
 
+def test_directory_partitioning_dictionary_key(mockfs):
+    # ARROW-8088 specifying partition key as dictionary type
+    schema = pa.schema([
+        pa.field('group', pa.dictionary(pa.int8(), pa.int32())),
+        pa.field('key', pa.dictionary(pa.int8(), pa.string()))
+    ])
+    part = ds.DirectoryPartitioning.discover(schema=schema)
+
+    dataset = ds.dataset(
+        "subdir", format="parquet", filesystem=mockfs, partitioning=part
+    )
+    table = dataset.to_table()
+
+    assert table.column('group').type.equals(schema.types[0])
+    assert table.column('group').to_pylist() == [1] * 5 + [2] * 5
+    assert table.column('key').type.equals(schema.types[1])
+    assert table.column('key').to_pylist() == ['xxx'] * 5 + ['yyy'] * 5
+
+
+def test_hive_partitioning_dictionary_key(multisourcefs):
+    # ARROW-8088 specifying partition key as dictionary type
+    schema = pa.schema([
+        pa.field('year', pa.dictionary(pa.int8(), pa.int16())),
+        pa.field('month', pa.dictionary(pa.int8(), pa.int16()))
+    ])
+    part = ds.HivePartitioning.discover(schema=schema)
+
+    dataset = ds.dataset(
+        "hive", format="parquet", filesystem=multisourcefs, partitioning=part
+    )
+    table = dataset.to_table()
+
+    year_dictionary = list(range(2006, 2011))
+    month_dictionary = list(range(1, 13))
+    assert table.column('year').type.equals(schema.types[0])
+    for chunk in table.column('year').chunks:
+        actual = chunk.dictionary.to_pylist()
+        actual.sort()
+        assert actual == year_dictionary
+    assert table.column('month').type.equals(schema.types[1])
+    for chunk in table.column('month').chunks:
+        actual = chunk.dictionary.to_pylist()
+        actual.sort()
+        assert actual == month_dictionary
+
+
 def _create_single_file(base_dir, table=None, row_group_size=None):
     import pyarrow.parquet as pq
     if table is None: