diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index f6e7b9a0d28..5c390b6b487 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -37,6 +37,8 @@
 #include "arrow/util/logging.h"
 #include "arrow/util/make_unique.h"
 #include "arrow/util/string_view.h"
+#include "arrow/util/uri.h"
+#include "arrow/util/utf8.h"
 
 namespace arrow {
 
@@ -46,6 +48,18 @@ using util::string_view;
 
 namespace dataset {
 
+namespace {
+/// Apply UriUnescape, then ensure the results are valid UTF-8.
+Result<std::string> SafeUriUnescape(util::string_view encoded) {
+  auto decoded = internal::UriUnescape(encoded);
+  if (!util::ValidateUTF8(decoded)) {
+    return Status::Invalid("Partition segment was not valid UTF-8 after URL decoding: ",
+                           encoded);
+  }
+  return decoded;
+}
+}  // namespace
+
 std::shared_ptr<Partitioning> Partitioning::Default() {
   class DefaultPartitioning : public Partitioning {
    public:
@@ -158,6 +172,21 @@ Result<Partitioning::PartitionedBatches> KeyValuePartitioning::Partition(
   return out;
 }
 
+std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding) {
+  switch (segment_encoding) {
+    case SegmentEncoding::None:
+      os << "SegmentEncoding::None";
+      break;
+    case SegmentEncoding::Uri:
+      os << "SegmentEncoding::Uri";
+      break;
+    default:
+      os << "(invalid SegmentEncoding " << static_cast<int8_t>(segment_encoding) << ")";
+      break;
+  }
+  return os;
+}
+
 Result<compute::Expression> KeyValuePartitioning::ConvertKey(const Key& key) const {
   ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(*schema_));
   if (match.empty()) {
@@ -209,7 +238,8 @@ Result<compute::Expression> KeyValuePartitioning::ConvertKey(const Key& key) con
 
 Result<compute::Expression> KeyValuePartitioning::Parse(const std::string& path) const {
   std::vector<compute::Expression> expressions;
 
-  for (const Key& key : ParseKeys(path)) {
+  ARROW_ASSIGN_OR_RAISE(auto parsed, ParseKeys(path));
+  for (const Key& key : parsed) {
     ARROW_ASSIGN_OR_RAISE(auto expr, ConvertKey(key));
     if (expr == compute::literal(true)) continue;
     expressions.push_back(std::move(expr));
@@ -259,7 +289,14 @@ Result<std::string> KeyValuePartitioning::Format(const compute::Expression& expr
   return FormatValues(values);
 }
 
-std::vector<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKeys(
+DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr<Schema> schema,
+                                             ArrayVector dictionaries,
+                                             KeyValuePartitioningOptions options)
+    : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) {
+  util::InitializeUTF8();
+}
+
+Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
     const std::string& path) const {
   std::vector<Key> keys;
 
@@ -267,7 +304,23 @@ std::vector<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKeys(
   for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
     if (i >= schema_->num_fields()) break;
 
-    keys.push_back({schema_->field(i++)->name(), std::move(segment)});
+    switch (options_.segment_encoding) {
+      case SegmentEncoding::None: {
+        if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
+          return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
+        }
+        keys.push_back({schema_->field(i++)->name(), std::move(segment)});
+        break;
+      }
+      case SegmentEncoding::Uri: {
+        ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
+        keys.push_back({schema_->field(i++)->name(), std::move(decoded)});
+        break;
+      }
+      default:
+        return Status::NotImplemented("Unknown segment encoding: ",
+                                      options_.segment_encoding);
+    }
   }
 
   return keys;
@@ -308,6 +361,20 @@ Result<std::string> DirectoryPartitioning::FormatValues(
   return fs::internal::JoinAbstractPath(std::move(segments));
 }
 
+KeyValuePartitioningOptions PartitioningFactoryOptions::AsPartitioningOptions() const {
+  KeyValuePartitioningOptions options;
+  options.segment_encoding = segment_encoding;
+  return options;
+}
+
+HivePartitioningOptions HivePartitioningFactoryOptions::AsHivePartitioningOptions()
+    const {
+  HivePartitioningOptions options;
+  options.segment_encoding = segment_encoding;
+  options.null_fallback = null_fallback;
+  return options;
+}
+
 namespace {
 class KeyValuePartitioningFactory : public PartitioningFactory {
  protected:
@@ -430,6 +497,7 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
       PartitioningFactoryOptions options)
       : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
     Reset();
+    util::InitializeUTF8();
   }
 
   std::string type_name() const override { return "schema"; }
@@ -441,7 +509,23 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
     for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
       if (field_index == field_names_.size()) break;
 
-      RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
+      switch (options_.segment_encoding) {
+        case SegmentEncoding::None: {
+          if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
+            return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
+          }
+          RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
+          break;
+        }
+        case SegmentEncoding::Uri: {
+          ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
+          RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), decoded));
+          break;
+        }
+        default:
+          return Status::NotImplemented("Unknown segment encoding: ",
+                                        options_.segment_encoding);
+      }
     }
   }
 
@@ -458,7 +542,8 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
     // drop fields which aren't in field_names_
     auto out_schema = SchemaFromColumnNames(schema, field_names_);
 
-    return std::make_shared<DirectoryPartitioning>(std::move(out_schema), dictionaries_);
+    return std::make_shared<DirectoryPartitioning>(std::move(out_schema), dictionaries_,
+                                                   options_.AsPartitioningOptions());
   }
 
  private:
@@ -481,28 +566,50 @@ std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
       new DirectoryPartitioningFactory(std::move(field_names), options));
 }
 
