55 changes: 42 additions & 13 deletions cpp/src/arrow/dataset/partition.cc
@@ -145,8 +145,8 @@ Result<Expression> KeyValuePartitioning::ConvertKey(const Key& key) const {
DictionaryScalar::ValueType value;
value.dictionary = dictionaries_[field_index];

if (!value.dictionary->type()->Equals(
checked_cast<const DictionaryType&>(*field->type()).value_type())) {
const auto& dictionary_type = checked_cast<const DictionaryType&>(*field->type());
if (!value.dictionary->type()->Equals(dictionary_type.value_type())) {
return Status::TypeError("Dictionary supplied for field ", field->ToString(),
" had incorrect type ",
value.dictionary->type()->ToString());
@@ -155,6 +155,8 @@ Result<Expression> KeyValuePartitioning::ConvertKey(const Key& key) const {
// look up the partition value in the dictionary
ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(value.dictionary->type(), *key.value));
ARROW_ASSIGN_OR_RAISE(auto index, compute::IndexIn(converted, value.dictionary));
auto to_index_type = compute::CastOptions::Safe(dictionary_type.index_type());
ARROW_ASSIGN_OR_RAISE(index, compute::Cast(index, to_index_type));
value.index = index.scalar();
if (!value.index->is_valid) {
return Status::Invalid("Dictionary supplied for field ", field->ToString(),
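
The cast added in this hunk matters because compute::IndexIn always produces int32 indices, while the partition field may declare a narrower index type such as int8; without the cast, constructing the DictionaryScalar fails a debug CHECK. A minimal pyarrow sketch of the scenario being fixed (the field name and dictionary values are invented for illustration):

    import pyarrow as pa
    import pyarrow.dataset as ds

    # A partition field whose dictionary type declares an int8 index, not int32.
    part = ds.DirectoryPartitioning(
        pa.schema([pa.field("alpha", pa.dictionary(pa.int8(), pa.string()))]),
        dictionaries={"alpha": pa.array(["hello", "world"])},
    )
    # Parsing looks the segment up in the supplied dictionary; the resulting
    # int32 index scalar must then be cast to the declared int8 index type.
    print(part.parse("/hello"))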
@@ -300,10 +302,18 @@ class KeyValuePartitioningFactory : public PartitioningFactory {
return repr_memos_[index]->GetOrInsert<StringType>(repr, &dummy);
}

Result<std::shared_ptr<Schema>> DoInpsect() {
Result<std::shared_ptr<Schema>> DoInspect() {
dictionaries_.assign(name_to_index_.size(), nullptr);

std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
if (options_.schema) {
const auto requested_size = options_.schema->fields().size();
const auto inferred_size = fields.size();
if (inferred_size != requested_size) {
return Status::Invalid("Requested schema has ", requested_size,
" fields, but only ", inferred_size, " were detected");
}
}

for (const auto& name_index : name_to_index_) {
const auto& name = name_index.first;
Expand All @@ -317,15 +327,34 @@ class KeyValuePartitioningFactory : public PartitioningFactory {
"'; couldn't infer type");
}

// try casting to int32, otherwise bail and just use the string reprs
auto dict = compute::Cast(reprs, int32()).ValueOr(reprs).make_array();
auto type = dict->type();
if (options_.infer_dictionary) {
// wrap the inferred type in dictionary()
type = dictionary(int32(), std::move(type));
std::shared_ptr<Field> current_field;
std::shared_ptr<Array> dict;
if (options_.schema) {
// if we have a schema, use the schema type.
current_field = options_.schema->field(index);
auto cast_target = current_field->type();
if (is_dictionary(cast_target->id())) {
cast_target = checked_pointer_cast<DictionaryType>(cast_target)->value_type();
}
auto maybe_dict = compute::Cast(reprs, cast_target);
if (!maybe_dict.ok()) {
return Status::Invalid("Could not cast segments for partition field ",
current_field->name(), " to requested type ",
current_field->type()->ToString(),
" because: ", maybe_dict.status());
}
dict = maybe_dict.ValueOrDie().make_array();
} else {
// try casting to int32, otherwise bail and just use the string reprs
dict = compute::Cast(reprs, int32()).ValueOr(reprs).make_array();
auto type = dict->type();
if (options_.infer_dictionary) {
// wrap the inferred type in dictionary()
type = dictionary(int32(), std::move(type));
}
current_field = field(name, std::move(type));
}

fields[index] = field(name, std::move(type));
fields[index] = std::move(current_field);
dictionaries_[index] = std::move(dict);
}
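
With options_.schema set, DoInspect now validates each discovered segment by casting it to the corresponding schema type (unwrapping dictionary types to their value type first) instead of falling back to the old int32-or-string inference. A hedged sketch of driving this path from Python, with hypothetical paths:

    import pyarrow as pa
    import pyarrow.dataset as ds

    # Discovery guided by an explicit schema: segment values are checked by
    # casting to the requested types rather than inferred.
    factory = ds.DirectoryPartitioning.discover(
        schema=pa.schema([("year", pa.int16()),
                          ("month", pa.dictionary(pa.int8(), pa.string()))]))
    # For a path like "/2009/June", "2009" must cast to int16 and "June" is
    # accumulated into the month dictionary; an unparseable segment fails
    # with "Could not cast segments for partition field ...".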

@@ -379,7 +408,7 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
}
}

return DoInpsect();
return DoInspect();
}

Result<std::shared_ptr<Partitioning>> Finish(
@@ -480,7 +509,7 @@ class HivePartitioningFactory : public KeyValuePartitioningFactory {
}

field_names_ = FieldNames();
return DoInpsect();
return DoInspect();
}

Result<std::shared_ptr<Partitioning>> Finish(
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/partition.h
@@ -90,6 +90,10 @@ struct PartitioningFactoryOptions {
/// columns, and Expressions parsed by the finished Partitioning will include
/// dictionaries of all unique inspected values for each field.
bool infer_dictionary = false;
/// Optionally, an expected schema can be provided, in which case inference
/// will only check discovered fields against the schema and update internal
/// state (such as dictionaries).
std::shared_ptr<Schema> schema;
};

struct HivePartitioningFactoryOptions : PartitioningFactoryOptions {
74 changes: 74 additions & 0 deletions cpp/src/arrow/dataset/partition_test.cc
@@ -225,6 +225,19 @@ TEST_F(TestPartitioning, DirectoryPartitioningFormatDictionary) {
AssertFormat(equal(field_ref("alpha"), literal(dict_hello)), "hello");
}

TEST_F(TestPartitioning, DirectoryPartitioningFormatDictionaryCustomIndex) {
// Make sure a non-int32 index type is properly cast to, else we fail a CHECK when
// we construct a dictionary array with the wrong index type
auto dict_type = dictionary(int8(), utf8());
auto dictionary = ArrayFromJSON(utf8(), R"(["hello", "world"])");
partitioning_ = std::make_shared<DirectoryPartitioning>(
schema({field("alpha", dict_type)}), ArrayVector{dictionary});
written_schema_ = partitioning_->schema();

ASSERT_OK_AND_ASSIGN(auto dict_hello, MakeScalar("hello")->CastTo(dict_type));
AssertFormat(equal(field_ref("alpha"), literal(dict_hello)), "hello");
}

TEST_F(TestPartitioning, DirectoryPartitioningWithTemporal) {
for (auto temporal : {timestamp(TimeUnit::SECOND), date32()}) {
partitioning_ = std::make_shared<DirectoryPartitioning>(
@@ -464,6 +477,67 @@ TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) {
AssertParseError("/alpha=yosemite"); // not in inspected dictionary
}

TEST_F(TestPartitioning, ExistingSchemaDirectory) {
// Infer dictionary values but with a given schema
auto dict_type = dictionary(int8(), utf8());
PartitioningFactoryOptions options;
options.schema = schema({field("alpha", int64()), field("beta", dict_type)});
factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"}, options);

AssertInspect({"/0/1"}, options.schema->fields());
AssertInspect({"/0/1/what"}, options.schema->fields());

// fail if any segment is not parseable as schema type
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("Failed to parse string"),
factory_->Inspect({"/0/1", "/hello/1"}));
factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"}, options);

// Now we don't fail since our type is large enough
AssertInspect({"/3760212050/1"}, options.schema->fields());
// If there are still too many digits, fail
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("Failed to parse string"),
factory_->Inspect({"/1038581385102940193760212050/1"}));
factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"}, options);

AssertInspect({"/0/1", "/2"}, options.schema->fields());
}

TEST_F(TestPartitioning, ExistingSchemaHive) {
// Infer dictionary values but with a given schema
auto dict_type = dictionary(int8(), utf8());
HivePartitioningFactoryOptions options;
options.schema = schema({field("a", int64()), field("b", dict_type)});
factory_ = HivePartitioning::MakeFactory(options);

AssertInspect({"/a=0/b=1"}, options.schema->fields());
AssertInspect({"/a=0/b=1/what"}, options.schema->fields());
AssertInspect({"/a=0", "/b=1"}, options.schema->fields());

// fail if any segment for field alpha is not parseable as schema type
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid,
::testing::HasSubstr(
"Could not cast segments for partition field a to requested type int64"),
factory_->Inspect({"/a=0/b=1", "/a=hello/b=1"}));
factory_ = HivePartitioning::MakeFactory(options);

EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid,
::testing::HasSubstr("Requested schema has 2 fields, but only 1 were detected"),
factory_->Inspect({"/a=0", "/a=hello"}));
factory_ = HivePartitioning::MakeFactory(options);

// Now we don't fail since our type is large enough
AssertInspect({"/a=3760212050/b=1"}, options.schema->fields());
// If there are still too many digits, fail
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, ::testing::HasSubstr("Failed to parse string"),
factory_->Inspect({"/a=1038581385102940193760212050/b=1"}));
factory_ = HivePartitioning::MakeFactory(options);

AssertInspect({"/a=0/b=1", "/b=2"}, options.schema->fields());
}

TEST_F(TestPartitioning, EtlThenHive) {
FieldVector etl_fields{field("year", int16()), field("month", int8()),
field("day", int8()), field("hour", int8())};
30 changes: 26 additions & 4 deletions python/pyarrow/_dataset.pyx
@@ -1528,15 +1528,17 @@ cdef class DirectoryPartitioning(Partitioning):
self.directory_partitioning = <CDirectoryPartitioning*> sp.get()

@staticmethod
def discover(field_names, infer_dictionary=False,
max_partition_dictionary_size=0):
def discover(field_names=None, infer_dictionary=False,
max_partition_dictionary_size=0,
schema=None):
"""
Discover a DirectoryPartitioning.

Parameters
----------
field_names : list of str
The names to associate with the values from the subdirectory names.
If schema is given, the field names will be taken from the schema.
infer_dictionary : bool, default False
When inferring a schema for partition fields, yield dictionary
encoded types instead of plain types. This can be more efficient
@@ -1547,6 +1549,10 @@ cdef class DirectoryPartitioning(Partitioning):
Synonymous with infer_dictionary for backwards compatibility with
1.0: setting this to -1 or None is equivalent to passing
infer_dictionary=True.
schema : Schema, default None
Use this schema instead of inferring a schema from partition
values. Partition values will be validated against this schema
before accumulation into the Partitioning's dictionary.

Returns
-------
@@ -1566,7 +1572,15 @@ cdef class DirectoryPartitioning(Partitioning):
if infer_dictionary:
c_options.infer_dictionary = True

c_field_names = [tobytes(s) for s in field_names]
if schema:
c_options.schema = pyarrow_unwrap_schema(schema)
c_field_names = [tobytes(f.name) for f in schema]
elif not field_names:
raise ValueError(
"Neither field_names nor schema was passed; "
"cannot infer field_names")
else:
c_field_names = [tobytes(s) for s in field_names]
return PartitioningFactory.wrap(
CDirectoryPartitioning.MakeFactory(c_field_names, c_options))

@@ -1637,7 +1651,8 @@ cdef class HivePartitioning(Partitioning):
@staticmethod
def discover(infer_dictionary=False,
max_partition_dictionary_size=0,
null_fallback="__HIVE_DEFAULT_PARTITION__"):
null_fallback="__HIVE_DEFAULT_PARTITION__",
schema=None):
"""
Discover a HivePartitioning.

@@ -1657,6 +1672,10 @@
When inferring a schema for partition fields this value will be
replaced by null. The default is set to __HIVE_DEFAULT_PARTITION__
for compatibility with Spark
schema : Schema, default None
Use this schema instead of inferring a schema from partition
values. Partition values will be validated against this schema
before accumulation into the Partitioning's dictionary.

Returns
-------
@@ -1677,6 +1696,9 @@

c_options.null_fallback = tobytes(null_fallback)

if schema:
c_options.schema = pyarrow_unwrap_schema(schema)

return PartitioningFactory.wrap(
CHivePartitioning.MakeFactory(c_options))

31 changes: 29 additions & 2 deletions python/pyarrow/dataset.py
@@ -122,11 +122,13 @@ def partitioning(schema=None, field_names=None, flavor=None,
flavor : str, default None
The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
a HivePartitioning.
dictionaries : List[Array]
dictionaries : Dict[str, Array]
If the type of any field of `schema` is a dictionary type, the
corresponding entry of `dictionaries` must be an array containing
every value which may be taken by the corresponding column or an
error will be raised in parsing.
error will be raised in parsing. Alternatively, pass `infer` to have
Arrow discover the dictionary values, in which case a
PartitioningFactory is returned.

Returns
-------
@@ -146,6 +148,27 @@ def partitioning(schema=None, field_names=None, flavor=None,
For paths like "/2009/June", the year will be inferred as int32 while month
will be inferred as string.

Specify a Schema with dictionary encoding, providing dictionary values:

>>> partitioning(
... pa.schema([
... ("year", pa.int16()),
... ("month", pa.dictionary(pa.int8(), pa.string()))
... ]),
... dictionaries={
... "month": pa.array(["January", "February", "March"]),
... })

Alternatively, specify a Schema with dictionary encoding, but have Arrow
infer the dictionary values:

>>> partitioning(
... pa.schema([
... ("year", pa.int16()),
... ("month", pa.dictionary(pa.int8(), pa.string()))
... ]),
... dictionaries="infer")

Create a Hive scheme for a path like "/year=2009/month=11":

>>> partitioning(
@@ -164,6 +187,8 @@ def partitioning(schema=None, field_names=None, flavor=None,
if field_names is not None:
raise ValueError(
"Cannot specify both 'schema' and 'field_names'")
if dictionaries == 'infer':
return DirectoryPartitioning.discover(schema=schema)
return DirectoryPartitioning(schema, dictionaries)
elif field_names is not None:
if isinstance(field_names, list):
@@ -181,6 +206,8 @@ def partitioning(schema=None, field_names=None, flavor=None,
raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
elif schema is not None:
if isinstance(schema, pa.Schema):
if dictionaries == 'infer':
return HivePartitioning.discover(schema=schema)
return HivePartitioning(schema, dictionaries)
else:
raise ValueError(
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -268,11 +268,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CPartitioningFactoryOptions \
"arrow::dataset::PartitioningFactoryOptions":
c_bool infer_dictionary
shared_ptr[CSchema] schema

cdef cppclass CHivePartitioningFactoryOptions \
"arrow::dataset::HivePartitioningFactoryOptions":
c_bool infer_dictionary
c_string null_fallback
shared_ptr[CSchema] schema

cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory":
pass