diff --git a/cpp/src/arrow/dataset/expression.cc b/cpp/src/arrow/dataset/expression.cc index 296fb221528..e7acac3600b 100644 --- a/cpp/src/arrow/dataset/expression.cc +++ b/cpp/src/arrow/dataset/expression.cc @@ -90,27 +90,35 @@ ValueDescr Expression::descr() const { return CallNotNull(*this)->descr; } +namespace { + +std::string PrintDatum(const Datum& datum) { + if (datum.is_scalar()) { + switch (datum.type()->id()) { + case Type::STRING: + case Type::LARGE_STRING: + return '"' + + Escape(util::string_view(*datum.scalar_as<BaseBinaryScalar>().value)) + + '"'; + + case Type::BINARY: + case Type::FIXED_SIZE_BINARY: + case Type::LARGE_BINARY: + return '"' + datum.scalar_as<BaseBinaryScalar>().value->ToHexString() + '"'; + + default: + break; + } + return datum.scalar()->ToString(); + } + return datum.ToString(); +} + +} // namespace + std::string Expression::ToString() const { if (auto lit = literal()) { - if (lit->is_scalar()) { - switch (lit->type()->id()) { - case Type::STRING: - case Type::LARGE_STRING: - return '"' + - Escape(util::string_view(*lit->scalar_as<BaseBinaryScalar>().value)) + - '"'; - - case Type::BINARY: - case Type::FIXED_SIZE_BINARY: - case Type::LARGE_BINARY: - return '"' + lit->scalar_as<BaseBinaryScalar>().value->ToHexString() + '"'; - - default: - break; - } - return lit->scalar()->ToString(); - } - return lit->ToString(); + return PrintDatum(*lit); } if (auto ref = field_ref()) { @@ -763,16 +771,7 @@ Status ExtractKnownFieldValuesImpl( auto ref = call->arguments[0].field_ref(); auto lit = call->arguments[1].literal(); - auto it_success = known_values->emplace(*ref, *lit); - if (it_success.second) continue; - - // A value was already known for ref; check it - auto ref_lit = it_success.first; - if (*lit != ref_lit->second) { - return Status::Invalid("Conflicting guarantees: (", ref->ToString(), " == ", lit->ToString(), ") vs (", ref->ToString(), " == ", ref_lit->second.ToString()); - } + known_values->emplace(*ref, *lit); } conjunction_members->erase(unconsumed_end, conjunction_members->end()); diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 2c437ce8eec..612c249861c 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -313,12 +313,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch)); batch.reset(); // drop to hopefully conserve memory + if (groups.batches.size() > static_cast<size_t>(write_options.max_partitions)) { + return Status::Invalid("Fragment would be written into ", groups.batches.size(), " partitions. This exceeds the maximum of ", + write_options.max_partitions); + } + std::unordered_set<WriteQueue*> need_flushed; for (size_t i = 0; i < groups.batches.size(); ++i) { - ARROW_ASSIGN_OR_RAISE( - auto partition_expression, - and_(std::move(groups.expressions[i]), fragment->partition_expression()) - .Bind(*scanner->schema())); + auto partition_expression = + and_(std::move(groups.expressions[i]), fragment->partition_expression()); auto batch = std::move(groups.batches[i]); ARROW_ASSIGN_OR_RAISE(auto part, diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index bb2aa86ba9b..708f7e02054 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -295,6 +295,9 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { /// Partitioning used to generate fragment paths. std::shared_ptr<Partitioning> partitioning; + /// Maximum number of partitions any batch may be written into, default is 1K.
+ int max_partitions = 1024; + /// Template string used to generate fragment basenames. /// {i} will be replaced by an auto incremented integer. std::string basename_template; diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 42421134790..b8428e0a98e 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -223,6 +223,18 @@ TEST_F(TestIpcFileSystemDataset, WriteWithEmptyPartitioningSchema) { TestWriteWithEmptyPartitioningSchema(); } +TEST_F(TestIpcFileSystemDataset, WriteExceedsMaxPartitions) { + write_options_.partitioning = std::make_shared<DirectoryPartitioning>( SchemaFromColumnNames(source_schema_, {"model"})); + + // require that no batch be grouped into more than 2 written batches: + write_options_.max_partitions = 2; + + auto scanner = std::make_shared<Scanner>(dataset_, scan_options_, scan_context_); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("This exceeds the maximum"), FileSystemDataset::Write(write_options_, scanner)); +} + TEST_F(TestIpcFileFormat, OpenFailureWithRelevantError) { std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view("")); auto result = format_->Inspect(FileSource(buf)); diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 3a164d8d795..d6a3723d055 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -23,6 +23,7 @@ #include <vector> #include "arrow/array/array_base.h" +#include "arrow/array/array_dict.h" #include "arrow/array/array_nested.h" #include "arrow/array/builder_dict.h" #include "arrow/compute/api_scalar.h" @@ -191,7 +192,7 @@ Result<Expression> KeyValuePartitioning::Parse(const std::string& path) const { } Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const { - std::vector<Scalar*> values{static_cast<size_t>(schema_->num_fields()), nullptr}; + ScalarVector values{static_cast<size_t>(schema_->num_fields()), nullptr}; ARROW_ASSIGN_OR_RAISE(auto known_values, ExtractKnownFieldValues(expr)); for (const auto& ref_value : known_values) { @@ -202,7 +203,7 @@ Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const { ARROW_ASSIGN_OR_RAISE(auto match, ref_value.first.FindOneOrNone(*schema_)); if (match.empty()) continue; - const auto& value = ref_value.second.scalar(); + auto value = ref_value.second.scalar(); const auto& field = schema_->field(match[0]); if (!value->type->Equals(field->type())) { @@ -210,7 +211,12 @@ Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const { ") is invalid for ", field->ToString()); } - values[match[0]] = value.get(); + if (value->type->id() == Type::DICTIONARY) { + ARROW_ASSIGN_OR_RAISE( + value, checked_cast<const DictionaryScalar&>(*value).GetEncodedValue()); + } + + values[match[0]] = std::move(value); } return FormatValues(values); @@ -230,9 +236,9 @@ std::vector<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKeys( return keys; } -inline util::optional<int> NextValid(const std::vector<Scalar*>& values, int first_null) { +inline util::optional<int> NextValid(const ScalarVector& values, int first_null) { auto it = std::find_if(values.begin() + first_null + 1, values.end(), - [](Scalar* v) { return v != nullptr; }); + [](const std::shared_ptr<Scalar>& v) { return v != nullptr; }); if (it == values.end()) { return util::nullopt; @@ -242,7 +248,7 @@ inline util::optional<int> NextValid(const std::vector<Scalar*>& values, int fir } Result<std::string> DirectoryPartitioning::FormatValues( - const std::vector<Scalar*>& values) const { + const ScalarVector& values) const { std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields())); for (int i = 0; i < schema_->num_fields(); ++i) { @@ -426,8
+432,7 @@ std::vector<KeyValuePartitioning::Key> HivePartitioning::ParseKeys( return keys; } -Result<std::string> HivePartitioning::FormatValues( - const std::vector<Scalar*>& values) const { +Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) const { std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields())); for (int i = 0; i < schema_->num_fields(); ++i) { @@ -532,19 +537,21 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema( // Transform an array of counts to offsets which will divide a ListArray // into an equal number of slices with corresponding lengths. -inline Result<std::shared_ptr<Array>> CountsToOffsets( +inline Result<std::shared_ptr<Buffer>> CountsToOffsets( std::shared_ptr<Int64Array> counts) { - Int32Builder offset_builder; + TypedBufferBuilder<int32_t> offset_builder; RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1)); - offset_builder.UnsafeAppend(0); + + int32_t current_offset = 0; + offset_builder.UnsafeAppend(current_offset); for (int64_t i = 0; i < counts->length(); ++i) { DCHECK_NE(counts->Value(i), 0); - auto next_offset = static_cast<int32_t>(offset_builder[i] + counts->Value(i)); - offset_builder.UnsafeAppend(next_offset); + current_offset += static_cast<int32_t>(counts->Value(i)); + offset_builder.UnsafeAppend(current_offset); } - std::shared_ptr<Array> offsets; + std::shared_ptr<Buffer> offsets; RETURN_NOT_OK(offset_builder.Finish(&offsets)); return offsets; } @@ -604,6 +611,12 @@ class StructDictionary { RETURN_NOT_OK(builders[i].FinishInternal(&indices)); ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], indices)); + + if (fields[i]->type()->id() == Type::DICTIONARY) { + RETURN_NOT_OK(RestoreDictionaryEncoding( + checked_pointer_cast<DictionaryType>(fields[i]->type()), &column)); + } + columns[i] = column.make_array(); } @@ -612,27 +625,22 @@ class StructDictionary { private: Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) { - ArrayData* encoded; if (column.type()->id() != Type::DICTIONARY) { - ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column)); + ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(std::move(column))); } - encoded = column.mutable_array(); - - auto indices = - std::make_shared<Int32Array>(encoded->length, std::move(encoded->buffers[1])); - dictionaries_.push_back(MakeArray(std::move(encoded->dictionary))); - auto dictionary_size = static_cast<int32_t>(dictionaries_.back()->length()); + auto dict_column = column.array_as<DictionaryArray>(); + dictionaries_.push_back(dict_column->dictionary()); + ARROW_ASSIGN_OR_RAISE(auto indices, compute::Cast(*dict_column->indices(), int32())); if (*fused_indices == nullptr) { - *fused_indices = std::move(indices); - size_ = dictionary_size; - return Status::OK(); + *fused_indices = checked_pointer_cast<Int32Array>(std::move(indices)); + return IncreaseSize(); } // It's useful to think about the case where each of dictionaries_ has size 10. // In this case the decimal digit in the ones place is the code in dictionaries_[0], - // the tens place corresponds to dictionaries_[1], etc. + // the tens place corresponds to the code in dictionaries_[1], etc. // The incumbent indices must be shifted to the hundreds place so as not to collide.
ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices, compute::Multiply(indices, MakeScalar(size_))); @@ -641,10 +649,7 @@ class StructDictionary { compute::Add(new_fused_indices, *fused_indices)); *fused_indices = checked_pointer_cast<Int32Array>(new_fused_indices.make_array()); - - // XXX should probably cap this at 2**15 or so - ARROW_CHECK(!internal::MultiplyWithOverflow(size_, dictionary_size, &size_)); - return Status::OK(); + return IncreaseSize(); } // expand a fused code into component dict codes, order is in order of addition @@ -656,13 +661,48 @@ class StructDictionary { } } - int32_t size_; + Status RestoreDictionaryEncoding(std::shared_ptr<DictionaryType> expected_type, Datum* column) { + DCHECK_NE(column->type()->id(), Type::DICTIONARY); + ARROW_ASSIGN_OR_RAISE(*column, compute::DictionaryEncode(std::move(*column))); + + if (expected_type->index_type()->id() == Type::INT32) { + // dictionary_encode has already yielded the expected index_type + return Status::OK(); + } + + // cast the indices to the expected index type + auto dictionary = std::move(column->mutable_array()->dictionary); + column->mutable_array()->type = int32(); + + ARROW_ASSIGN_OR_RAISE(*column, compute::Cast(std::move(*column), expected_type->index_type())); + + column->mutable_array()->dictionary = std::move(dictionary); + column->mutable_array()->type = expected_type; + return Status::OK(); + } + + Status IncreaseSize() { + auto factor = static_cast<int32_t>(dictionaries_.back()->length()); + + if (internal::MultiplyWithOverflow(size_, factor, &size_)) { + return Status::CapacityError("Max groups exceeded"); + } + return Status::OK(); + } + + int32_t size_ = 1; ArrayVector dictionaries_; }; Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by) { if (by.num_fields() == 0) { - return Status::NotImplemented("Grouping with no criteria"); + return Status::Invalid("Grouping with no criteria"); + } + + if (by.null_count() != 0) { + return Status::Invalid("Grouping with null criteria"); } ARROW_ASSIGN_OR_RAISE(auto fused, StructDictionary::Encode(by.fields())); @@ -685,8 +725,9 @@ Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by) { checked_pointer_cast<Int64Array>(fused_counts_and_values->GetFieldByName("counts")); ARROW_ASSIGN_OR_RAISE(auto offsets, CountsToOffsets(std::move(counts))); - ARROW_ASSIGN_OR_RAISE(auto grouped_sort_indices, ListArray::FromArrays(*offsets, *sort_indices)); + auto grouped_sort_indices = + std::make_shared<ListArray>(list(sort_indices->type()), unique_rows->length(), + std::move(offsets), std::move(sort_indices)); return StructArray::Make( ArrayVector{std::move(unique_rows), std::move(grouped_sort_indices)}, diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 8975f565b19..944434e64f7 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -142,7 +142,7 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning { virtual std::vector<Key> ParseKeys(const std::string& path) const = 0; - virtual Result<std::string> FormatValues(const std::vector<Scalar*>& values) const = 0; + virtual Result<std::string> FormatValues(const ScalarVector& values) const = 0; /// Convert a Key to a full expression.
Result<Expression> ConvertKey(const Key& key) const; @@ -172,7 +172,7 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning { private: std::vector<Key> ParseKeys(const std::string& path) const override; - Result<std::string> FormatValues(const std::vector<Scalar*>& values) const override; + Result<std::string> FormatValues(const ScalarVector& values) const override; }; /// \brief Multi-level, directory based partitioning @@ -201,7 +201,7 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning { private: std::vector<Key> ParseKeys(const std::string& path) const override; - Result<std::string> FormatValues(const std::vector<Scalar*>& values) const override; + Result<std::string> FormatValues(const ScalarVector& values) const override; }; /// \brief Implementation provided by lambda or other callable @@ -288,16 +288,58 @@ class ARROW_DS_EXPORT PartitioningOrFactory { /// \brief Assemble lists of indices of identical rows. /// /// \param[in] by A StructArray whose columns will be used as grouping criteria. -/// \return A StructArray mapping unique rows (in field "values", represented as a -/// StructArray with the same fields as `by`) to lists of indices where -/// that row appears (in field "groupings"). +/// Top level nulls are invalid, as are empty criteria (no grouping +/// columns). +/// \return An array of type `struct<values: by.type, groupings: list<int64>>`, +/// which is a mapping from unique rows (field "values") to lists of +/// indices into `by` where that row appears (field "groupings"). +/// +/// For example, +/// MakeGroupings([ +/// {"a": "ex", "b": 0}, +/// {"a": "ex", "b": 0}, +/// {"a": "why", "b": 0}, +/// {"a": "why", "b": 0}, +/// {"a": "ex", "b": 0}, +/// {"a": "why", "b": 1} +/// ]) == [ +/// {"values": {"a": "ex", "b": 0}, "groupings": [0, 1, 4]}, +/// {"values": {"a": "why", "b": 0}, "groupings": [2, 3]}, +/// {"values": {"a": "why", "b": 1}, "groupings": [5]} +/// ] ARROW_DS_EXPORT Result<std::shared_ptr<StructArray>> MakeGroupings(const StructArray& by); -/// \brief Produce slices of an Array which correspond to the provided groupings. +/// \brief Produce a ListArray whose slots are selections of `array` which correspond to +/// the provided groupings. +/// +/// For example, +/// ApplyGroupings([[0, 1, 4], [2, 3], [5]], [ +/// {"a": "ex", "b": 0, "passenger": 0}, +/// {"a": "ex", "b": 0, "passenger": 1}, +/// {"a": "why", "b": 0, "passenger": 2}, +/// {"a": "why", "b": 0, "passenger": 3}, +/// {"a": "ex", "b": 0, "passenger": 4}, +/// {"a": "why", "b": 1, "passenger": 5} +/// ]) == [ +/// [ +/// {"a": "ex", "b": 0, "passenger": 0}, +/// {"a": "ex", "b": 0, "passenger": 1}, +/// {"a": "ex", "b": 0, "passenger": 4}, +/// ], +/// [ +/// {"a": "why", "b": 0, "passenger": 2}, +/// {"a": "why", "b": 0, "passenger": 3}, +/// ], +/// [ +/// {"a": "why", "b": 1, "passenger": 5} +/// ] +/// ] ARROW_DS_EXPORT Result<std::shared_ptr<ListArray>> ApplyGroupings(const ListArray& groupings, const Array& array); + +/// \brief Produce selections of a RecordBatch which correspond to the provided groupings.
ARROW_DS_EXPORT Result<RecordBatchVector> ApplyGroupings(const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch); diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 2260eb219da..286848d9ae9 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -32,6 +32,7 @@ #include #include "arrow/filesystem/path_util.h" #include "arrow/status.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/range.h" namespace arrow { using internal::checked_pointer_cast; @@ -152,6 +153,16 @@ TEST_F(TestPartitioning, DirectoryPartitioningFormat) { equal(field_ref("beta"), literal("hello")))); } +TEST_F(TestPartitioning, DirectoryPartitioningFormatDictionary) { + auto dictionary = ArrayFromJSON(utf8(), R"(["hello", "world"])"); + partitioning_ = std::make_shared<DirectoryPartitioning>(schema({DictStr("alpha")}), ArrayVector{dictionary}); + written_schema_ = partitioning_->schema(); + + ASSERT_OK_AND_ASSIGN(auto dict_hello, MakeScalar("hello")->CastTo(DictStr("")->type())); + AssertFormat(equal(field_ref("alpha"), literal(dict_hello)), "hello"); +} + TEST_F(TestPartitioning, DirectoryPartitioningWithTemporal) { for (auto temporal : {timestamp(TimeUnit::SECOND), date32()}) { partitioning_ = std::make_shared<DirectoryPartitioning>( @@ -549,12 +560,14 @@ void AssertGrouping(const FieldVector& by_fields, const std::string& batch_json, })); ASSERT_OK_AND_ASSIGN(auto groupings_and_values, MakeGroupings(*by)); + ASSERT_OK(groupings_and_values->ValidateFull()); auto groupings = checked_pointer_cast<ListArray>(groupings_and_values->GetFieldByName("groupings")); ASSERT_OK_AND_ASSIGN(std::shared_ptr<ListArray> grouped_ids, ApplyGroupings(*groupings, *batch->GetColumnByName("id"))); + ASSERT_OK(grouped_ids->ValidateFull()); ArrayVector columns = checked_cast<const StructArray&>(*groupings_and_values->GetFieldByName("values")) @@ -562,6 +575,7 @@ void AssertGrouping(const FieldVector& by_fields, const std::string& batch_json, columns.push_back(grouped_ids); ASSERT_OK_AND_ASSIGN(auto actual, StructArray::Make(columns, fields_with_ids)); + ASSERT_OK(actual->ValidateFull()); AssertArraysEqual(*expected, *actual, /*verbose=*/true); } @@ -585,5 +599,52 @@ TEST(GroupTest, Basics) { ])"); } +TEST(GroupTest, WithNulls) { + auto has_nulls = checked_pointer_cast<StructArray>( + ArrayFromJSON(struct_({field("a", utf8()), field("b", int32())}), R"([ + {"a": "ex", "b": 0}, + {"a": null, "b": 0}, + {"a": "why", "b": 0}, + {"a": "ex", "b": 1}, + {"a": "why", "b": 0}, + {"a": "ex", "b": 1}, + {"a": "ex", "b": 0}, + {"a": "why", "b": null} + ])")); + ASSERT_RAISES(NotImplemented, MakeGroupings(*has_nulls)); + + has_nulls = checked_pointer_cast<StructArray>( + ArrayFromJSON(struct_({field("a", utf8()), field("b", int32())}), R"([ + {"a": "ex", "b": 0}, + null, + {"a": "why", "b": 0}, + {"a": "ex", "b": 1}, + {"a": "why", "b": 0}, + {"a": "ex", "b": 1}, + {"a": "ex", "b": 0}, + null + ])")); + ASSERT_RAISES(Invalid, MakeGroupings(*has_nulls)); +} + +TEST(GroupTest, GroupOnDictionary) { + AssertGrouping({field("a", dictionary(int32(), utf8())), field("b", int32())}, R"([ + {"a": "ex", "b": 0, "id": 0}, + {"a": "ex", "b": 0, "id": 1}, + {"a": "why", "b": 0, "id": 2}, + {"a": "ex", "b": 1, "id": 3}, + {"a": "why", "b": 0, "id": 4}, + {"a": "ex", "b": 1, "id": 5}, + {"a": "ex", "b": 0, "id": 6}, + {"a": "why", "b": 1, "id": 7} + ])", + R"([ + {"a": "ex", "b": 0, "ids": [0, 1, 6]}, + {"a": "why", "b": 0, "ids": [2, 4]}, + {"a": "ex", "b": 1, "ids": [3, 5]}, + {"a": "why", "b": 1, "ids": [7]} + ])"); +} + } // namespace dataset } // namespace arrow diff --git
a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 410ca12c66b..151ae812c30 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1403,6 +1403,25 @@ cdef class PartitioningFactory(_Weakrefable): return self.wrapped +cdef vector[shared_ptr[CArray]] _partitioning_dictionaries( + Schema schema, dictionaries) except *: + cdef: + vector[shared_ptr[CArray]] c_dictionaries + + dictionaries = dictionaries or {} + + for field in schema: + dictionary = dictionaries.get(field.name) + + if (isinstance(field.type, pa.DictionaryType) and + dictionary is not None): + c_dictionaries.push_back(pyarrow_unwrap_array(dictionary)) + else: + c_dictionaries.push_back(<shared_ptr[CArray]> nullptr) + + return c_dictionaries + + cdef class DirectoryPartitioning(Partitioning): """ A Partitioning based on a specified Schema. @@ -1416,6 +1435,11 @@ cdef class DirectoryPartitioning(Partitioning): ---------- schema : Schema The schema that describes the partitions present in the file path. + dictionaries : Dict[str, Array] + If the type of any field of `schema` is a dictionary type, the + corresponding entry of `dictionaries` must be an array containing + every value which may be taken by the corresponding column or an + error will be raised in parsing. Returns ------- @@ -1433,12 +1457,15 @@ cdef class DirectoryPartitioning(Partitioning): cdef: CDirectoryPartitioning* directory_partitioning - def __init__(self, Schema schema not None): - cdef shared_ptr[CDirectoryPartitioning] partitioning - partitioning = make_shared[CDirectoryPartitioning]( - pyarrow_unwrap_schema(schema) + def __init__(self, Schema schema not None, dictionaries=None): + cdef: + shared_ptr[CDirectoryPartitioning] c_partitioning + + c_partitioning = make_shared[CDirectoryPartitioning]( + pyarrow_unwrap_schema(schema), + _partitioning_dictionaries(schema, dictionaries) ) - self.init(<shared_ptr[CPartitioning]> partitioning) + self.init(<shared_ptr[CPartitioning]> c_partitioning) cdef init(self, const shared_ptr[CPartitioning]& sp): Partitioning.init(self, sp) @@ -1506,6 +1533,11 @@ cdef class HivePartitioning(Partitioning): ---------- schema : Schema The schema that describes the partitions present in the file path. + dictionaries : Dict[str, Array] + If the type of any field of `schema` is a dictionary type, the + corresponding entry of `dictionaries` must be an array containing + every value which may be taken by the corresponding column or an + error will be raised in parsing.
Returns ------- @@ -1524,12 +1556,15 @@ cdef class HivePartitioning(Partitioning): cdef: CHivePartitioning* hive_partitioning - def __init__(self, Schema schema not None): - cdef shared_ptr[CHivePartitioning] partitioning - partitioning = make_shared[CHivePartitioning]( - pyarrow_unwrap_schema(schema) + def __init__(self, Schema schema not None, dictionaries=None): + cdef: + shared_ptr[CHivePartitioning] c_partitioning + + c_partitioning = make_shared[CHivePartitioning]( + pyarrow_unwrap_schema(schema), + _partitioning_dictionaries(schema, dictionaries) ) - self.init(<shared_ptr[CPartitioning]> partitioning) + self.init(<shared_ptr[CPartitioning]> c_partitioning) cdef init(self, const shared_ptr[CPartitioning]& sp): Partitioning.init(self, sp) @@ -2270,6 +2305,7 @@ def _filesystemdataset_write( Schema schema not None, FileSystem filesystem not None, Partitioning partitioning not None, FileWriteOptions file_options not None, bint use_threads, + int max_partitions, ): """ CFileSystemDataset.Write wrapper @@ -2283,6 +2319,7 @@ def _filesystemdataset_write( c_options.filesystem = filesystem.unwrap() c_options.base_dir = tobytes(_stringify_path(base_dir)) c_options.partitioning = partitioning.unwrap() + c_options.max_partitions = max_partitions c_options.basename_template = tobytes(basename_template) if isinstance(data, Dataset): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index adeb8aefd73..a7aa9c47299 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -88,7 +88,8 @@ def scalar(value): return Expression._scalar(value) -def partitioning(schema=None, field_names=None, flavor=None): +def partitioning(schema=None, field_names=None, flavor=None, + dictionaries=None): """ Specify a partitioning scheme. @@ -121,6 +122,11 @@ def partitioning(schema=None, field_names=None, flavor=None): flavor : str, default None The default is DirectoryPartitioning. Specify ``flavor="hive"`` for a HivePartitioning. + dictionaries : Dict[str, Array] + If the type of any field of `schema` is a dictionary type, the + corresponding entry of `dictionaries` must be an array containing + every value which may be taken by the corresponding column or an + error will be raised in parsing. Returns ------- @@ -158,7 +164,7 @@ def partitioning(schema=None, field_names=None, flavor=None): if field_names is not None: raise ValueError( "Cannot specify both 'schema' and 'field_names'") - return DirectoryPartitioning(schema) + return DirectoryPartitioning(schema, dictionaries) elif field_names is not None: if isinstance(field_names, list): return DirectoryPartitioning.discover(field_names) @@ -175,7 +181,7 @@ def partitioning(schema=None, field_names=None, flavor=None): raise ValueError("Cannot specify 'field_names' for flavor 'hive'") elif schema is not None: if isinstance(schema, pa.Schema): - return HivePartitioning(schema, dictionaries) + return HivePartitioning(schema, dictionaries) else: raise ValueError( "Expected Schema for 'schema', got {}".format( @@ -635,7 +641,8 @@ def _ensure_write_partitioning(scheme): def write_dataset(data, base_dir, basename_template=None, format=None, partitioning=None, schema=None, - filesystem=None, file_options=None, use_threads=True): + filesystem=None, file_options=None, use_threads=True, + max_partitions=None): """ Write a dataset to a given format and partitioning. @@ -668,6 +675,8 @@ def write_dataset(data, base_dir, basename_template=None, format=None, use_threads : bool, default True Write files in parallel. If enabled, then maximum parallelism will be used determined by the number of available CPU cores.
+ max_partitions : int, default 1024 + Maximum number of partitions any batch may be written into. """ from pyarrow.fs import LocalFileSystem, _ensure_filesystem @@ -700,6 +709,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None, if basename_template is None: basename_template = "part-{i}." + format.default_extname + if max_partitions is None: + max_partitions = 1024 + partitioning = _ensure_write_partitioning(partitioning) if filesystem is None: @@ -711,4 +723,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None, _filesystemdataset_write( data, base_dir, basename_template, schema, filesystem, partitioning, file_options, use_threads, + max_partitions ) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 73803c0ad36..362967d0fa0 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -213,6 +213,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CFileSystem] filesystem c_string base_dir shared_ptr[CPartitioning] partitioning + int max_partitions c_string basename_template cdef cppclass CFileSystemDataset \ @@ -277,7 +278,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CDirectoryPartitioning \ "arrow::dataset::DirectoryPartitioning"(CPartitioning): - CDirectoryPartitioning(shared_ptr[CSchema] schema) + CDirectoryPartitioning(shared_ptr[CSchema] schema, + vector[shared_ptr[CArray]] dictionaries) @staticmethod shared_ptr[CPartitioningFactory] MakeFactory( @@ -285,7 +287,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CHivePartitioning \ "arrow::dataset::HivePartitioning"(CPartitioning): - CHivePartitioning(shared_ptr[CSchema] schema) + CHivePartitioning(shared_ptr[CSchema] schema, + vector[shared_ptr[CArray]] dictionaries) @staticmethod shared_ptr[CPartitioningFactory] MakeFactory( diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 0ab9d95398d..c3ad83ba1ab 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -2315,6 +2315,32 @@ def test_write_dataset_partitioned(tempdir): partitioning=partitioning_schema) +@pytest.mark.parquet +@pytest.mark.pandas +def test_write_dataset_partitioned_dict(tempdir): + directory = tempdir / "partitioned" + _ = _create_parquet_dataset_partitioned(directory) + + # directory partitioning, dictionary partition columns + dataset = ds.dataset( + directory, + partitioning=ds.HivePartitioning.discover(infer_dictionary=True)) + target = tempdir / 'partitioned-dir-target' + expected_paths = [ + target / "a", target / "a" / "part-0.feather", + target / "b", target / "b" / "part-1.feather" + ] + partitioning = ds.partitioning(pa.schema([ + dataset.schema.field('part')]), + dictionaries={'part': pa.array(['a', 'b'])}) + # NB: dictionaries required here since we use partitioning to parse + # directories in _check_dataset_roundtrip (not currently required for + # the formatting step) + _check_dataset_roundtrip( + dataset, str(target), expected_paths, target, + partitioning=partitioning) + + @pytest.mark.parquet @pytest.mark.pandas def test_write_dataset_use_threads(tempdir): @@ -2412,6 +2438,30 @@ def test_write_table_multiple_fragments(tempdir): ) +def test_write_table_partitioned_dict(tempdir): + # ensure writing table partitioned on a dictionary column works without + # specifying the dictionary values 
explicitly + table = pa.table([ + pa.array(range(20)), + pa.array(np.repeat(['a', 'b'], 10)).dictionary_encode(), + ], names=['col', 'part']) + + partitioning = ds.partitioning(table.select(["part"]).schema) + + base_dir = tempdir / "dataset" + ds.write_dataset( + table, base_dir, format="feather", partitioning=partitioning + ) + + # check roundtrip + partitioning_read = ds.DirectoryPartitioning.discover( + ["part"], infer_dictionary=True) + result = ds.dataset( + base_dir, format="ipc", partitioning=partitioning_read + ).to_table() + assert result.equals(table) + + @pytest.mark.parquet def test_write_dataset_parquet(tempdir): import pyarrow.parquet as pq diff --git a/testing b/testing index b4eeafdec6f..d6c4deb22c4 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit b4eeafdec6fb5284c4aaf269f2ebdb3be2c63ed5 +Subproject commit d6c4deb22c4b4e9e3247a2f291046e3c671ad235
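
Editor's note (not part of the patch): the user-facing additions in this diff are a `dictionaries=` argument on `pyarrow.dataset.partitioning()` / `DirectoryPartitioning` / `HivePartitioning` and a `max_partitions=` argument on `write_dataset()`. The following is a minimal usage sketch assuming only the Python API as modified above; the table contents and the `/tmp/example_dataset` path are illustrative, not taken from the patch or its tests.

import pyarrow as pa
import pyarrow.dataset as ds

# A table with a dictionary-encoded column to partition on.
table = pa.table({
    "value": pa.array(range(6)),
    "part": pa.array(["a", "a", "b", "b", "c", "c"]).dictionary_encode(),
})

# Writing: the dictionary values need not be listed explicitly, and
# max_partitions (default 1024) bounds how many partitions a single
# batch may be split into before Write raises Invalid.
ds.write_dataset(
    table, "/tmp/example_dataset", format="feather",
    partitioning=ds.partitioning(table.select(["part"]).schema),
    max_partitions=1024,
)

# Reading back: supply the dictionaries so path segments can be parsed
# into dictionary values (or use discover(infer_dictionary=True)).
read_partitioning = ds.partitioning(
    pa.schema([pa.field("part", pa.dictionary(pa.int32(), pa.string()))]),
    dictionaries={"part": pa.array(["a", "b", "c"])},
)
dataset = ds.dataset(
    "/tmp/example_dataset", format="ipc", partitioning=read_partitioning)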