-util::optional<KeyValuePartitioning::Key> HivePartitioning::ParseKey(
-    const std::string& segment, const std::string& null_fallback) {
+Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
+    const std::string& segment, const HivePartitioningOptions& options) {
   auto name_end = string_view(segment).find_first_of('=');
   // Not round-trippable
   if (name_end == string_view::npos) {
     return util::nullopt;
   }
 
+  // Static method, so we have no better place for it
+  util::InitializeUTF8();
+
   auto name = segment.substr(0, name_end);
-  auto value = segment.substr(name_end + 1);
-  if (value == null_fallback) {
-    return Key{name, util::nullopt};
+  std::string value;
+  switch (options.segment_encoding) {
+    case SegmentEncoding::None: {
+      value = segment.substr(name_end + 1);
+      if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(value))) {
+        return Status::Invalid("Partition segment was not valid UTF-8: ", value);
+      }
+      break;
+    }
+    case SegmentEncoding::Uri: {
+      auto raw_value = util::string_view(segment).substr(name_end + 1);
+      ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
+      break;
+    }
+    default:
+      return Status::NotImplemented("Unknown segment encoding: ",
+                                    options.segment_encoding);
+  }
+
+  if (value == options.null_fallback) {
+    return Key{std::move(name), util::nullopt};
   }
-  return Key{name, value};
+  return Key{std::move(name), std::move(value)};
 }
 
-std::vector<KeyValuePartitioning::Key> HivePartitioning::ParseKeys(
+Result<std::vector<KeyValuePartitioning::Key>> HivePartitioning::ParseKeys(
     const std::string& path) const {
   std::vector<Key> keys;
 
   for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
-    if (auto key = ParseKey(segment, null_fallback_)) {
+    ARROW_ASSIGN_OR_RAISE(auto maybe_key, ParseKey(segment, hive_options_));
+    if (auto key = maybe_key) {
       keys.push_back(std::move(*key));
     }
   }
@@ -521,7 +628,7 @@ Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) c
     } else if (!values[i]->is_valid) {
       // If no key is available just provide a placeholder segment to maintain the
       // field_index <-> path nesting relation
-      segments[i] = name + "=" + null_fallback_;
+      segments[i] = name + "=" + hive_options_.null_fallback;
     } else {
       segments[i] = name + "=" + values[i]->ToString();
     }
@@ -533,15 +640,18 @@ Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) c
 class HivePartitioningFactory : public KeyValuePartitioningFactory {
  public:
   explicit HivePartitioningFactory(HivePartitioningFactoryOptions options)
-      : KeyValuePartitioningFactory(options), null_fallback_(options.null_fallback) {}
+      : KeyValuePartitioningFactory(options), options_(std::move(options)) {}
 
   std::string type_name() const override { return "hive"; }
 
   Result<std::shared_ptr<Schema>> Inspect(
       const std::vector<std::string>& paths) override {
+    auto options = options_.AsHivePartitioningOptions();
     for (auto path : paths) {
       for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
-        if (auto key = HivePartitioning::ParseKey(segment, null_fallback_)) {
+        ARROW_ASSIGN_OR_RAISE(auto maybe_key,
+                              HivePartitioning::ParseKey(segment, options));
+        if (auto key = maybe_key) {
           RETURN_NOT_OK(InsertRepr(key->name, key->value));
         }
       }
@@ -565,12 +675,12 @@ class HivePartitioningFactory : public KeyValuePartitioningFactory {
       auto out_schema = SchemaFromColumnNames(schema, field_names_);
 
       return std::make_shared<HivePartitioning>(std::move(out_schema), dictionaries_,
-                                                null_fallback_);
+                                                options_.AsHivePartitioningOptions());
     }
   }
 
  private:
-  const std::string null_fallback_;
+  const HivePartitioningFactoryOptions options_;
   std::vector<std::string> field_names_;
 };
 
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 36276e7a3b1..db3008f1d67 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <functional>
+#include <iosfwd>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -89,8 +90,26 @@ class ARROW_DS_EXPORT Partitioning {
   std::shared_ptr<Schema> schema_;
 };
 
+/// \brief The encoding of partition segments.
+enum class SegmentEncoding : int8_t {
+  /// No encoding.
+  None = 0,
+  /// Segment values are URL-encoded.
+  Uri = 1,
+};
+
+ARROW_DS_EXPORT
+std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding);
+
+/// \brief Options for key-value based partitioning (hive/directory).
+struct ARROW_DS_EXPORT KeyValuePartitioningOptions {
+  /// After splitting a path into components, decode the path components
+  /// before parsing according to this scheme.
+  SegmentEncoding segment_encoding = SegmentEncoding::Uri;
+};
+
 /// \brief Options for inferring a partitioning.
-struct PartitioningFactoryOptions {
+struct ARROW_DS_EXPORT PartitioningFactoryOptions {
   /// When inferring a schema for partition fields, yield dictionary encoded types
   /// instead of plain. This can be more efficient when materializing virtual
   /// columns, and Expressions parsed by the finished Partitioning will include
@@ -100,12 +119,19 @@ struct PartitioningFactoryOptions {
   /// will only check discovered fields against the schema and update internal
   /// state (such as dictionaries).
   std::shared_ptr<Schema> schema;
+  /// After splitting a path into components, decode the path components
+  /// before parsing according to this scheme.
+  SegmentEncoding segment_encoding = SegmentEncoding::Uri;
+
+  KeyValuePartitioningOptions AsPartitioningOptions() const;
 };
 
 /// \brief Options for inferring a hive-style partitioning.
-struct HivePartitioningFactoryOptions : PartitioningFactoryOptions {
+struct ARROW_DS_EXPORT HivePartitioningFactoryOptions : PartitioningFactoryOptions {
   /// The hive partitioning scheme maps null to a hard coded fallback string.
   std::string null_fallback;
+
+  HivePartitioningOptions AsHivePartitioningOptions() const;
 };
 
 /// \brief PartitioningFactory provides creation of a partitioning when the
@@ -147,14 +173,17 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
   Result<std::string> Format(const compute::Expression& expr) const override;
 
  protected:
-  KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries)
-      : Partitioning(std::move(schema)), dictionaries_(std::move(dictionaries)) {
+  KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
+                       KeyValuePartitioningOptions options)
+      : Partitioning(std::move(schema)),
+        dictionaries_(std::move(dictionaries)),
+        options_(options) {
     if (dictionaries_.empty()) {
      dictionaries_.resize(schema_->num_fields());
    }
  }
 
-  virtual std::vector<Key> ParseKeys(const std::string& path) const = 0;
+  virtual Result<std::vector<Key>> ParseKeys(const std::string& path) const = 0;
 
   virtual Result<std::string> FormatValues(const ScalarVector& values) const = 0;
 
@@ -162,6 +191,7 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
   Result<compute::Expression> ConvertKey(const Key& key) const;
 
   ArrayVector dictionaries_;
+  KeyValuePartitioningOptions options_;
 };
 
 /// \brief DirectoryPartitioning parses one segment of a path for each field in its
@@ -175,8 +205,8 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
   /// If a field in schema is of dictionary type, the corresponding element of
   /// dictionaries must be contain the dictionary of values for that field.
   explicit DirectoryPartitioning(std::shared_ptr<Schema> schema,
-                                 ArrayVector dictionaries = {})
-      : KeyValuePartitioning(std::move(schema), std::move(dictionaries)) {}
+                                 ArrayVector dictionaries = {},
+                                 KeyValuePartitioningOptions options = {});
 
   std::string type_name() const override { return "schema"; }
 
@@ -188,7 +218,7 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
       std::vector<std::string> field_names, PartitioningFactoryOptions = {});
 
  private:
-  std::vector<Key> ParseKeys(const std::string& path) const override;
+  Result<std::vector<Key>> ParseKeys(const std::string& path) const override;
 
   Result<std::string> FormatValues(const ScalarVector& values) const override;
 };
@@ -196,6 +226,16 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
 /// \brief The default fallback used for null values in a Hive-style partitioning.
 static constexpr char kDefaultHiveNullFallback[] = "__HIVE_DEFAULT_PARTITION__";
 
+struct ARROW_DS_EXPORT HivePartitioningOptions : public KeyValuePartitioningOptions {
+  std::string null_fallback = kDefaultHiveNullFallback;
+
+  static HivePartitioningOptions DefaultsWithNullFallback(std::string fallback) {
+    HivePartitioningOptions options;
+    options.null_fallback = std::move(fallback);
+    return options;
+  }
+};
+
 /// \brief Multi-level, directory based partitioning
 /// originating from Apache Hive with all data files stored in the
 /// leaf directories. Data is partitioned by static values of a
@@ -211,22 +251,31 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
   /// dictionaries must be contain the dictionary of values for that field.
   explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries = {},
                             std::string null_fallback = kDefaultHiveNullFallback)
-      : KeyValuePartitioning(std::move(schema), std::move(dictionaries)),
-        null_fallback_(std::move(null_fallback)) {}
+      : KeyValuePartitioning(std::move(schema), std::move(dictionaries),
+                             KeyValuePartitioningOptions()),
+        hive_options_(
+            HivePartitioningOptions::DefaultsWithNullFallback(std::move(null_fallback))) {
+  }
+
+  explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
+                            HivePartitioningOptions options)
+      : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options),
+        hive_options_(options) {}
 
   std::string type_name() const override { return "hive"; }
-  std::string null_fallback() const { return null_fallback_; }
+  std::string null_fallback() const { return hive_options_.null_fallback; }
+  const HivePartitioningOptions& options() const { return hive_options_; }
 
-  static util::optional<Key> ParseKey(const std::string& segment,
-                                      const std::string& null_fallback);
+  static Result<util::optional<Key>> ParseKey(const std::string& segment,
+                                              const HivePartitioningOptions& options);
 
   /// \brief Create a factory for a hive partitioning.
   static std::shared_ptr<PartitioningFactory> MakeFactory(
       HivePartitioningFactoryOptions = {});
 
  private:
-  const std::string null_fallback_;
-  std::vector<Key> ParseKeys(const std::string& path) const override;
+  const HivePartitioningOptions hive_options_;
+  Result<std::vector<Key>> ParseKeys(const std::string& path) const override;
 
   Result<std::string> FormatValues(const ScalarVector& values) const override;
 };
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 7a7ffcff229..d8e5198f21d 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -558,6 +558,103 @@ TEST_F(TestPartitioning, ExistingSchemaHive) {
   AssertInspect({"/a=0/b=1", "/b=2"}, options.schema->fields());
 }
 
+TEST_F(TestPartitioning, UrlEncodedDirectory) {
+  PartitioningFactoryOptions options;
+  auto ts = timestamp(TimeUnit::type::SECOND);
+  options.schema = schema({field("date", ts), field("time", ts), field("str", utf8())});
+  factory_ = DirectoryPartitioning::MakeFactory(options.schema->field_names(), options);
+
+  AssertInspect({"/2021-05-04 00:00:00/2021-05-04 07:27:00/%24",
+                 "/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/foo"},
+                options.schema->fields());
+  auto date = std::make_shared<TimestampScalar>(1620086400, ts);
+  auto time = std::make_shared<TimestampScalar>(1620113220, ts);
+  partitioning_ = std::make_shared<DirectoryPartitioning>(options.schema, ArrayVector());
+  AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24",
+              and_({equal(field_ref("date"), literal(date)),
+                    equal(field_ref("time"), literal(time)),
+                    equal(field_ref("str"), literal("$"))}));
+
+  // Invalid UTF-8
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  factory_->Inspect({"/%AF/%BF/%CF"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  partitioning_->Parse({"/%AF/%BF/%CF"}));
+
+  options.segment_encoding = SegmentEncoding::None;
+  options.schema =
+      schema({field("date", utf8()), field("time", utf8()), field("str", utf8())});
+  factory_ = DirectoryPartitioning::MakeFactory(options.schema->field_names(), options);
+  AssertInspect({"/2021-05-04 00:00:00/2021-05-04 07:27:00/%E3%81%8F%E3%81%BE",
+                 "/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/foo"},
+                options.schema->fields());
+  partitioning_ = std::make_shared<DirectoryPartitioning>(
+      options.schema, ArrayVector(), options.AsPartitioningOptions());
+  AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24",
+              and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")),
+                    equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")),
+                    equal(field_ref("str"), literal("%24"))}));
+}
+
+TEST_F(TestPartitioning, UrlEncodedHive) {
+  HivePartitioningFactoryOptions options;
+  auto ts = timestamp(TimeUnit::type::SECOND);
+  options.schema = schema({field("date", ts), field("time", ts), field("str", utf8())});
+  options.null_fallback = "$";
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  AssertInspect(
+      {"/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$",
+       "/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
+       "/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24"},
+      options.schema->fields());
+
+  auto date = std::make_shared<TimestampScalar>(1620086400, ts);
+  auto time = std::make_shared<TimestampScalar>(1620113220, ts);
+  partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
+                                                     options.AsHivePartitioningOptions());
+  AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$",
+              and_({equal(field_ref("date"), literal(date)),
+                    equal(field_ref("time"), literal(time)), is_null(field_ref("str"))}));
+  AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
+              and_({equal(field_ref("date"), literal(date)),
+                    equal(field_ref("time"), literal(time)),
+                    equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
+  // URL-encoded null fallback value
+  AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24",
+              and_({equal(field_ref("date"), literal(date)),
+                    equal(field_ref("time"), literal(time)), is_null(field_ref("str"))}));
+
+  // Invalid UTF-8
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  factory_->Inspect({"/date=%AF/time=%BF/str=%CF"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  partitioning_->Parse({"/date=%AF/time=%BF/str=%CF"}));
+
+  options.segment_encoding = SegmentEncoding::None;
+  options.schema =
+      schema({field("date", utf8()), field("time", utf8()), field("str", utf8())});
+  factory_ = HivePartitioning::MakeFactory(options);
+  AssertInspect(
+      {"/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$",
+       "/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
+       "/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24"},
+      options.schema->fields());
+  partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
+                                                     options.AsHivePartitioningOptions());
+  AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24",
+              and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")),
+                    equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")),
+                    equal(field_ref("str"), literal("%24"))}));
+
+  // Invalid UTF-8
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  factory_->Inspect({"/date=\xAF/time=\xBF/str=\xCF"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+      partitioning_->Parse({"/date=\xAF/time=\xBF/str=\xCF"}));
+}
+
 TEST_F(TestPartitioning, EtlThenHive) {
   FieldVector etl_fields{field("year", int16()), field("month", int8()),
                          field("day", int8()), field("hour", int8())};
@@ -655,8 +752,9 @@ class RangePartitioning : public Partitioning {
   Result<compute::Expression> Parse(const std::string& path) const override {
     std::vector<compute::Expression> ranges;
 
+    HivePartitioningOptions options;
     for (auto segment : fs::internal::SplitAbstractPath(path)) {
-      auto key = HivePartitioning::ParseKey(segment, "");
+      ARROW_ASSIGN_OR_RAISE(auto key, HivePartitioning::ParseKey(segment, options));
       if (!key) {
         return Status::Invalid("can't parse '", segment, "' as a range");
       }
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index 67a999456be..019aaf4241b 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -71,8 +71,10 @@ class ParquetFileWriteOptions;
 class Partitioning;
 class PartitioningFactory;
 class PartitioningOrFactory;
+struct KeyValuePartitioningOptions;
 class DirectoryPartitioning;
 class HivePartitioning;
+struct HivePartitioningOptions;
 
 struct ScanOptions;
 
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 356bf8ce9c7..78620b25942 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -85,6 +85,14 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
     return c_source
 
 
+cdef CSegmentEncoding _get_segment_encoding(str segment_encoding):
+    if segment_encoding == "none":
+        return CSegmentEncodingNone
+    elif segment_encoding == "uri":
+        return CSegmentEncodingUri
+    raise ValueError(f"Unknown segment encoding: {segment_encoding}")
+
+
 cdef class Expression(_Weakrefable):
     """
     A logical expression to be evaluated against some input.
@@ -1930,6 +1938,9 @@ cdef class DirectoryPartitioning(Partitioning):
         corresponding entry of `dictionaries` must be an array containing
         every value which may be taken by the corresponding column or an
         error will be raised in parsing.
+    segment_encoding : str, default "uri"
+        After splitting paths into segments, decode the segments. Valid
+        values are "uri" (URI-decode segments) and "none" (leave as-is).
 
     Returns
     -------
@@ -1947,13 +1958,17 @@ cdef class DirectoryPartitioning(Partitioning):
     cdef:
         CDirectoryPartitioning* directory_partitioning
 
-    def __init__(self, Schema schema not None, dictionaries=None):
+    def __init__(self, Schema schema not None, dictionaries=None,
+                 segment_encoding="uri"):
         cdef:
             shared_ptr[CDirectoryPartitioning] c_partitioning
+            CKeyValuePartitioningOptions c_options
 
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
         c_partitioning = make_shared[CDirectoryPartitioning](
             pyarrow_unwrap_schema(schema),
-            _partitioning_dictionaries(schema, dictionaries)
+            _partitioning_dictionaries(schema, dictionaries),
+            c_options,
         )
         self.init(<shared_ptr[CPartitioning]> c_partitioning)
 
@@ -1964,7 +1979,7 @@ cdef class DirectoryPartitioning(Partitioning):
     @staticmethod
     def discover(field_names=None, infer_dictionary=False,
                  max_partition_dictionary_size=0,
-                 schema=None):
+                 schema=None, segment_encoding="uri"):
         """
         Discover a DirectoryPartitioning.
 
@@ -1987,6 +2002,9 @@ cdef class DirectoryPartitioning(Partitioning):
             Use this schema instead of inferring a schema from partition
             values. Partition values will be validated against this
             schema before accumulation into the Partitioning's dictionary.
+        segment_encoding : str, default "uri"
+            After splitting paths into segments, decode the segments. Valid
+            values are "uri" (URI-decode segments) and "none" (leave as-is).
 
         Returns
         -------
@@ -2015,6 +2033,9 @@ cdef class DirectoryPartitioning(Partitioning):
                                  "cannot infer field_names")
         else:
             c_field_names = [tobytes(s) for s in field_names]
+
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+
         return PartitioningFactory.wrap(
             CDirectoryPartitioning.MakeFactory(c_field_names, c_options))
 
@@ -2044,6 +2065,9 @@ cdef class HivePartitioning(Partitioning):
         error will be raised in parsing.
     null_fallback : str, default "__HIVE_DEFAULT_PARTITION__"
         If any field is None then this fallback will be used as a label
+    segment_encoding : str, default "uri"
+        After splitting paths into segments, decode the segments. Valid
+        values are "uri" (URI-decode segments) and "none" (leave as-is).
 
     Returns
     -------
@@ -2065,16 +2089,20 @@ cdef class HivePartitioning(Partitioning):
 
     def __init__(self,
                  Schema schema not None,
                  dictionaries=None,
-                 null_fallback="__HIVE_DEFAULT_PARTITION__"):
+                 null_fallback="__HIVE_DEFAULT_PARTITION__",
+                 segment_encoding="uri"):
         cdef:
             shared_ptr[CHivePartitioning] c_partitioning
-            c_string c_null_fallback = tobytes(null_fallback)
+            CHivePartitioningOptions c_options
+
+        c_options.null_fallback = tobytes(null_fallback)
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
 
         c_partitioning = make_shared[CHivePartitioning](
             pyarrow_unwrap_schema(schema),
             _partitioning_dictionaries(schema, dictionaries),
-            c_null_fallback
+            c_options,
         )
         self.init(<shared_ptr[CPartitioning]> c_partitioning)
 
@@ -2086,7 +2114,8 @@ cdef class HivePartitioning(Partitioning):
     def discover(infer_dictionary=False,
                  max_partition_dictionary_size=0,
                  null_fallback="__HIVE_DEFAULT_PARTITION__",
-                 schema=None):
+                 schema=None,
+                 segment_encoding="uri"):
         """
         Discover a HivePartitioning.
 
@@ -2110,6 +2139,9 @@ cdef class HivePartitioning(Partitioning):
             Use this schema instead of inferring a schema from partition
             values. Partition values will be validated against this
             schema before accumulation into the Partitioning's dictionary.
+        segment_encoding : str, default "uri"
+            After splitting paths into segments, decode the segments. Valid
+            values are "uri" (URI-decode segments) and "none" (leave as-is).
 
         Returns
         -------
@@ -2133,6 +2165,8 @@ cdef class HivePartitioning(Partitioning):
         if schema:
             c_options.schema = pyarrow_unwrap_schema(schema)
 
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
+
         return PartitioningFactory.wrap(
             CHivePartitioning.MakeFactory(c_options))
 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 96c9648f920..8cab5536647 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -305,16 +305,35 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CResult[CExpression] Parse(const c_string & path) const
         const shared_ptr[CSchema] & schema()
 
+    cdef cppclass CSegmentEncoding" arrow::dataset::SegmentEncoding":
+        pass
+
+    CSegmentEncoding CSegmentEncodingNone\
+        " arrow::dataset::SegmentEncoding::None"
+    CSegmentEncoding CSegmentEncodingUri\
+        " arrow::dataset::SegmentEncoding::Uri"
+
+    cdef cppclass CKeyValuePartitioningOptions \
+            "arrow::dataset::KeyValuePartitioningOptions":
+        CSegmentEncoding segment_encoding
+
+    cdef cppclass CHivePartitioningOptions \
+            "arrow::dataset::HivePartitioningOptions":
+        CSegmentEncoding segment_encoding
+        c_string null_fallback
+
     cdef cppclass CPartitioningFactoryOptions \
             "arrow::dataset::PartitioningFactoryOptions":
         c_bool infer_dictionary
         shared_ptr[CSchema] schema
+        CSegmentEncoding segment_encoding
 
     cdef cppclass CHivePartitioningFactoryOptions \
             "arrow::dataset::HivePartitioningFactoryOptions":
-        c_bool infer_dictionary,
+        c_bool infer_dictionary
         c_string null_fallback
         shared_ptr[CSchema] schema
+        CSegmentEncoding segment_encoding
 
     cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory":
         pass
@@ -331,7 +350,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CHivePartitioning \
             "arrow::dataset::HivePartitioning"(CPartitioning):
         CHivePartitioning(shared_ptr[CSchema] schema,
-                          vector[shared_ptr[CArray]] dictionaries)
+                          vector[shared_ptr[CArray]] dictionaries,
+                          CHivePartitioningOptions options)
 
     @staticmethod
     shared_ptr[CPartitioningFactory] MakeFactory(
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 982bbe31f74..5e83657ebf2 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1424,6 +1424,99 @@ def test_partitioning_factory_dictionary(mockfs, infer_dictionary):
     assert inferred_schema.field('key').type == pa.string()
 
 
+def test_partitioning_factory_segment_encoding():
+    mockfs = fs._MockFileSystem()
+    format = ds.IpcFileFormat()
+    schema = pa.schema([("i64", pa.int64())])
+    table = pa.table([pa.array(range(10))], schema=schema)
+    partition_schema = pa.schema(
+        [("date", pa.timestamp("s")), ("string", pa.string())])
+    string_partition_schema = pa.schema(
+        [("date", pa.string()), ("string", pa.string())])
+    full_schema = pa.schema(list(schema) + list(partition_schema))
+    for directory in [
+            "directory/2021-05-04 00%3A00%3A00/%24",
+            "hive/date=2021-05-04 00%3A00%3A00/string=%24",
+    ]:
+        mockfs.create_dir(directory)
+        with mockfs.open_output_stream(directory + "/0.feather") as sink:
+            with pa.ipc.new_file(sink, schema) as writer:
+                writer.write_table(table)
+                writer.close()
+
+    # Directory
+    selector = fs.FileSelector("directory", recursive=True)
+    options = ds.FileSystemFactoryOptions("directory")
+    options.partitioning_factory = ds.DirectoryPartitioning.discover(
+        schema=partition_schema)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    inferred_schema = factory.inspect()
+    assert inferred_schema == full_schema
+    actual = factory.finish().to_table(columns={
+        "date_int": ds.field("date").cast(pa.int64()),
+    })
+    assert actual[0][0].as_py() == 1620086400
+
+    options.partitioning_factory = ds.DirectoryPartitioning.discover(
+        ["date", "string"], segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("string") == "%24"))
+
+    options.partitioning = ds.DirectoryPartitioning(
+        string_partition_schema, segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("string") == "%24"))
+
+    options.partitioning_factory = ds.DirectoryPartitioning.discover(
+        schema=partition_schema, segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    with pytest.raises(pa.ArrowInvalid,
+                       match="Could not cast segments for partition field"):
+        inferred_schema = factory.inspect()
+
+    # Hive
+    selector = fs.FileSelector("hive", recursive=True)
+    options = ds.FileSystemFactoryOptions("hive")
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        schema=partition_schema)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    inferred_schema = factory.inspect()
+    assert inferred_schema == full_schema
+    actual = factory.finish().to_table(columns={
+        "date_int": ds.field("date").cast(pa.int64()),
+    })
+    assert actual[0][0].as_py() == 1620086400
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("string") == "%24"))
+
+    options.partitioning = ds.HivePartitioning(
+        string_partition_schema, segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("string") == "%24"))
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        schema=partition_schema, segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    with pytest.raises(pa.ArrowInvalid,
+                       match="Could not cast segments for partition field"):
+        inferred_schema = factory.inspect()
+
+
 def test_dictionary_partitioning_outer_nulls_raises(tempdir):
     table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']})
     part = ds.partitioning(
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 55a28529f85..2c8163bb201 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -472,20 +472,20 @@ dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream, buff
   .Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`, use_buffered_stream, buffer_size, pre_buffer)
 }
 
-dataset___DirectoryPartitioning <- function(schm){
-  .Call(`_arrow_dataset___DirectoryPartitioning`, schm)
+dataset___DirectoryPartitioning <- function(schm, segment_encoding){
+  .Call(`_arrow_dataset___DirectoryPartitioning`, schm, segment_encoding)
 }
 
-dataset___DirectoryPartitioning__MakeFactory <- function(field_names){
-  .Call(`_arrow_dataset___DirectoryPartitioning__MakeFactory`, field_names)
+dataset___DirectoryPartitioning__MakeFactory <- function(field_names, segment_encoding){
+  .Call(`_arrow_dataset___DirectoryPartitioning__MakeFactory`, field_names, segment_encoding)
 }
 
-dataset___HivePartitioning <- function(schm, null_fallback){
-  .Call(`_arrow_dataset___HivePartitioning`, schm, null_fallback)
+dataset___HivePartitioning <- function(schm, null_fallback, segment_encoding){
+  .Call(`_arrow_dataset___HivePartitioning`, schm, null_fallback, segment_encoding)
 }
 
-dataset___HivePartitioning__MakeFactory <- function(null_fallback){
-  .Call(`_arrow_dataset___HivePartitioning__MakeFactory`, null_fallback)
+dataset___HivePartitioning__MakeFactory <- function(null_fallback, segment_encoding){
+  .Call(`_arrow_dataset___HivePartitioning__MakeFactory`, null_fallback, segment_encoding)
 }
 
 dataset___ScannerBuilder__ProjectNames <- function(sb, cols){
diff --git a/r/R/dataset-partition.R b/r/R/dataset-partition.R
index 3c4f18a5692..6e29e4ea31c 100644
--- a/r/R/dataset-partition.R
+++ b/r/R/dataset-partition.R
@@ -64,15 +64,19 @@ Partitioning <- R6Class("Partitioning", inherit = ArrowObject)
 #' @rdname Partitioning
 #' @export
 DirectoryPartitioning <- R6Class("DirectoryPartitioning", inherit = Partitioning)
-DirectoryPartitioning$create <- dataset___DirectoryPartitioning
+DirectoryPartitioning$create <- function(schm, segment_encoding = "uri") {
+  dataset___DirectoryPartitioning(schm, segment_encoding = segment_encoding)
+}
 
 #' @usage NULL
 #' @format NULL
 #' @rdname Partitioning
 #' @export
 HivePartitioning <- R6Class("HivePartitioning", inherit = Partitioning)
-HivePartitioning$create <- function(schm, null_fallback = NULL) {
-  dataset___HivePartitioning(schm, null_fallback = null_fallback_or_default(null_fallback))
+HivePartitioning$create <- function(schm, null_fallback = NULL, segment_encoding = "uri") {
+  dataset___HivePartitioning(schm,
+    null_fallback = null_fallback_or_default(null_fallback),
+    segment_encoding = segment_encoding)
 }
 
 #' Construct Hive partitioning
@@ -86,17 +90,19 @@ HivePartitioning$create <- function(schm, null_fallback = NULL) {
 #' @param null_fallback character to be used in place of missing values (`NA` or `NULL`)
 #' in partition columns. Default is `"__HIVE_DEFAULT_PARTITION__"`,
 #' which is what Hive uses.
+#' @param segment_encoding Decode partition segments after splitting paths.
+#' Default is `"uri"` (URI-decode segments). May also be `"none"` (leave as-is).
 #' @return A [HivePartitioning][Partitioning], or a `HivePartitioningFactory` if
 #' calling `hive_partition()` with no arguments.
 #' @examplesIf arrow_with_dataset()
 #' hive_partition(year = int16(), month = int8())
 #' @export
-hive_partition <- function(..., null_fallback = NULL) {
+hive_partition <- function(..., null_fallback = NULL, segment_encoding = "uri") {
   schm <- schema(...)
   if (length(schm) == 0) {
-    HivePartitioningFactory$create(null_fallback)
+    HivePartitioningFactory$create(null_fallback, segment_encoding)
   } else {
-    HivePartitioning$create(schm, null_fallback)
+    HivePartitioning$create(schm, null_fallback, segment_encoding)
   }
 }
 
@@ -107,15 +113,17 @@ PartitioningFactory <- R6Class("PartitioningFactory", inherit = ArrowObject)
 #' @rdname Partitioning
 #' @export
 DirectoryPartitioningFactory <- R6Class("DirectoryPartitioningFactory ", inherit = PartitioningFactory)
-DirectoryPartitioningFactory$create <- dataset___DirectoryPartitioning__MakeFactory
+DirectoryPartitioningFactory$create <- function(field_names, segment_encoding = "uri") {
+  dataset___DirectoryPartitioning__MakeFactory(field_names, segment_encoding)
+}
 
 #' @usage NULL
 #' @format NULL
 #' @rdname Partitioning
 #' @export
 HivePartitioningFactory <- R6Class("HivePartitioningFactory", inherit = PartitioningFactory)
-HivePartitioningFactory$create <- function(null_fallback = NULL) {
-  dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback))
+HivePartitioningFactory$create <- function(null_fallback = NULL, segment_encoding = "uri") {
+  dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback), segment_encoding)
 }
 
 null_fallback_or_default <- function(null_fallback) {
diff --git a/r/man/hive_partition.Rd b/r/man/hive_partition.Rd
index aeb9cd4b3d1..eef9f9157ea 100644
--- a/r/man/hive_partition.Rd
+++ b/r/man/hive_partition.Rd
@@ -4,7 +4,7 @@
 \alias{hive_partition}
 \title{Construct Hive partitioning}
 \usage{
-hive_partition(..., null_fallback = NULL)
+hive_partition(..., null_fallback = NULL, segment_encoding = "uri")
 }
 \arguments{
 \item{...}{named list of \link[=data-type]{data types}, passed to \code{\link[=schema]{schema()}}}
@@ -12,6 +12,9 @@ hive_partition(..., null_fallback = NULL)
 \item{null_fallback}{character to be used in place of missing values (\code{NA} or \code{NULL})
 in partition columns. Default is \code{"__HIVE_DEFAULT_PARTITION__"},
 which is what Hive uses.}
+
+\item{segment_encoding}{Decode partition segments after splitting paths.
+Default is \code{"uri"} (URI-decode segments). May also be \code{"none"} (leave as-is).}
 }
 \value{
 A \link[=Partitioning]{HivePartitioning}, or a \code{HivePartitioningFactory} if
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index b7ca5e9414c..2f0b036cd80 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1856,61 +1856,65 @@ extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffe
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::DirectoryPartitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm);
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp){
+std::shared_ptr<ds::DirectoryPartitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm, const std::string& segment_encoding);
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp, SEXP segment_encoding_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schm(schm_sexp);
-	return cpp11::as_sexp(dataset___DirectoryPartitioning(schm));
+	arrow::r::Input<const std::string&>::type segment_encoding(segment_encoding_sexp);
+	return cpp11::as_sexp(dataset___DirectoryPartitioning(schm, segment_encoding));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp){
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp, SEXP segment_encoding_sexp){
	Rf_error("Cannot call dataset___DirectoryPartitioning(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(const std::vector<std::string>& field_names);
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp){
+std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(const std::vector<std::string>& field_names, const std::string& segment_encoding);
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp, SEXP segment_encoding_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::vector<std::string>&>::type field_names(field_names_sexp);
-	return cpp11::as_sexp(dataset___DirectoryPartitioning__MakeFactory(field_names));
+	arrow::r::Input<const std::string&>::type segment_encoding(segment_encoding_sexp);
+	return cpp11::as_sexp(dataset___DirectoryPartitioning__MakeFactory(field_names, segment_encoding));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp){
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp, SEXP segment_encoding_sexp){
	Rf_error("Cannot call dataset___DirectoryPartitioning__MakeFactory(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::HivePartitioning> dataset___HivePartitioning(const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback);
-extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp){
+std::shared_ptr<ds::HivePartitioning> dataset___HivePartitioning(const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback, const std::string& segment_encoding);
+extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp, SEXP segment_encoding_sexp){
 BEGIN_CPP11
	arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schm(schm_sexp);
	arrow::r::Input<const std::string&>::type null_fallback(null_fallback_sexp);
-	return cpp11::as_sexp(dataset___HivePartitioning(schm, null_fallback));
+	arrow::r::Input<const std::string&>::type segment_encoding(segment_encoding_sexp);
+	return cpp11::as_sexp(dataset___HivePartitioning(schm, null_fallback, segment_encoding));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp){
+extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp, SEXP segment_encoding_sexp){
	Rf_error("Cannot call dataset___HivePartitioning(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
"); } #endif // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___HivePartitioning__MakeFactory(const std::string& null_fallback); -extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp){ +std::shared_ptr dataset___HivePartitioning__MakeFactory(const std::string& null_fallback, const std::string& segment_encoding); +extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp, SEXP segment_encoding_sexp){ BEGIN_CPP11 arrow::r::Input::type null_fallback(null_fallback_sexp); - return cpp11::as_sexp(dataset___HivePartitioning__MakeFactory(null_fallback)); + arrow::r::Input::type segment_encoding(segment_encoding_sexp); + return cpp11::as_sexp(dataset___HivePartitioning__MakeFactory(null_fallback, segment_encoding)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp){ +extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp, SEXP segment_encoding_sexp){ Rf_error("Cannot call dataset___HivePartitioning__MakeFactory(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -7005,10 +7009,10 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1}, { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2}, { "_arrow_dataset___ParquetFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___ParquetFragmentScanOptions__Make, 3}, - { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, - { "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1}, - { "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 2}, - { "_arrow_dataset___HivePartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___HivePartitioning__MakeFactory, 1}, + { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 2}, + { "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 2}, + { "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 3}, + { "_arrow_dataset___HivePartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___HivePartitioning__MakeFactory, 2}, { "_arrow_dataset___ScannerBuilder__ProjectNames", (DL_FUNC) &_arrow_dataset___ScannerBuilder__ProjectNames, 2}, { "_arrow_dataset___ScannerBuilder__ProjectExprs", (DL_FUNC) &_arrow_dataset___ScannerBuilder__ProjectExprs, 3}, { "_arrow_dataset___ScannerBuilder__Filter", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Filter, 2}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index c5ecc84dbaa..24c1a1343ea 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -333,30 +333,50 @@ dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buf // DirectoryPartitioning, HivePartitioning +ds::SegmentEncoding GetSegmentEncoding(const std::string& segment_encoding) { + if (segment_encoding == "none") { + return ds::SegmentEncoding::None; + } else if (segment_encoding == "uri") { + return ds::SegmentEncoding::Uri; + } + cpp11::stop("invalid segment encoding: " + segment_encoding); + return ds::SegmentEncoding::None; +} + // [[dataset::export]] std::shared_ptr dataset___DirectoryPartitioning( - const std::shared_ptr& schm) { 
-  return std::make_shared<ds::DirectoryPartitioning>(schm);
+    const std::shared_ptr<arrow::Schema>& schm, const std::string& segment_encoding) {
+  ds::KeyValuePartitioningOptions options;
+  options.segment_encoding = GetSegmentEncoding(segment_encoding);
+  std::vector<std::shared_ptr<arrow::Array>> dictionaries;
+  return std::make_shared<ds::DirectoryPartitioning>(schm, dictionaries, options);
 }
 
 // [[dataset::export]]
 std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(
-    const std::vector<std::string>& field_names) {
-  return ds::DirectoryPartitioning::MakeFactory(field_names);
+    const std::vector<std::string>& field_names, const std::string& segment_encoding) {
+  ds::PartitioningFactoryOptions options;
+  options.segment_encoding = GetSegmentEncoding(segment_encoding);
+  return ds::DirectoryPartitioning::MakeFactory(field_names, options);
 }
 
 // [[dataset::export]]
 std::shared_ptr<ds::HivePartitioning> dataset___HivePartitioning(
-    const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback) {
+    const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback,
+    const std::string& segment_encoding) {
+  ds::HivePartitioningOptions options;
+  options.null_fallback = null_fallback;
+  options.segment_encoding = GetSegmentEncoding(segment_encoding);
   std::vector<std::shared_ptr<arrow::Array>> dictionaries;
-  return std::make_shared<ds::HivePartitioning>(schm, dictionaries, null_fallback);
+  return std::make_shared<ds::HivePartitioning>(schm, dictionaries, options);
 }
 
 // [[dataset::export]]
 std::shared_ptr<ds::PartitioningFactory> dataset___HivePartitioning__MakeFactory(
-    const std::string& null_fallback) {
+    const std::string& null_fallback, const std::string& segment_encoding) {
   ds::HivePartitioningFactoryOptions options;
   options.null_fallback = null_fallback;
+  options.segment_encoding = GetSegmentEncoding(segment_encoding);
   return ds::HivePartitioning::MakeFactory(options);
 }
 
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index be141c74659..e7b9322fd50 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -1108,6 +1108,112 @@ test_that("Assembling a Dataset manually and getting a Table", {
   expect_scan_result(ds, schm)
 })
 
+test_that("URI-decoding with directory partitioning", {
+  root <- make_temp_dir()
+  fmt <- FileFormat$create("feather")
+  fs <- LocalFileSystem$create()
+  selector <- FileSelector$create(root, recursive = TRUE)
+  dir1 <- file.path(root, "2021-05-04 00%3A00%3A00", "%24")
+  dir.create(dir1, recursive = TRUE)
+  write_feather(df1, file.path(dir1, "data.feather"))
+
+  partitioning <- DirectoryPartitioning$create(
+    schema(date = timestamp(unit = "s"), string = utf8()))
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning = partitioning)
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  expect_scan_result(ds, schm)
+
+  partitioning <- DirectoryPartitioning$create(
+    schema(date = timestamp(unit = "s"), string = utf8()),
+    segment_encoding = "none")
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning = partitioning)
+  schm <- factory$Inspect()
+  expect_error(factory$Finish(schm), "Invalid: error parsing")
+
+  partitioning_factory <- DirectoryPartitioningFactory$create(
+    c("date", "string"))
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning_factory)
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  # Can't directly inspect partition expressions, so do it implicitly via scan
+  expect_equal(
+    ds %>%
+      filter(date == "2021-05-04 00:00:00", string == "$") %>%
+      select(int) %>%
+      collect(),
+    df1 %>% select(int) %>% collect()
+  )
+
+  partitioning_factory <- DirectoryPartitioningFactory$create(
"none") + factory <- FileSystemDatasetFactory$create( + fs, selector, NULL, fmt, partitioning_factory) + schm <- factory$Inspect() + ds <- factory$Finish(schm) + expect_equal( + ds %>% + filter(date == "2021-05-04 00%3A00%3A00", string == "%24") %>% + select(int) %>% + collect(), + df1 %>% select(int) %>% collect() + ) +}) + +test_that("URI-decoding with hive partitioning", { + root <- make_temp_dir() + fmt <- FileFormat$create("feather") + fs <- LocalFileSystem$create() + selector <- FileSelector$create(root, recursive = TRUE) + dir1 <- file.path(root, "date=2021-05-04 00%3A00%3A00", "string=%24") + dir.create(dir1, recursive = TRUE) + write_feather(df1, file.path(dir1, "data.feather")) + + partitioning <- hive_partition( + date = timestamp(unit = "s"), string = utf8()) + factory <- FileSystemDatasetFactory$create( + fs, selector, NULL, fmt, partitioning = partitioning) + ds <- factory$Finish(schm) + expect_scan_result(ds, schm) + + partitioning <- hive_partition( + date = timestamp(unit = "s"), string = utf8(), segment_encoding = "none") + factory <- FileSystemDatasetFactory$create( + fs, selector, NULL, fmt, partitioning = partitioning) + expect_error(factory$Finish(schm), "Invalid: error parsing") + + partitioning_factory <- hive_partition() + factory <- FileSystemDatasetFactory$create( + fs, selector, NULL, fmt, partitioning_factory) + schm <- factory$Inspect() + ds <- factory$Finish(schm) + # Can't directly inspect partition expressions, so do it implicitly via scan + expect_equal( + ds %>% + filter(date == "2021-05-04 00:00:00", string == "$") %>% + select(int) %>% + collect(), + df1 %>% select(int) %>% collect() + ) + + partitioning_factory <- hive_partition(segment_encoding = "none") + factory <- FileSystemDatasetFactory$create( + fs, selector, NULL, fmt, partitioning_factory) + schm <- factory$Inspect() + ds <- factory$Finish(schm) + expect_equal( + ds %>% + filter(date == "2021-05-04 00%3A00%3A00", string == "%24") %>% + select(int) %>% + collect(), + df1 %>% select(int) %>% collect() + ) +}) + test_that("Assembling multiple DatasetFactories with DatasetFactory", { skip_if_not_available("parquet") factory1 <- dataset_factory(file.path(dataset_dir, 1), format = "parquet")