From 4280ef57ab99408297c3704d6a86b2950c7c82cb Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 6 May 2021 16:46:51 -0400
Subject: [PATCH 01/10] ARROW-12644: [C++][Dataset] URL-decode partition segments

---
 cpp/src/arrow/dataset/partition.cc      | 61 ++++++++++++++-----
 cpp/src/arrow/dataset/partition.h       | 62 +++++++++++++++----
 cpp/src/arrow/dataset/partition_test.cc | 81 ++++++++++++++++++++++++-
 cpp/src/arrow/dataset/type_fwd.h        |  2 +
 4 files changed, 179 insertions(+), 27 deletions(-)

diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index f6e7b9a0d28..7d020d183f2 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -37,6 +37,7 @@
 #include "arrow/util/logging.h"
 #include "arrow/util/make_unique.h"
 #include "arrow/util/string_view.h"
+#include "arrow/util/uri.h"
 
 namespace arrow {
 
@@ -267,7 +268,11 @@ std::vector<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKeys(
   for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
     if (i >= schema_->num_fields()) break;
 
-    keys.push_back({schema_->field(i++)->name(), std::move(segment)});
+    if (options_.url_decode_segments) {
+      keys.push_back({schema_->field(i++)->name(), internal::UriUnescape(segment)});
+    } else {
+      keys.push_back({schema_->field(i++)->name(), std::move(segment)});
+    }
   }
 
   return keys;
@@ -308,6 +313,20 @@ Result<std::string> DirectoryPartitioning::FormatValues(
   return fs::internal::JoinAbstractPath(std::move(segments));
 }
 
+KeyValuePartitioningOptions PartitioningFactoryOptions::AsPartitioningOptions() const {
+  KeyValuePartitioningOptions options;
+  options.url_decode_segments = url_decode_segments;
+  return options;
+}
+
+HivePartitioningOptions HivePartitioningFactoryOptions::AsHivePartitioningOptions()
+    const {
+  HivePartitioningOptions options;
+  options.url_decode_segments = url_decode_segments;
+  options.null_fallback = null_fallback;
+  return options;
+}
+
 namespace {
 class KeyValuePartitioningFactory : public PartitioningFactory {
  protected:
@@ -441,7 +460,12 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
     for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
       if (field_index == field_names_.size()) break;
 
-      RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
+      if (options_.url_decode_segments) {
+        RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++),
+                                 internal::UriUnescape(segment)));
+      } else {
+        RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
+      }
     }
   }
 
@@ -458,7 +482,8 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
     // drop fields which aren't in field_names_
     auto out_schema = SchemaFromColumnNames(schema, field_names_);
 
-    return std::make_shared<DirectoryPartitioning>(std::move(out_schema), dictionaries_);
+    return std::make_shared<DirectoryPartitioning>(std::move(out_schema), dictionaries_,
+                                                   options_.AsPartitioningOptions());
   }
 
  private:
@@ -482,7 +507,7 @@ std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
 }
 
 util::optional<KeyValuePartitioning::Key> HivePartitioning::ParseKey(
-    const std::string& segment, const std::string& null_fallback) {
+    const std::string& segment, const HivePartitioningOptions& options) {
   auto name_end = string_view(segment).find_first_of('=');
   // Not round-trippable
   if (name_end == string_view::npos) {
@@ -490,11 +515,18 @@ util::optional<KeyValuePartitioning::Key> HivePartitioning::ParseKey(
   }
 
   auto name = segment.substr(0, name_end);
-  auto value = segment.substr(name_end + 1);
-  if (value == null_fallback) {
-    return Key{name, util::nullopt};
+  std::string value;
+  if (options.url_decode_segments) {
+    value = internal::UriUnescape(
+        util::string_view(segment.data() + name_end + 1, segment.size() - name_end - 1));
+  } else {
+    value = segment.substr(name_end + 1);
+  }
+
+  if (value == options.null_fallback) {
+    return Key{std::move(name), util::nullopt};
   }
-  return Key{name, value};
+  return Key{std::move(name), std::move(value)};
 }
 
 std::vector<KeyValuePartitioning::Key> HivePartitioning::ParseKeys(
@@ -502,7 +534,7 @@ std::vector<KeyValuePartitioning::Key> HivePartitioning::ParseKeys(
   std::vector<Key> keys;
 
   for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
-    if (auto key = ParseKey(segment, null_fallback_)) {
+    if (auto key = ParseKey(segment, hive_options_)) {
       keys.push_back(std::move(*key));
     }
   }
@@ -521,7 +553,7 @@ Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) c
     } else if (!values[i]->is_valid) {
       // If no key is available just provide a placeholder segment to maintain the
       // field_index <-> path nesting relation
-      segments[i] = name + "=" + null_fallback_;
+      segments[i] = name + "=" + hive_options_.null_fallback;
     } else {
       segments[i] = name + "=" + values[i]->ToString();
     }
@@ -533,15 +565,16 @@ Result<std::string> HivePartitioning::FormatValues(const ScalarVector& values) c
 class HivePartitioningFactory : public KeyValuePartitioningFactory {
  public:
   explicit HivePartitioningFactory(HivePartitioningFactoryOptions options)
-      : KeyValuePartitioningFactory(options), null_fallback_(options.null_fallback) {}
+      : KeyValuePartitioningFactory(options), options_(std::move(options)) {}
 
   std::string type_name() const override { return "hive"; }
 
   Result<std::shared_ptr<Schema>> Inspect(
       const std::vector<std::string>& paths) override {
+    auto options = options_.AsHivePartitioningOptions();
     for (auto path : paths) {
       for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
-        if (auto key = HivePartitioning::ParseKey(segment, null_fallback_)) {
+        if (auto key = HivePartitioning::ParseKey(segment, options)) {
           RETURN_NOT_OK(InsertRepr(key->name, key->value));
         }
       }
@@ -565,12 +598,12 @@ class HivePartitioningFactory : public KeyValuePartitioningFactory {
       auto out_schema = SchemaFromColumnNames(schema, field_names_);
 
       return std::make_shared<HivePartitioning>(std::move(out_schema), dictionaries_,
-                                                null_fallback_);
+                                                options_.AsHivePartitioningOptions());
     }
   }
 
  private:
-  const std::string null_fallback_;
+  const HivePartitioningFactoryOptions options_;
   std::vector<std::string> field_names_;
 };
 
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 36276e7a3b1..6fae96e60b7 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -89,8 +89,15 @@ class ARROW_DS_EXPORT Partitioning {
   std::shared_ptr<Schema> schema_;
 };
 
+/// \brief Options for key-value based partitioning (hive/directory).
+struct ARROW_DS_EXPORT KeyValuePartitioningOptions {
+  /// After splitting a path into components, URL-decode the path components
+  /// before parsing.
+  bool url_decode_segments = true;
+};
+
 /// \brief Options for inferring a partitioning.
-struct PartitioningFactoryOptions {
+struct ARROW_DS_EXPORT PartitioningFactoryOptions {
   /// When inferring a schema for partition fields, yield dictionary encoded types
   /// instead of plain. This can be more efficient when materializing virtual
   /// columns, and Expressions parsed by the finished Partitioning will include
@@ -100,12 +107,19 @@ struct PartitioningFactoryOptions {
   /// will only check discovered fields against the schema and update internal
   /// state (such as dictionaries).
   std::shared_ptr<Schema> schema;
+  /// After splitting a path into components, URL-decode the path components
+  /// before parsing.
+  bool url_decode_segments = true;
+
+  KeyValuePartitioningOptions AsPartitioningOptions() const;
 };
 
 /// \brief Options for inferring a hive-style partitioning.
-struct HivePartitioningFactoryOptions : PartitioningFactoryOptions {
+struct ARROW_DS_EXPORT HivePartitioningFactoryOptions : PartitioningFactoryOptions {
   /// The hive partitioning scheme maps null to a hard coded fallback string.
   std::string null_fallback;
+
+  HivePartitioningOptions AsHivePartitioningOptions() const;
 };
 
 /// \brief PartitioningFactory provides creation of a partitioning when the
@@ -147,8 +161,11 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
   Result<std::string> Format(const compute::Expression& expr) const override;
 
  protected:
-  KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries)
-      : Partitioning(std::move(schema)), dictionaries_(std::move(dictionaries)) {
+  KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
+                       KeyValuePartitioningOptions options)
+      : Partitioning(std::move(schema)),
+        dictionaries_(std::move(dictionaries)),
+        options_(options) {
     if (dictionaries_.empty()) {
       dictionaries_.resize(schema_->num_fields());
     }
@@ -162,6 +179,7 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
   Result<compute::Expression> ConvertKey(const Key& key) const;
 
   ArrayVector dictionaries_;
+  KeyValuePartitioningOptions options_;
 };
 
 /// \brief DirectoryPartitioning parses one segment of a path for each field in its
@@ -174,9 +192,10 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
  public:
   /// If a field in schema is of dictionary type, the corresponding element of
   /// dictionaries must contain the dictionary of values for that field.
-  explicit DirectoryPartitioning(std::shared_ptr<Schema> schema,
-                                 ArrayVector dictionaries = {})
-      : KeyValuePartitioning(std::move(schema), std::move(dictionaries)) {}
+  explicit DirectoryPartitioning(
+      std::shared_ptr<Schema> schema, ArrayVector dictionaries = {},
+      KeyValuePartitioningOptions options = KeyValuePartitioningOptions())
+      : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) {}
 
   std::string type_name() const override { return "schema"; }
 
@@ -196,6 +215,16 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
 /// \brief The default fallback used for null values in a Hive-style partitioning.
 static constexpr char kDefaultHiveNullFallback[] = "__HIVE_DEFAULT_PARTITION__";
 
+struct ARROW_DS_EXPORT HivePartitioningOptions : public KeyValuePartitioningOptions {
+  std::string null_fallback = kDefaultHiveNullFallback;
+
+  static HivePartitioningOptions DefaultsWithNullFallback(std::string fallback) {
+    HivePartitioningOptions options;
+    options.null_fallback = std::move(fallback);
+    return options;
+  }
+};
+
 /// \brief Multi-level, directory based partitioning
 /// originating from Apache Hive with all data files stored in the
 /// leaf directories. Data is partitioned by static values of a
@@ -211,21 +240,30 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
   /// If a field in schema is of dictionary type, the corresponding element of
   /// dictionaries must contain the dictionary of values for that field.
   explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries = {},
                             std::string null_fallback = kDefaultHiveNullFallback)
-      : KeyValuePartitioning(std::move(schema), std::move(dictionaries)),
-        null_fallback_(std::move(null_fallback)) {}
+      : KeyValuePartitioning(std::move(schema), std::move(dictionaries),
+                             KeyValuePartitioningOptions()),
+        hive_options_(
+            HivePartitioningOptions::DefaultsWithNullFallback(std::move(null_fallback))) {
+  }
+
+  explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
+                            HivePartitioningOptions options)
+      : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options),
+        hive_options_(options) {}
 
   std::string type_name() const override { return "hive"; }
-  std::string null_fallback() const { return null_fallback_; }
+  std::string null_fallback() const { return hive_options_.null_fallback; }
+  const HivePartitioningOptions& options() const { return hive_options_; }
 
   static util::optional<Key> ParseKey(const std::string& segment,
-                                      const std::string& null_fallback);
+                                      const HivePartitioningOptions& options);
 
   /// \brief Create a factory for a hive partitioning.
   static std::shared_ptr<PartitioningFactory> MakeFactory(
       HivePartitioningFactoryOptions = {});
 
  private:
-  const std::string null_fallback_;
+  const HivePartitioningOptions hive_options_;
 
   std::vector<Key> ParseKeys(const std::string& path) const override;
 
   Result<std::string> FormatValues(const ScalarVector& values) const override;
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 7a7ffcff229..23b8593322c 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -558,6 +558,84 @@ TEST_F(TestPartitioning, ExistingSchemaHive) {
   AssertInspect({"/a=0/b=1", "/b=2"}, options.schema->fields());
 }
 
+TEST_F(TestPartitioning, UrlEncodedDirectory) {
+  PartitioningFactoryOptions options;
+  auto ts = timestamp(TimeUnit::type::SECOND);
+  options.schema = schema({field("date", ts), field("time", ts), field("str", utf8())});
+  factory_ = DirectoryPartitioning::MakeFactory(options.schema->field_names(), options);
+
+  AssertInspect({"/2021-05-04 00:00:00/2021-05-04 07:27:00/%24",
+                 "/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/foo"},
+                options.schema->fields());
+  auto date = std::make_shared<TimestampScalar>(1620086400, ts);
+  auto time = std::make_shared<TimestampScalar>(1620113220, ts);
+  partitioning_ = std::make_shared<DirectoryPartitioning>(options.schema, ArrayVector());
+  AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24",
+              and_({equal(field_ref("date"), literal(date)),
+                    equal(field_ref("time"), literal(time)),
+                    equal(field_ref("str"), literal("$"))}));
+
+  options.url_decode_segments = false;
+  options.schema =
+      schema({field("date", utf8()), field("time", utf8()), field("str", utf8())});
+  factory_ = DirectoryPartitioning::MakeFactory(options.schema->field_names(), options);
+  AssertInspect({"/2021-05-04 00:00:00/2021-05-04 07:27:00/%E3%81%8F%E3%81%BE",
+                 "/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/foo"},
+                options.schema->fields());
+  partitioning_ = std::make_shared<DirectoryPartitioning>(
+      options.schema, ArrayVector(), options.AsPartitioningOptions());
+  AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24",
+              and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")),
+                    equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")),
+                    equal(field_ref("str"), literal("%24"))}));
+}
+
+TEST_F(TestPartitioning, UrlEncodedHive) {
+  HivePartitioningFactoryOptions options;
+  auto ts = timestamp(TimeUnit::type::SECOND);
+  options.schema = schema({field("date", ts), field("time", ts), field("str", utf8())});
+  options.null_fallback = "$";
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  AssertInspect(
+      {"/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$",
+       "/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
+       "/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24"},
+      options.schema->fields());
+
+  auto date = std::make_shared<TimestampScalar>(1620086400, ts);
+  auto time = std::make_shared<TimestampScalar>(1620113220, ts);
+  partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
+                                                     options.AsHivePartitioningOptions());
+  AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$",
+              and_({equal(field_ref("date"), literal(date)),
+                    equal(field_ref("time"), literal(time)), is_null(field_ref("str"))}));
+  AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
+              and_({equal(field_ref("date"), literal(date)),
+                    equal(field_ref("time"), literal(time)),
+                    equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
+  // URL-encoded null fallback value
+  AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24",
+              and_({equal(field_ref("date"), literal(date)),
+                    equal(field_ref("time"), literal(time)), is_null(field_ref("str"))}));
+
+  options.url_decode_segments = false;
+  options.schema =
+      schema({field("date", utf8()), field("time", utf8()), field("str", utf8())});
+  factory_ = HivePartitioning::MakeFactory(options);
+  AssertInspect(
+      {"/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$",
+       "/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
+       "/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24"},
+      options.schema->fields());
+  partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
+                                                     options.AsHivePartitioningOptions());
+  AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24",
+              and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")),
+                    equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")),
+                    equal(field_ref("str"), literal("%24"))}));
+}
+
 TEST_F(TestPartitioning, EtlThenHive) {
   FieldVector etl_fields{field("year", int16()), field("month", int8()),
                          field("day", int8()), field("hour", int8())};
@@ -655,8 +733,9 @@ class RangePartitioning : public Partitioning {
   Result<compute::Expression> Parse(const std::string& path) const override {
     std::vector<compute::Expression> ranges;
 
+    HivePartitioningOptions options;
     for (auto segment : fs::internal::SplitAbstractPath(path)) {
-      auto key = HivePartitioning::ParseKey(segment, "");
+      auto key = HivePartitioning::ParseKey(segment, options);
       if (!key) {
         return Status::Invalid("can't parse '", segment, "' as a range");
       }
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index 67a999456be..019aaf4241b 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -71,8 +71,10 @@ class ParquetFileWriteOptions;
 class Partitioning;
 class PartitioningFactory;
 class PartitioningOrFactory;
+struct KeyValuePartitioningOptions;
 class DirectoryPartitioning;
 class HivePartitioning;
+struct HivePartitioningOptions;
 
 struct ScanOptions;
From 3ed3c5782925f0b1e48000bb242e0301f4743fdb Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 6 May 2021 17:31:22 -0400
Subject: [PATCH 02/10] ARROW-12644: [Python][Dataset] URL-decode partition segments

---
 python/pyarrow/_dataset.pyx                  | 36 ++--
 python/pyarrow/includes/libarrow_dataset.pxd | 16 +++-
 python/pyarrow/tests/test_dataset.py         | 97 ++++++++++++++++++++
 3 files changed, 140 insertions(+), 9 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 356bf8ce9c7..ced27ad009b 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1930,6 +1930,8 @@ cdef class DirectoryPartitioning(Partitioning):
         corresponding entry of `dictionaries` must be an array containing
         every value which may be taken by the corresponding column or an
         error will be raised in parsing.
+    url_decode_segments : bool, default True
+        After splitting paths into segments, URL-decode the segments.
 
     Returns
     -------
@@ -1947,13 +1949,17 @@ cdef class DirectoryPartitioning(Partitioning):
     cdef:
         CDirectoryPartitioning* directory_partitioning
 
-    def __init__(self, Schema schema not None, dictionaries=None):
+    def __init__(self, Schema schema not None, dictionaries=None,
+                 bint url_decode_segments=True):
         cdef:
             shared_ptr[CDirectoryPartitioning] c_partitioning
+            CKeyValuePartitioningOptions c_options
 
+        c_options.url_decode_segments = url_decode_segments
         c_partitioning = make_shared[CDirectoryPartitioning](
             pyarrow_unwrap_schema(schema),
-            _partitioning_dictionaries(schema, dictionaries)
+            _partitioning_dictionaries(schema, dictionaries),
+            c_options,
         )
         self.init(<shared_ptr[CPartitioning]> c_partitioning)
 
@@ -1964,7 +1970,7 @@ cdef class DirectoryPartitioning(Partitioning):
     @staticmethod
     def discover(field_names=None, infer_dictionary=False,
                  max_partition_dictionary_size=0,
-                 schema=None):
+                 schema=None, bint url_decode_segments=True):
         """
         Discover a DirectoryPartitioning.
 
@@ -1987,6 +1993,8 @@ cdef class DirectoryPartitioning(Partitioning):
             Use this schema instead of inferring a schema from partition
             values. Partition values will be validated against this schema
             before accumulation into the Partitioning's dictionary.
+        url_decode_segments : bool, default True
+            After splitting paths into segments, URL-decode the segments.
 
         Returns
         -------
@@ -2015,6 +2023,9 @@ cdef class DirectoryPartitioning(Partitioning):
                     "cannot infer field_names")
         else:
             c_field_names = [tobytes(s) for s in field_names]
+
+        c_options.url_decode_segments = url_decode_segments
+
         return PartitioningFactory.wrap(
             CDirectoryPartitioning.MakeFactory(c_field_names, c_options))
 
@@ -2044,6 +2055,8 @@ cdef class HivePartitioning(Partitioning):
         error will be raised in parsing.
     null_fallback : str, default "__HIVE_DEFAULT_PARTITION__"
         If any field is None then this fallback will be used as a label
+    url_decode_segments : bool, default True
+        After splitting paths into segments, URL-decode the segments.
 
     Returns
     -------
@@ -2065,16 +2078,20 @@ cdef class HivePartitioning(Partitioning):
 
     def __init__(self,
                  Schema schema not None,
                  dictionaries=None,
-                 null_fallback="__HIVE_DEFAULT_PARTITION__"):
+                 null_fallback="__HIVE_DEFAULT_PARTITION__",
+                 bint url_decode_segments=True):
 
         cdef:
             shared_ptr[CHivePartitioning] c_partitioning
-            c_string c_null_fallback = tobytes(null_fallback)
+            CHivePartitioningOptions c_options
+
+        c_options.null_fallback = tobytes(null_fallback)
+        c_options.url_decode_segments = url_decode_segments
 
         c_partitioning = make_shared[CHivePartitioning](
             pyarrow_unwrap_schema(schema),
             _partitioning_dictionaries(schema, dictionaries),
-            c_null_fallback
+            c_options,
         )
         self.init(<shared_ptr[CPartitioning]> c_partitioning)
 
@@ -2086,7 +2103,8 @@ cdef class HivePartitioning(Partitioning):
     def discover(infer_dictionary=False,
                  max_partition_dictionary_size=0,
                  null_fallback="__HIVE_DEFAULT_PARTITION__",
-                 schema=None):
+                 schema=None,
+                 bint url_decode_segments=True):
         """
         Discover a HivePartitioning.
 
@@ -2110,6 +2128,8 @@ cdef class HivePartitioning(Partitioning):
             Use this schema instead of inferring a schema from partition
             values. Partition values will be validated against this schema
             before accumulation into the Partitioning's dictionary.
+        url_decode_segments : bool, default True
+            After splitting paths into segments, URL-decode the segments.
 
         Returns
         -------
@@ -2133,6 +2153,8 @@ cdef class HivePartitioning(Partitioning):
         if schema:
             c_options.schema = pyarrow_unwrap_schema(schema)
 
+        c_options.url_decode_segments = url_decode_segments
+
         return PartitioningFactory.wrap(
             CHivePartitioning.MakeFactory(c_options))
 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 96c9648f920..7f615247897 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -305,16 +305,27 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CResult[CExpression] Parse(const c_string & path) const
         const shared_ptr[CSchema] & schema()
 
+    cdef cppclass CKeyValuePartitioningOptions \
+            "arrow::dataset::KeyValuePartitioningOptions":
+        c_bool url_decode_segments
+
+    cdef cppclass CHivePartitioningOptions \
+            "arrow::dataset::HivePartitioningOptions":
+        c_bool url_decode_segments
+        c_string null_fallback
+
     cdef cppclass CPartitioningFactoryOptions \
             "arrow::dataset::PartitioningFactoryOptions":
         c_bool infer_dictionary
         shared_ptr[CSchema] schema
+        c_bool url_decode_segments
 
     cdef cppclass CHivePartitioningFactoryOptions \
             "arrow::dataset::HivePartitioningFactoryOptions":
-        c_bool infer_dictionary,
+        c_bool infer_dictionary
         c_string null_fallback
         shared_ptr[CSchema] schema
+        c_bool url_decode_segments
 
     cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory":
         pass
@@ -331,7 +342,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     cdef cppclass CHivePartitioning \
             "arrow::dataset::HivePartitioning"(CPartitioning):
         CHivePartitioning(shared_ptr[CSchema] schema,
-                          vector[shared_ptr[CArray]] dictionaries)
+                          vector[shared_ptr[CArray]] dictionaries,
+                          CHivePartitioningOptions options)
 
         @staticmethod
         shared_ptr[CPartitioningFactory] MakeFactory(
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 982bbe31f74..315371eedb4 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1424,6 +1424,103 @@ def test_partitioning_factory_dictionary(mockfs, infer_dictionary):
         assert inferred_schema.field('key').type == pa.string()
 
 
+def test_partitioning_factory_url_decode():
+    mockfs = fs._MockFileSystem()
+    format = ds.IpcFileFormat()
+    schema = pa.schema([("i64", pa.int64())])
+    partition_schema = pa.schema(
+        [("date", pa.timestamp("s")), ("string", pa.string())])
+    string_partition_schema = pa.schema(
+        [("date", pa.string()), ("string", pa.string())])
+    full_schema = pa.schema(list(schema) + list(partition_schema))
+    for directory in [
+            "directory/2021-05-04 00%3A00%3A00/%24",
+            "directory/2021-05-05 00:00:00/$",
+            "hive/date=2021-05-04 00%3A00%3A00/string=%24",
+            "hive/date=2021-05-05 00:00:00/string=$",
+    ]:
+        mockfs.create_dir(directory)
+        with mockfs.open_output_stream(directory + "/0.feather") as sink:
+            with pa.ipc.new_file(sink, schema) as writer:
+                writer.close()
+
+    # Directory
+    selector = fs.FileSelector("directory", recursive=True)
+    options = ds.FileSystemFactoryOptions("directory")
+    options.partitioning_factory = ds.DirectoryPartitioning.discover(
+        schema=partition_schema)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    inferred_schema = factory.inspect()
+    assert inferred_schema == full_schema
+
+    options.partitioning_factory = ds.DirectoryPartitioning.discover(
+        ["date", "string"], url_decode_segments=False)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("string") == "%24"))
+    assert fragments[1].partition_expression.equals(
+        (ds.field("date") == "2021-05-05 00:00:00") &
+        (ds.field("string") == "$"))
+
+    options.partitioning = ds.DirectoryPartitioning(
+        string_partition_schema, url_decode_segments=False)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("string") == "%24"))
+    assert fragments[1].partition_expression.equals(
+        (ds.field("date") == "2021-05-05 00:00:00") &
+        (ds.field("string") == "$"))
+
+    options.partitioning_factory = ds.DirectoryPartitioning.discover(
+        schema=partition_schema, url_decode_segments=False)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    with pytest.raises(pa.ArrowInvalid,
+                       match="Could not cast segments for partition field"):
+        inferred_schema = factory.inspect()
+
+    # Hive
+    selector = fs.FileSelector("hive", recursive=True)
+    options = ds.FileSystemFactoryOptions("hive")
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        schema=partition_schema)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    inferred_schema = factory.inspect()
+    assert inferred_schema == full_schema
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        url_decode_segments=False)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("string") == "%24"))
+    assert fragments[1].partition_expression.equals(
+        (ds.field("date") == "2021-05-05 00:00:00") &
+        (ds.field("string") == "$"))
+
+    options.partitioning = ds.HivePartitioning(
+        string_partition_schema, url_decode_segments=False)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("string") == "%24"))
+    assert fragments[1].partition_expression.equals(
+        (ds.field("date") == "2021-05-05 00:00:00") &
+        (ds.field("string") == "$"))
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        schema=partition_schema, url_decode_segments=False)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    with pytest.raises(pa.ArrowInvalid,
+                       match="Could not cast segments for partition field"):
+        inferred_schema = factory.inspect()
+
+
 def test_dictionary_partitioning_outer_nulls_raises(tempdir):
     table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']})
     part = ds.partitioning(
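The Cython layer above is deliberately thin: each new keyword argument is copied into the corresponding C++ options struct declared in `libarrow_dataset.pxd`. As a rough sketch (not part of the patch; the wrapper function name and the hard-coded default are illustrative only), `ds.HivePartitioning.discover(url_decode_segments=...)` amounts to the following on the C++ side:

    #include "arrow/dataset/partition.h"

    namespace ds = arrow::dataset;

    std::shared_ptr<ds::PartitioningFactory> MakeHiveFactory(bool url_decode_segments) {
      ds::HivePartitioningFactoryOptions options;
      options.null_fallback = "__HIVE_DEFAULT_PARTITION__";  // Python-side default
      options.url_decode_segments = url_decode_segments;
      return ds::HivePartitioning::MakeFactory(options);
    }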
From b9f9fbfa56190e57d838e2079f00df66dcb20d46 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 7 May 2021 08:42:43 -0400
Subject: [PATCH 03/10] ARROW-12644: [R][Dataset] URL-decode partition segments

---
 r/R/arrowExports.R              |  16 ++---
 r/R/dataset-partition.R         |  26 +++---
 r/man/hive_partition.Rd         |   5 +-
 r/src/arrowExports.cpp          |  44 +++++++------
 r/src/dataset.cpp               |  24 +++++--
 r/tests/testthat/test-dataset.R | 111 ++++++++++++++++++++++++++++++++
 6 files changed, 181 insertions(+), 45 deletions(-)

diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 55a28529f85..a197fc98262 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -472,20 +472,20 @@ dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream, buff
     .Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`, use_buffered_stream, buffer_size, pre_buffer)
 }
 
-dataset___DirectoryPartitioning <- function(schm){
-    .Call(`_arrow_dataset___DirectoryPartitioning`, schm)
+dataset___DirectoryPartitioning <- function(schm, url_decode_segments){
+    .Call(`_arrow_dataset___DirectoryPartitioning`, schm, url_decode_segments)
 }
 
-dataset___DirectoryPartitioning__MakeFactory <- function(field_names){
-    .Call(`_arrow_dataset___DirectoryPartitioning__MakeFactory`, field_names)
+dataset___DirectoryPartitioning__MakeFactory <- function(field_names, url_decode_segments){
+    .Call(`_arrow_dataset___DirectoryPartitioning__MakeFactory`, field_names, url_decode_segments)
 }
 
-dataset___HivePartitioning <- function(schm, null_fallback){
-    .Call(`_arrow_dataset___HivePartitioning`, schm, null_fallback)
+dataset___HivePartitioning <- function(schm, null_fallback, url_decode_segments){
+    .Call(`_arrow_dataset___HivePartitioning`, schm, null_fallback, url_decode_segments)
 }
 
-dataset___HivePartitioning__MakeFactory <- function(null_fallback){
-    .Call(`_arrow_dataset___HivePartitioning__MakeFactory`, null_fallback)
+dataset___HivePartitioning__MakeFactory <- function(null_fallback, url_decode_segments){
+    .Call(`_arrow_dataset___HivePartitioning__MakeFactory`, null_fallback, url_decode_segments)
 }
 
 dataset___ScannerBuilder__ProjectNames <- function(sb, cols){
diff --git a/r/R/dataset-partition.R b/r/R/dataset-partition.R
index 3c4f18a5692..e224aaf8d74 100644
--- a/r/R/dataset-partition.R
+++ b/r/R/dataset-partition.R
@@ -64,15 +64,19 @@ Partitioning <- R6Class("Partitioning", inherit = ArrowObject)
 #' @rdname Partitioning
 #' @export
 DirectoryPartitioning <- R6Class("DirectoryPartitioning", inherit = Partitioning)
-DirectoryPartitioning$create <- dataset___DirectoryPartitioning
+DirectoryPartitioning$create <- function(schm, url_decode_segments = TRUE) {
+  dataset___DirectoryPartitioning(schm, url_decode_segments = url_decode_segments)
+}
 
 #' @usage NULL
 #' @format NULL
 #' @rdname Partitioning
 #' @export
 HivePartitioning <- R6Class("HivePartitioning", inherit = Partitioning)
-HivePartitioning$create <- function(schm, null_fallback = NULL) {
-  dataset___HivePartitioning(schm, null_fallback = null_fallback_or_default(null_fallback))
+HivePartitioning$create <- function(schm, null_fallback = NULL, url_decode_segments = TRUE) {
+  dataset___HivePartitioning(schm,
+    null_fallback = null_fallback_or_default(null_fallback),
+    url_decode_segments = url_decode_segments)
 }
 
 #' Construct Hive partitioning
@@ -86,17 +90,19 @@ HivePartitioning$create <- function(schm, null_fallback = NULL) {
 #' @param null_fallback character to be used in place of missing values (`NA` or `NULL`)
 #' in partition columns. Default is `"__HIVE_DEFAULT_PARTITION__"`,
 #' which is what Hive uses.
+#' @param url_decode_segments URL-decode partition segments after splitting paths.
+#' Default is `TRUE`.
 #' @return A [HivePartitioning][Partitioning], or a `HivePartitioningFactory` if
 #' calling `hive_partition()` with no arguments.
 #' @examplesIf arrow_with_dataset()
 #' hive_partition(year = int16(), month = int8())
 #' @export
-hive_partition <- function(..., null_fallback = NULL) {
+hive_partition <- function(..., null_fallback = NULL, url_decode_segments = TRUE) {
   schm <- schema(...)
   if (length(schm) == 0) {
-    HivePartitioningFactory$create(null_fallback)
+    HivePartitioningFactory$create(null_fallback, url_decode_segments)
   } else {
-    HivePartitioning$create(schm, null_fallback)
+    HivePartitioning$create(schm, null_fallback, url_decode_segments)
   }
 }
 
@@ -107,15 +113,17 @@ PartitioningFactory <- R6Class("PartitioningFactory", inherit = ArrowObject)
 #' @rdname Partitioning
 #' @export
 DirectoryPartitioningFactory <- R6Class("DirectoryPartitioningFactory ", inherit = PartitioningFactory)
-DirectoryPartitioningFactory$create <- dataset___DirectoryPartitioning__MakeFactory
+DirectoryPartitioningFactory$create <- function(field_names, url_decode_segments = TRUE) {
+  dataset___DirectoryPartitioning__MakeFactory(field_names, url_decode_segments)
+}
 
 #' @usage NULL
 #' @format NULL
 #' @rdname Partitioning
 #' @export
 HivePartitioningFactory <- R6Class("HivePartitioningFactory", inherit = PartitioningFactory)
-HivePartitioningFactory$create <- function(null_fallback = NULL) {
-  dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback))
+HivePartitioningFactory$create <- function(null_fallback = NULL, url_decode_segments = TRUE) {
+  dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback), url_decode_segments)
 }
 
 null_fallback_or_default <- function(null_fallback) {
diff --git a/r/man/hive_partition.Rd b/r/man/hive_partition.Rd
index aeb9cd4b3d1..d46a0629b65 100644
--- a/r/man/hive_partition.Rd
+++ b/r/man/hive_partition.Rd
@@ -4,7 +4,7 @@
 \alias{hive_partition}
 \title{Construct Hive partitioning}
 \usage{
-hive_partition(..., null_fallback = NULL)
+hive_partition(..., null_fallback = NULL, url_decode_segments = TRUE)
 }
 \arguments{
 \item{...}{named list of \link[=data-type]{data types}, passed to \code{\link[=schema]{schema()}}}
@@ -12,6 +12,9 @@ hive_partition(..., null_fallback = NULL)
 \item{null_fallback}{character to be used in place of missing values (\code{NA} or \code{NULL})
 in partition columns. Default is \code{"__HIVE_DEFAULT_PARTITION__"},
 which is what Hive uses.}
+
+\item{url_decode_segments}{URL-decode partition segments after splitting paths.
+Default is \code{TRUE}.}
 }
 \value{
 A \link[=Partitioning]{HivePartitioning}, or a \code{HivePartitioningFactory} if
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index b7ca5e9414c..1f4f431b98f 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1856,61 +1856,65 @@ extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffe
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::Partitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm);
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp){
+std::shared_ptr<ds::Partitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm, bool url_decode_segments);
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp, SEXP url_decode_segments_sexp){
 BEGIN_CPP11
     arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schm(schm_sexp);
-    return cpp11::as_sexp(dataset___DirectoryPartitioning(schm));
+    arrow::r::Input<bool>::type url_decode_segments(url_decode_segments_sexp);
+    return cpp11::as_sexp(dataset___DirectoryPartitioning(schm, url_decode_segments));
END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp){
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp, SEXP url_decode_segments_sexp){
    Rf_error("Cannot call dataset___DirectoryPartitioning(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(const std::vector<std::string>& field_names);
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp){
+std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(const std::vector<std::string>& field_names, bool url_decode_segments);
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp, SEXP url_decode_segments_sexp){
 BEGIN_CPP11
    arrow::r::Input<const std::vector<std::string>&>::type field_names(field_names_sexp);
-   return cpp11::as_sexp(dataset___DirectoryPartitioning__MakeFactory(field_names));
+   arrow::r::Input<bool>::type url_decode_segments(url_decode_segments_sexp);
+   return cpp11::as_sexp(dataset___DirectoryPartitioning__MakeFactory(field_names, url_decode_segments));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp){
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp, SEXP url_decode_segments_sexp){
    Rf_error("Cannot call dataset___DirectoryPartitioning__MakeFactory(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::Partitioning> dataset___HivePartitioning(const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback);
-extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp){
+std::shared_ptr<ds::Partitioning> dataset___HivePartitioning(const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback, bool url_decode_segments);
+extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp, SEXP url_decode_segments_sexp){
 BEGIN_CPP11
    arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schm(schm_sexp);
    arrow::r::Input<const std::string&>::type null_fallback(null_fallback_sexp);
-   return cpp11::as_sexp(dataset___HivePartitioning(schm, null_fallback));
+   arrow::r::Input<bool>::type url_decode_segments(url_decode_segments_sexp);
+   return cpp11::as_sexp(dataset___HivePartitioning(schm, null_fallback, url_decode_segments));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp){
+extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp, SEXP url_decode_segments_sexp){
    Rf_error("Cannot call dataset___HivePartitioning(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::PartitioningFactory> dataset___HivePartitioning__MakeFactory(const std::string& null_fallback);
-extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp){
+std::shared_ptr<ds::PartitioningFactory> dataset___HivePartitioning__MakeFactory(const std::string& null_fallback, bool url_decode_segments);
+extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp, SEXP url_decode_segments_sexp){
 BEGIN_CPP11
    arrow::r::Input<const std::string&>::type null_fallback(null_fallback_sexp);
-   return cpp11::as_sexp(dataset___HivePartitioning__MakeFactory(null_fallback));
+   arrow::r::Input<bool>::type url_decode_segments(url_decode_segments_sexp);
+   return cpp11::as_sexp(dataset___HivePartitioning__MakeFactory(null_fallback, url_decode_segments));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp){
+extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp, SEXP url_decode_segments_sexp){
    Rf_error("Cannot call dataset___HivePartitioning__MakeFactory(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
 
@@ -7005,10 +7009,10 @@ static const R_CallMethodDef CallEntries[] = {
    { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1},
    { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2},
    { "_arrow_dataset___ParquetFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___ParquetFragmentScanOptions__Make, 3},
-   { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1},
-   { "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1},
-   { "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 2},
-   { "_arrow_dataset___HivePartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___HivePartitioning__MakeFactory, 1},
+   { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 2},
+   { "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 2},
+   { "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 3},
+   { "_arrow_dataset___HivePartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___HivePartitioning__MakeFactory, 2},
    { "_arrow_dataset___ScannerBuilder__ProjectNames", (DL_FUNC) &_arrow_dataset___ScannerBuilder__ProjectNames, 2},
    { "_arrow_dataset___ScannerBuilder__ProjectExprs", (DL_FUNC) &_arrow_dataset___ScannerBuilder__ProjectExprs, 3},
    { "_arrow_dataset___ScannerBuilder__Filter", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Filter, 2},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index c5ecc84dbaa..898875f81fa 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -335,28 +335,38 @@ dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buf
 
 // [[dataset::export]]
 std::shared_ptr<ds::Partitioning> dataset___DirectoryPartitioning(
-    const std::shared_ptr<arrow::Schema>& schm) {
-  return std::make_shared<ds::DirectoryPartitioning>(schm);
+    const std::shared_ptr<arrow::Schema>& schm, bool url_decode_segments) {
+  ds::KeyValuePartitioningOptions options;
+  options.url_decode_segments = url_decode_segments;
+  std::vector<std::shared_ptr<arrow::Array>> dictionaries;
+  return std::make_shared<ds::DirectoryPartitioning>(schm, dictionaries, options);
 }
 
 // [[dataset::export]]
 std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(
-    const std::vector<std::string>& field_names) {
-  return ds::DirectoryPartitioning::MakeFactory(field_names);
+    const std::vector<std::string>& field_names, bool url_decode_segments) {
+  ds::PartitioningFactoryOptions options;
+  options.url_decode_segments = url_decode_segments;
+  return ds::DirectoryPartitioning::MakeFactory(field_names, options);
 }
 
 // [[dataset::export]]
 std::shared_ptr<ds::Partitioning> dataset___HivePartitioning(
-    const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback) {
+    const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback,
+    bool url_decode_segments) {
+  ds::HivePartitioningOptions options;
+  options.null_fallback = null_fallback;
+  options.url_decode_segments = url_decode_segments;
   std::vector<std::shared_ptr<arrow::Array>> dictionaries;
-  return std::make_shared<ds::HivePartitioning>(schm, dictionaries, null_fallback);
+  return std::make_shared<ds::HivePartitioning>(schm, dictionaries, options);
 }
 
 // [[dataset::export]]
 std::shared_ptr<ds::PartitioningFactory> dataset___HivePartitioning__MakeFactory(
-    const std::string& null_fallback) {
+    const std::string& null_fallback, bool url_decode_segments) {
   ds::HivePartitioningFactoryOptions options;
   options.null_fallback = null_fallback;
+  options.url_decode_segments = url_decode_segments;
   return ds::HivePartitioning::MakeFactory(options);
 }
 
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index be141c74659..b734e624723 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -1108,6 +1108,117 @@ test_that("Assembling a Dataset manually and getting a Table", {
   expect_scan_result(ds, schm)
 })
 
+test_that("URL-decoding with directory partitioning", {
+  root <- make_temp_dir()
+  fmt <- FileFormat$create("feather")
+  fs <- LocalFileSystem$create()
+  selector <- FileSelector$create(root, recursive = TRUE)
+  dir1 <- file.path(root, "2021-05-04 00%3A00%3A00", "%24")
+  dir2 <- file.path(root, "2021-05-05 00:00:00", "$")
+  dir.create(dir1, recursive = TRUE)
+  dir.create(dir2, recursive = TRUE)
+  write_feather(df1, file.path(dir1, "data.feather"))
+  write_feather(df2, file.path(dir2, "data.feather"))
+
+  partitioning <- DirectoryPartitioning$create(
+    schema(date = timestamp(unit = "s"), string = utf8()))
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning = partitioning)
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  expect_scan_result(ds, schm)
+
+  partitioning <- DirectoryPartitioning$create(
+    schema(date = timestamp(unit = "s"), string = utf8()),
+    url_decode_segments = FALSE)
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning = partitioning)
+  schm <- factory$Inspect()
+  expect_error(factory$Finish(schm), "Invalid: error parsing")
+
+  partitioning_factory <- DirectoryPartitioningFactory$create(
+    c("date", "string"))
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning_factory)
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  # Can't directly inspect partition expressions, so do it implicitly via scan
+  expect_equal(
+    ds %>%
+      filter(date == "2021-05-04 00:00:00") %>%
+      select(int) %>%
+      collect(),
+    df1 %>% select(int) %>% collect()
+  )
+
+  partitioning_factory <- DirectoryPartitioningFactory$create(
+    c("date", "string"), url_decode_segments = FALSE)
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning_factory)
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  expect_equal(
+    ds %>%
+      filter(date == "2021-05-04 00%3A00%3A00") %>%
+      select(int) %>%
+      collect(),
+    df1 %>% select(int) %>% collect()
+  )
+})
+
+test_that("URL-decoding with hive partitioning", {
+  root <- make_temp_dir()
+  fmt <- FileFormat$create("feather")
+  fs <- LocalFileSystem$create()
+  selector <- FileSelector$create(root, recursive = TRUE)
+  dir1 <- file.path(root, "date=2021-05-04 00%3A00%3A00", "string=%24")
+  dir2 <- file.path(root, "date=2021-05-05 00:00:00", "string=$")
+  dir.create(dir1, recursive = TRUE)
+  dir.create(dir2, recursive = TRUE)
+  write_feather(df1, file.path(dir1, "data.feather"))
+  write_feather(df2, file.path(dir2, "data.feather"))
+
+  partitioning <- hive_partition(
+    date = timestamp(unit = "s"), string = utf8())
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning = partitioning)
+  ds <- factory$Finish(schm)
+  expect_scan_result(ds, schm)
+
+  partitioning <- hive_partition(
+    date = timestamp(unit = "s"), string = utf8(), url_decode_segments = FALSE)
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning = partitioning)
+  expect_error(factory$Finish(schm), "Invalid: error parsing")
+
+  partitioning_factory <- hive_partition()
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning_factory)
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  # Can't directly inspect partition expressions, so do it implicitly via scan
+  expect_equal(
+    ds %>%
+      filter(date == "2021-05-04 00:00:00") %>%
+      select(int) %>%
+      collect(),
+    df1 %>% select(int) %>% collect()
+  )
+
+  partitioning_factory <- hive_partition(url_decode_segments = FALSE)
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning_factory)
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  expect_equal(
+    ds %>%
+      filter(date == "2021-05-04 00%3A00%3A00") %>%
+      select(int) %>%
+      collect(),
+    df1 %>% select(int) %>% collect()
+  )
+})
+
 test_that("Assembling multiple DatasetFactories with DatasetFactory", {
   skip_if_not_available("parquet")
  factory1 <- dataset_factory(file.path(dataset_dir, 1), format = "parquet")
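As with the Python bindings, the R glue above forwards flags into the C++ options structs (see `r/src/dataset.cpp` in this patch). A sketch of the C++ calls that `DirectoryPartitioningFactory$create(c("date", "string"), url_decode_segments = FALSE)` ends up making; the wrapper function name is hypothetical and the field names mirror the tests above:

    #include "arrow/dataset/partition.h"

    namespace ds = arrow::dataset;

    std::shared_ptr<ds::PartitioningFactory> MakeDirectoryFactory(
        bool url_decode_segments) {
      ds::PartitioningFactoryOptions options;
      options.url_decode_segments = url_decode_segments;
      // Field names as passed from R.
      return ds::DirectoryPartitioning::MakeFactory({"date", "string"}, options);
    }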
schema=partition_schema, url_decode_segments=False) @@ -1490,6 +1488,10 @@ def test_partitioning_factory_url_decode(): factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) inferred_schema = factory.inspect() assert inferred_schema == full_schema + actual = factory.finish().to_table(columns={ + "date_int": ds.field("date").cast(pa.int64()), + }) + assert actual[0][0].as_py() == 1620086400 options.partitioning_factory = ds.HivePartitioning.discover( url_decode_segments=False) @@ -1498,9 +1500,6 @@ def test_partitioning_factory_url_decode(): assert fragments[0].partition_expression.equals( (ds.field("date") == "2021-05-04 00%3A00%3A00") & (ds.field("string") == "%24")) - assert fragments[1].partition_expression.equals( - (ds.field("date") == "2021-05-05 00:00:00") & - (ds.field("string") == "$")) options.partitioning = ds.HivePartitioning( string_partition_schema, url_decode_segments=False) @@ -1509,9 +1508,6 @@ def test_partitioning_factory_url_decode(): assert fragments[0].partition_expression.equals( (ds.field("date") == "2021-05-04 00%3A00%3A00") & (ds.field("string") == "%24")) - assert fragments[1].partition_expression.equals( - (ds.field("date") == "2021-05-05 00:00:00") & - (ds.field("string") == "$")) options.partitioning_factory = ds.HivePartitioning.discover( schema=partition_schema, url_decode_segments=False) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index b734e624723..9664aa537a7 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -1114,11 +1114,8 @@ test_that("URL-decoding with directory partitioning", { fs <- LocalFileSystem$create() selector <- FileSelector$create(root, recursive = TRUE) dir1 <- file.path(root, "2021-05-04 00%3A00%3A00", "%24") - dir2 <- file.path(root, "2021-05-05 00:00:00", "$") dir.create(dir1, recursive = TRUE) - dir.create(dir2, recursive = TRUE) write_feather(df1, file.path(dir1, "data.feather")) - write_feather(df2, file.path(dir2, "data.feather")) partitioning <- DirectoryPartitioning$create( schema(date = timestamp(unit = "s"), string = utf8())) @@ -1145,7 +1142,7 @@ test_that("URL-decoding with directory partitioning", { # Can't directly inspect partition expressions, so do it implicitly via scan expect_equal( ds %>% - filter(date == "2021-05-04 00:00:00") %>% + filter(date == "2021-05-04 00:00:00", string == "$") %>% select(int) %>% collect(), df1 %>% select(int) %>% collect() @@ -1159,7 +1156,7 @@ test_that("URL-decoding with directory partitioning", { ds <- factory$Finish(schm) expect_equal( ds %>% - filter(date == "2021-05-04 00%3A00%3A00") %>% + filter(date == "2021-05-04 00%3A00%3A00", string == "%24") %>% select(int) %>% collect(), df1 %>% select(int) %>% collect() @@ -1172,11 +1169,8 @@ test_that("URL-decoding with hive partitioning", { fs <- LocalFileSystem$create() selector <- FileSelector$create(root, recursive = TRUE) dir1 <- file.path(root, "date=2021-05-04 00%3A00%3A00", "string=%24") - dir2 <- file.path(root, "date=2021-05-05 00:00:00", "string=$") dir.create(dir1, recursive = TRUE) - dir.create(dir2, recursive = TRUE) write_feather(df1, file.path(dir1, "data.feather")) - write_feather(df2, file.path(dir2, "data.feather")) partitioning <- hive_partition( date = timestamp(unit = "s"), string = utf8()) @@ -1199,7 +1193,7 @@ test_that("URL-decoding with hive partitioning", { # Can't directly inspect partition expressions, so do it implicitly via scan expect_equal( ds %>% - filter(date == "2021-05-04 00:00:00") %>% + filter(date == "2021-05-04 
00:00:00", string == "$") %>% select(int) %>% collect(), df1 %>% select(int) %>% collect() @@ -1212,7 +1206,7 @@ test_that("URL-decoding with hive partitioning", { ds <- factory$Finish(schm) expect_equal( ds %>% - filter(date == "2021-05-04 00%3A00%3A00") %>% + filter(date == "2021-05-04 00%3A00%3A00", string == "%24") %>% select(int) %>% collect(), df1 %>% select(int) %>% collect() From b9583e8449f0dc72cce36b78e9b0ea21e07f9111 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 7 May 2021 12:41:12 -0400 Subject: [PATCH 05/10] ARROW-12644: [C++][Dataset] Validate UTF8 after URL-decoding --- cpp/src/arrow/dataset/partition.cc | 55 ++++++++++++++++++++----- cpp/src/arrow/dataset/partition.h | 13 +++--- cpp/src/arrow/dataset/partition_test.cc | 14 ++++++- 3 files changed, 63 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 7d020d183f2..8051b5fc91e 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -38,6 +38,7 @@ #include "arrow/util/make_unique.h" #include "arrow/util/string_view.h" #include "arrow/util/uri.h" +#include "arrow/util/utf8.h" namespace arrow { @@ -47,6 +48,18 @@ using util::string_view; namespace dataset { +namespace { +/// Apply UriUnescape, then ensure the results are valid UTF-8. +Result SafeUriUnescape(util::string_view encoded) { + auto decoded = internal::UriUnescape(encoded); + if (!util::ValidateUTF8(decoded)) { + return Status::Invalid("Partition segment was not valid UTF-8 after URL decoding: ", + encoded); + } + return decoded; +} +} // namespace + std::shared_ptr Partitioning::Default() { class DefaultPartitioning : public Partitioning { public: @@ -210,7 +223,8 @@ Result KeyValuePartitioning::ConvertKey(const Key& key) con Result KeyValuePartitioning::Parse(const std::string& path) const { std::vector expressions; - for (const Key& key : ParseKeys(path)) { + ARROW_ASSIGN_OR_RAISE(auto parsed, ParseKeys(path)); + for (const Key& key : parsed) { ARROW_ASSIGN_OR_RAISE(auto expr, ConvertKey(key)); if (expr == compute::literal(true)) continue; expressions.push_back(std::move(expr)); @@ -260,7 +274,16 @@ Result KeyValuePartitioning::Format(const compute::Expression& expr return FormatValues(values); } -std::vector DirectoryPartitioning::ParseKeys( +DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr schema, + ArrayVector dictionaries, + KeyValuePartitioningOptions options) + : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) { + if (options.url_decode_segments) { + util::InitializeUTF8(); + } +} + +Result> DirectoryPartitioning::ParseKeys( const std::string& path) const { std::vector keys; @@ -269,7 +292,8 @@ std::vector DirectoryPartitioning::ParseKeys( if (i >= schema_->num_fields()) break; if (options_.url_decode_segments) { - keys.push_back({schema_->field(i++)->name(), internal::UriUnescape(segment)}); + ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment)); + keys.push_back({schema_->field(i++)->name(), std::move(decoded)}); } else { keys.push_back({schema_->field(i++)->name(), std::move(segment)}); } @@ -449,6 +473,9 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory { PartitioningFactoryOptions options) : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) { Reset(); + if (options.url_decode_segments) { + util::InitializeUTF8(); + } } std::string type_name() const override { return "schema"; } @@ -461,8 +488,8 @@ class DirectoryPartitioningFactory : public 
KeyValuePartitioningFactory { if (field_index == field_names_.size()) break; if (options_.url_decode_segments) { - RETURN_NOT_OK(InsertRepr(static_cast(field_index++), - internal::UriUnescape(segment))); + ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment)); + RETURN_NOT_OK(InsertRepr(static_cast(field_index++), decoded)); } else { RETURN_NOT_OK(InsertRepr(static_cast(field_index++), segment)); } @@ -506,7 +533,7 @@ std::shared_ptr DirectoryPartitioning::MakeFactory( new DirectoryPartitioningFactory(std::move(field_names), options)); } -util::optional HivePartitioning::ParseKey( +Result> HivePartitioning::ParseKey( const std::string& segment, const HivePartitioningOptions& options) { auto name_end = string_view(segment).find_first_of('='); // Not round-trippable @@ -517,8 +544,11 @@ util::optional HivePartitioning::ParseKey( auto name = segment.substr(0, name_end); std::string value; if (options.url_decode_segments) { - value = internal::UriUnescape( - util::string_view(segment.data() + name_end + 1, segment.size() - name_end - 1)); + // Static method, so we have no better place for it + util::InitializeUTF8(); + auto raw_value = + util::string_view(segment.data() + name_end + 1, segment.size() - name_end - 1); + ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value)); } else { value = segment.substr(name_end + 1); } @@ -529,12 +559,13 @@ util::optional HivePartitioning::ParseKey( return Key{std::move(name), std::move(value)}; } -std::vector HivePartitioning::ParseKeys( +Result> HivePartitioning::ParseKeys( const std::string& path) const { std::vector keys; for (const auto& segment : fs::internal::SplitAbstractPath(path)) { - if (auto key = ParseKey(segment, hive_options_)) { + ARROW_ASSIGN_OR_RAISE(auto maybe_key, ParseKey(segment, hive_options_)); + if (auto key = maybe_key) { keys.push_back(std::move(*key)); } } @@ -574,7 +605,9 @@ class HivePartitioningFactory : public KeyValuePartitioningFactory { auto options = options_.AsHivePartitioningOptions(); for (auto path : paths) { for (auto&& segment : fs::internal::SplitAbstractPath(path)) { - if (auto key = HivePartitioning::ParseKey(segment, options)) { + ARROW_ASSIGN_OR_RAISE(auto maybe_key, + HivePartitioning::ParseKey(segment, options)); + if (auto key = maybe_key) { RETURN_NOT_OK(InsertRepr(key->name, key->value)); } } diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 6fae96e60b7..694effc9494 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -171,7 +171,7 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning { } } - virtual std::vector ParseKeys(const std::string& path) const = 0; + virtual Result> ParseKeys(const std::string& path) const = 0; virtual Result FormatValues(const ScalarVector& values) const = 0; @@ -194,8 +194,7 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning { /// dictionaries must be contain the dictionary of values for that field. 
   explicit DirectoryPartitioning(
       std::shared_ptr<Schema> schema, ArrayVector dictionaries = {},
-      KeyValuePartitioningOptions options = KeyValuePartitioningOptions())
-      : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) {}
+      KeyValuePartitioningOptions options = KeyValuePartitioningOptions());

   std::string type_name() const override { return "schema"; }

@@ -207,7 +206,7 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
       std::vector<std::string> field_names, PartitioningFactoryOptions = {});

  private:
-  std::vector<Key> ParseKeys(const std::string& path) const override;
+  Result<std::vector<Key>> ParseKeys(const std::string& path) const override;

   Result<std::string> FormatValues(const ScalarVector& values) const override;
 };
@@ -255,8 +254,8 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
   std::string null_fallback() const { return hive_options_.null_fallback; }
   const HivePartitioningOptions& options() const { return hive_options_; }

-  static util::optional<Key> ParseKey(const std::string& segment,
-                                      const HivePartitioningOptions& options);
+  static Result<util::optional<Key>> ParseKey(const std::string& segment,
+                                              const HivePartitioningOptions& options);

   /// \brief Create a factory for a hive partitioning.
   static std::shared_ptr<PartitioningFactory> MakeFactory(
@@ -264,7 +263,7 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
  private:
   const HivePartitioningOptions hive_options_;
-  std::vector<Key> ParseKeys(const std::string& path) const override;
+  Result<std::vector<Key>> ParseKeys(const std::string& path) const override;

   Result<std::string> FormatValues(const ScalarVector& values) const override;
 };
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 23b8593322c..56d57e75dcf 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -575,6 +575,12 @@ TEST_F(TestPartitioning, UrlEncodedDirectory) {
                       equal(field_ref("time"), literal(time)),
                       equal(field_ref("str"), literal("$"))}));

+  // Invalid UTF-8
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  factory_->Inspect({"/%AF/%BF/%CF"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  partitioning_->Parse({"/%AF/%BF/%CF"}));
+
   options.url_decode_segments = false;
   options.schema =
       schema({field("date", utf8()), field("time", utf8()), field("str", utf8())});
@@ -619,6 +625,12 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
       and_({equal(field_ref("date"), literal(date)),
             equal(field_ref("time"), literal(time)), is_null(field_ref("str"))}));

+  // Invalid UTF-8
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  factory_->Inspect({"/date=%AF/time=%BF/str=%CF"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  partitioning_->Parse({"/date=%AF/time=%BF/str=%CF"}));
+
   options.url_decode_segments = false;
   options.schema =
       schema({field("date", utf8()), field("time", utf8()), field("str", utf8())});
@@ -735,7 +747,7 @@ class RangePartitioning : public Partitioning {
     HivePartitioningOptions options;

     for (auto segment : fs::internal::SplitAbstractPath(path)) {
-      auto key = HivePartitioning::ParseKey(segment, options);
+      ARROW_ASSIGN_OR_RAISE(auto key, HivePartitioning::ParseKey(segment, options));
       if (!key) {
         return Status::Invalid("can't parse '", segment, "' as a range");
       }

From a7c6f60a8a6247d884cc8e7c8ac0a0956c51830e Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 1 Jun 2021 14:29:06 -0500
Subject: [PATCH 06/10] ARROW-12751: [C++][Python][R] Rename URL-decode option

---
 cpp/src/arrow/dataset/partition.cc           | 79 ++++++++++++++------
 cpp/src/arrow/dataset/partition.h            | 30 +++++---
 cpp/src/arrow/dataset/partition_test.cc      |  4 +-
 python/pyarrow/_dataset.pyx                  | 44 +++++++----
 python/pyarrow/includes/libarrow_dataset.pxd | 12 ++-
 python/pyarrow/tests/test_dataset.py         | 14 ++--
 r/R/arrowExports.R                           | 16 ++--
 r/R/dataset-partition.R                      | 26 +++---
 r/man/hive_partition.Rd                      |  6 +-
 r/src/arrowExports.cpp                       | 40 +++++-----
 r/src/dataset.cpp                            | 26 +++++--
 r/tests/testthat/test-dataset.R              |  8 +-
 12 files changed, 189 insertions(+), 116 deletions(-)

diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 8051b5fc91e..b26f433685f 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -172,6 +172,21 @@ Result<Partitioning::PartitionedBatches> KeyValuePartitioning::Partition(
   return out;
 }

+std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding) {
+  switch (segment_encoding) {
+    case SegmentEncoding::None:
+      os << "SegmentEncoding::None";
+      break;
+    case SegmentEncoding::Url:
+      os << "SegmentEncoding::Url";
+      break;
+    default:
+      os << "(invalid SegmentEncoding " << static_cast<int8_t>(segment_encoding) << ")";
+      break;
+  }
+  return os;
+}
+
 Result<compute::Expression> KeyValuePartitioning::ConvertKey(const Key& key) const {
   ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(*schema_));
   if (match.empty()) {
@@ -278,7 +293,7 @@ DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr<Schema> schema,
                                              ArrayVector dictionaries,
                                              KeyValuePartitioningOptions options)
     : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) {
-  if (options.url_decode_segments) {
+  if (options.segment_encoding != SegmentEncoding::None) {
     util::InitializeUTF8();
   }
 }
@@ -291,11 +306,18 @@ Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
   for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
     if (i >= schema_->num_fields()) break;

-    if (options_.url_decode_segments) {
-      ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
-      keys.push_back({schema_->field(i++)->name(), std::move(decoded)});
-    } else {
-      keys.push_back({schema_->field(i++)->name(), std::move(segment)});
+    switch (options_.segment_encoding) {
+      case SegmentEncoding::None:
+        keys.push_back({schema_->field(i++)->name(), std::move(segment)});
+        break;
+      case SegmentEncoding::Url: {
+        ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
+        keys.push_back({schema_->field(i++)->name(), std::move(decoded)});
+        break;
+      }
+      default:
+        return Status::NotImplemented("Unknown segment encoding: ",
+                                      options_.segment_encoding);
     }
   }

@@ -339,14 +361,14 @@ Result<std::string> DirectoryPartitioning::FormatValues(

 KeyValuePartitioningOptions PartitioningFactoryOptions::AsPartitioningOptions() const {
   KeyValuePartitioningOptions options;
-  options.url_decode_segments = url_decode_segments;
+  options.segment_encoding = segment_encoding;
   return options;
 }

 HivePartitioningOptions HivePartitioningFactoryOptions::AsHivePartitioningOptions()
     const {
   HivePartitioningOptions options;
-  options.url_decode_segments = url_decode_segments;
+  options.segment_encoding = segment_encoding;
   options.null_fallback = null_fallback;
   return options;
 }
@@ -473,7 +495,7 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
       PartitioningFactoryOptions options)
       : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
     Reset();
-    if (options.url_decode_segments) {
+    if (options.segment_encoding != SegmentEncoding::None) {
       util::InitializeUTF8();
     }
   }
@@ -487,11 +509,18 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
     for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
       if (field_index == field_names_.size()) break;

-      if (options_.url_decode_segments) {
-        ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
-        RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), decoded));
-      } else {
-        RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
+      switch (options_.segment_encoding) {
+        case SegmentEncoding::None:
+          RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
+          break;
+        case SegmentEncoding::Url: {
+          ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
+          RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), decoded));
+          break;
+        }
+        default:
+          return Status::NotImplemented("Unknown segment encoding: ",
+                                        options_.segment_encoding);
       }
     }
   }
@@ -543,14 +572,20 @@ Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
   auto name = segment.substr(0, name_end);
   std::string value;
-  if (options.url_decode_segments) {
-    // Static method, so we have no better place for it
-    util::InitializeUTF8();
-    auto raw_value =
-        util::string_view(segment.data() + name_end + 1, segment.size() - name_end - 1);
-    ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
-  } else {
-    value = segment.substr(name_end + 1);
+  switch (options.segment_encoding) {
+    case SegmentEncoding::None:
+      value = segment.substr(name_end + 1);
+      break;
+    case SegmentEncoding::Url: {
+      // Static method, so we have no better place for it
+      util::InitializeUTF8();
+      auto raw_value = util::string_view(segment).substr(name_end + 1);
+      ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
+      break;
+    }
+    default:
+      return Status::NotImplemented("Unknown segment encoding: ",
+                                    options.segment_encoding);
   }

   if (value == options.null_fallback) {
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 694effc9494..af624d361ff 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -20,6 +20,7 @@
 #pragma once

 #include <functional>
+#include <iosfwd>
 #include <memory>
 #include <string>
 #include <utility>
@@ -89,11 +90,22 @@ class ARROW_DS_EXPORT Partitioning {
   std::shared_ptr<Schema> schema_;
 };

+/// \brief The encoding of partition segments.
+enum class SegmentEncoding : int8_t {
+  /// No encoding.
+  None = 0,
+  /// Segment values are URL-encoded.
+  Url = 1,
+};
+
+ARROW_EXPORT
+std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding);
+
 /// \brief Options for key-value based partitioning (hive/directory).
 struct ARROW_DS_EXPORT KeyValuePartitioningOptions {
-  /// After splitting a path into components, URL-decode the path components
-  /// before parsing.
-  bool url_decode_segments = true;
+  /// After splitting a path into components, decode the path components
+  /// before parsing according to this scheme.
+  SegmentEncoding segment_encoding = SegmentEncoding::Url;
 };

 /// \brief Options for inferring a partitioning.
@@ -107,9 +119,9 @@ struct ARROW_DS_EXPORT PartitioningFactoryOptions {
   /// will only check discovered fields against the schema and update internal
   /// state (such as dictionaries).
   std::shared_ptr<Schema> schema;
-  /// After splitting a path into components, URL-decode the path components
-  /// before parsing.
-  bool url_decode_segments = true;
+  /// After splitting a path into components, decode the path components
+  /// before parsing according to this scheme.
+  SegmentEncoding segment_encoding = SegmentEncoding::Url;

   KeyValuePartitioningOptions AsPartitioningOptions() const;
 };
@@ -192,9 +204,9 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
  public:
   /// If a field in schema is of dictionary type, the corresponding element of
   /// dictionaries must be contain the dictionary of values for that field.
-  explicit DirectoryPartitioning(
-      std::shared_ptr<Schema> schema, ArrayVector dictionaries = {},
-      KeyValuePartitioningOptions options = KeyValuePartitioningOptions());
+  explicit DirectoryPartitioning(std::shared_ptr<Schema> schema,
+                                 ArrayVector dictionaries = {},
+                                 KeyValuePartitioningOptions options = {});

   std::string type_name() const override { return "schema"; }

diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 56d57e75dcf..6d431c22dfc 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -581,7 +581,7 @@ TEST_F(TestPartitioning, UrlEncodedDirectory) {
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
                                   partitioning_->Parse({"/%AF/%BF/%CF"}));

-  options.url_decode_segments = false;
+  options.segment_encoding = SegmentEncoding::None;
   options.schema =
       schema({field("date", utf8()), field("time", utf8()), field("str", utf8())});
   factory_ = DirectoryPartitioning::MakeFactory(options.schema->field_names(), options);
@@ -631,7 +631,7 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
                                   partitioning_->Parse({"/date=%AF/time=%BF/str=%CF"}));

-  options.url_decode_segments = false;
+  options.segment_encoding = SegmentEncoding::None;
   options.schema =
       schema({field("date", utf8()), field("time", utf8()), field("str", utf8())});
   factory_ = HivePartitioning::MakeFactory(options);
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index ced27ad009b..15803c89972 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -85,6 +85,14 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):

     return c_source

+
+cdef SegmentEncoding _get_segment_encoding(str segment_encoding):
+    if segment_encoding == "none":
+        return SegmentEncoding_NONE
+    elif segment_encoding == "url":
+        return SegmentEncoding_URL
+    raise ValueError(f"Unknown segment encoding: {segment_encoding}")
+
 cdef class Expression(_Weakrefable):
     """
     A logical expression to be evaluated against some input.
@@ -1930,8 +1938,9 @@ cdef class DirectoryPartitioning(Partitioning):
         corresponding entry of `dictionaries` must be an array containing
         every value which may be taken by the corresponding column or an
         error will be raised in parsing.
-    url_decode_segments : bool, default True
-        After splitting paths into segments, URL-decode the segments.
+    segment_encoding : str, default "url"
+        After splitting paths into segments, decode the segments. Valid
+        values are "url" (URL-decode segments) and "none" (leave as-is).

     Returns
     -------
     DirectoryPartitioning
@@ -1950,12 +1959,12 @@ cdef class DirectoryPartitioning(Partitioning):
         CDirectoryPartitioning* directory_partitioning

     def __init__(self, Schema schema not None, dictionaries=None,
-                 bint url_decode_segments=True):
+                 segment_encoding="url"):
         cdef:
             shared_ptr[CDirectoryPartitioning] c_partitioning
             CKeyValuePartitioningOptions c_options

-        c_options.url_decode_segments = url_decode_segments
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)
         c_partitioning = make_shared[CDirectoryPartitioning](
             pyarrow_unwrap_schema(schema),
             _partitioning_dictionaries(schema, dictionaries),
@@ -1970,7 +1979,7 @@ cdef class DirectoryPartitioning(Partitioning):
     @staticmethod
     def discover(field_names=None, infer_dictionary=False,
                  max_partition_dictionary_size=0,
-                 schema=None, bint url_decode_segments=True):
+                 schema=None, segment_encoding="url"):
         """
         Discover a DirectoryPartitioning.

@@ -1993,8 +2002,9 @@ cdef class DirectoryPartitioning(Partitioning):
             Use this schema instead of inferring a schema from partition
             values. Partition values will be validated against this
             schema before accumulation into the Partitioning's dictionary.
-        url_decode_segments : bool, default True
-            After splitting paths into segments, URL-decode the segments.
+        segment_encoding : str, default "url"
+            After splitting paths into segments, decode the segments. Valid
+            values are "url" (URL-decode segments) and "none" (leave as-is).

         Returns
         -------
@@ -2024,7 +2034,7 @@ cdef class DirectoryPartitioning(Partitioning):
         else:
             c_field_names = [tobytes(s) for s in field_names]

-        c_options.url_decode_segments = url_decode_segments
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)

         return PartitioningFactory.wrap(
             CDirectoryPartitioning.MakeFactory(c_field_names, c_options))
@@ -2055,8 +2065,9 @@ cdef class HivePartitioning(Partitioning):
         error will be raised in parsing.
     null_fallback : str, default "__HIVE_DEFAULT_PARTITION__"
         If any field is None then this fallback will be used as a label
-    url_decode_segments : bool, default True
-        After splitting paths into segments, URL-decode the segments.
+    segment_encoding : str, default "url"
+        After splitting paths into segments, decode the segments. Valid
+        values are "url" (URL-decode segments) and "none" (leave as-is).

     Returns
     -------
     HivePartitioning
@@ -2079,14 +2090,14 @@ cdef class HivePartitioning(Partitioning):
             Schema schema not None,
             dictionaries=None,
             null_fallback="__HIVE_DEFAULT_PARTITION__",
-            bint url_decode_segments=True):
+            segment_encoding="url"):

         cdef:
             shared_ptr[CHivePartitioning] c_partitioning
             CHivePartitioningOptions c_options

         c_options.null_fallback = tobytes(null_fallback)
-        c_options.url_decode_segments = url_decode_segments
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)

         c_partitioning = make_shared[CHivePartitioning](
             pyarrow_unwrap_schema(schema),
@@ -2104,7 +2115,7 @@ cdef class HivePartitioning(Partitioning):
                  max_partition_dictionary_size=0,
                  null_fallback="__HIVE_DEFAULT_PARTITION__",
                  schema=None,
-                 bint url_decode_segments=True):
+                 segment_encoding="url"):
         """
         Discover a HivePartitioning.

@@ -2128,8 +2139,9 @@ cdef class HivePartitioning(Partitioning):
             Use this schema instead of inferring a schema from partition
             values. Partition values will be validated against this
             schema before accumulation into the Partitioning's dictionary.
-        url_decode_segments : bool, default True
-            After splitting paths into segments, URL-decode the segments.
+        segment_encoding : str, default "url"
+            After splitting paths into segments, decode the segments. Valid
+            values are "url" (URL-decode segments) and "none" (leave as-is).

         Returns
         -------
@@ -2153,7 +2165,7 @@ cdef class HivePartitioning(Partitioning):
         if schema:
             c_options.schema = pyarrow_unwrap_schema(schema)

-        c_options.url_decode_segments = url_decode_segments
+        c_options.segment_encoding = _get_segment_encoding(segment_encoding)

         return PartitioningFactory.wrap(
             CHivePartitioning.MakeFactory(c_options))
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 7f615247897..9335a5e1fd3 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -305,27 +305,31 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CResult[CExpression] Parse(const c_string & path) const
         const shared_ptr[CSchema] & schema()

+    enum SegmentEncoding" arrow::dataset::SegmentEncoding":
+        SegmentEncoding_NONE" arrow::dataset::SegmentEncoding::None"
+        SegmentEncoding_URL" arrow::dataset::SegmentEncoding::Url"
+
     cdef cppclass CKeyValuePartitioningOptions \
             "arrow::dataset::KeyValuePartitioningOptions":
-        c_bool url_decode_segments
+        SegmentEncoding segment_encoding

     cdef cppclass CHivePartitioningOptions \
             "arrow::dataset::HivePartitioningOptions":
-        c_bool url_decode_segments
+        SegmentEncoding segment_encoding
         c_string null_fallback

     cdef cppclass CPartitioningFactoryOptions \
             "arrow::dataset::PartitioningFactoryOptions":
         c_bool infer_dictionary
         shared_ptr[CSchema] schema
-        c_bool url_decode_segments
+        SegmentEncoding segment_encoding

     cdef cppclass CHivePartitioningFactoryOptions \
             "arrow::dataset::HivePartitioningFactoryOptions":
         c_bool infer_dictionary
         c_string null_fallback
         shared_ptr[CSchema] schema
-        c_bool url_decode_segments
+        SegmentEncoding segment_encoding

     cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory":
         pass
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 115ca0ac7f2..5e83657ebf2 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1424,7 +1424,7 @@ def test_partitioning_factory_dictionary(mockfs, infer_dictionary):
     assert inferred_schema.field('key').type == pa.string()


-def test_partitioning_factory_url_decode():
+def test_partitioning_factory_segment_encoding():
     mockfs = fs._MockFileSystem()
     format = ds.IpcFileFormat()
     schema = pa.schema([("i64", pa.int64())])
@@ -1458,7 +1458,7 @@ def test_partitioning_factory_url_decode():
     assert actual[0][0].as_py() == 1620086400

     options.partitioning_factory = ds.DirectoryPartitioning.discover(
-        ["date", "string"], url_decode_segments=False)
+        ["date", "string"], segment_encoding="none")
     factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
     fragments = list(factory.finish().get_fragments())
     assert fragments[0].partition_expression.equals(
         (ds.field("date") == "2021-05-04 00%3A00%3A00") &
         (ds.field("string") == "%24"))

     options.partitioning = ds.DirectoryPartitioning(
-        string_partition_schema, url_decode_segments=False)
+        string_partition_schema, segment_encoding="none")
     factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
     fragments = list(factory.finish().get_fragments())
     assert fragments[0].partition_expression.equals(
         (ds.field("date") == "2021-05-04 00%3A00%3A00") &
         (ds.field("string") == "%24"))

     options.partitioning_factory = ds.DirectoryPartitioning.discover(
-        schema=partition_schema, url_decode_segments=False)
+        schema=partition_schema, segment_encoding="none")
     factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
     with pytest.raises(pa.ArrowInvalid,
                        match="Could not cast segments for partition field"):
@@ -1494,7 +1494,7 @@ def test_partitioning_factory_url_decode():
     assert actual[0][0].as_py() == 1620086400

     options.partitioning_factory = ds.HivePartitioning.discover(
-        url_decode_segments=False)
+        segment_encoding="none")
     factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
     fragments = list(factory.finish().get_fragments())
     assert fragments[0].partition_expression.equals(
         (ds.field("date") == "2021-05-04 00%3A00%3A00") &
         (ds.field("string") == "%24"))

     options.partitioning = ds.HivePartitioning(
-        string_partition_schema, url_decode_segments=False)
+        string_partition_schema, segment_encoding="none")
     factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
     fragments = list(factory.finish().get_fragments())
     assert fragments[0].partition_expression.equals(
         (ds.field("date") == "2021-05-04 00%3A00%3A00") &
         (ds.field("string") == "%24"))

     options.partitioning_factory = ds.HivePartitioning.discover(
-        schema=partition_schema, url_decode_segments=False)
+        schema=partition_schema, segment_encoding="none")
     factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
     with pytest.raises(pa.ArrowInvalid,
                        match="Could not cast segments for partition field"):
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index a197fc98262..2c8163bb201 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -472,20 +472,20 @@ dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream, buff
     .Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`, use_buffered_stream, buffer_size, pre_buffer)
 }

-dataset___DirectoryPartitioning <- function(schm, url_decode_segments){
-    .Call(`_arrow_dataset___DirectoryPartitioning`, schm, url_decode_segments)
+dataset___DirectoryPartitioning <- function(schm, segment_encoding){
+    .Call(`_arrow_dataset___DirectoryPartitioning`, schm, segment_encoding)
 }

-dataset___DirectoryPartitioning__MakeFactory <- function(field_names, url_decode_segments){
-    .Call(`_arrow_dataset___DirectoryPartitioning__MakeFactory`, field_names, url_decode_segments)
+dataset___DirectoryPartitioning__MakeFactory <- function(field_names, segment_encoding){
+    .Call(`_arrow_dataset___DirectoryPartitioning__MakeFactory`, field_names, segment_encoding)
 }

-dataset___HivePartitioning <- function(schm, null_fallback, url_decode_segments){
-    .Call(`_arrow_dataset___HivePartitioning`, schm, null_fallback, url_decode_segments)
+dataset___HivePartitioning <- function(schm, null_fallback, segment_encoding){
+    .Call(`_arrow_dataset___HivePartitioning`, schm, null_fallback, segment_encoding)
 }

-dataset___HivePartitioning__MakeFactory <- function(null_fallback, url_decode_segments){
-    .Call(`_arrow_dataset___HivePartitioning__MakeFactory`, null_fallback, url_decode_segments)
+dataset___HivePartitioning__MakeFactory <- function(null_fallback, segment_encoding){
+    .Call(`_arrow_dataset___HivePartitioning__MakeFactory`, null_fallback, segment_encoding)
 }

 dataset___ScannerBuilder__ProjectNames <- function(sb, cols){
     .Call(`_arrow_dataset___ScannerBuilder__ProjectNames`, sb, cols)
 }
diff --git a/r/R/dataset-partition.R b/r/R/dataset-partition.R
index e224aaf8d74..3345f4ff1bd 100644
--- a/r/R/dataset-partition.R
+++ b/r/R/dataset-partition.R
@@ -64,8 +64,8 @@ Partitioning <- R6Class("Partitioning", inherit = ArrowObject)
 #' @rdname Partitioning
 #' @export
 DirectoryPartitioning <- R6Class("DirectoryPartitioning", inherit = Partitioning)
-DirectoryPartitioning$create <- function(schm, url_decode_segments = TRUE) {
-  dataset___DirectoryPartitioning(schm, url_decode_segments = url_decode_segments)
+DirectoryPartitioning$create <- function(schm, segment_encoding = "url") {
+  dataset___DirectoryPartitioning(schm, segment_encoding = segment_encoding)
 }

 #' @usage NULL
@@ -73,10 +73,10 @@ DirectoryPartitioning$create <- function(schm, url_decode_segments = TRUE) {
 #' @rdname Partitioning
 #' @export
 HivePartitioning <- R6Class("HivePartitioning", inherit = Partitioning)
-HivePartitioning$create <- function(schm, null_fallback = NULL, url_decode_segments = TRUE) {
+HivePartitioning$create <- function(schm, null_fallback = NULL, segment_encoding = "url") {
   dataset___HivePartitioning(schm,
     null_fallback = null_fallback_or_default(null_fallback),
-    url_decode_segments = url_decode_segments)
+    segment_encoding = segment_encoding)
 }

 #' Construct Hive partitioning
@@ -90,19 +90,19 @@ HivePartitioning$create <- function(schm, null_fallback = NULL, url_decode_segme
 #' @param null_fallback character to be used in place of missing values (`NA` or `NULL`)
 #' in partition columns. Default is `"__HIVE_DEFAULT_PARTITION__"`,
 #' which is what Hive uses.
-#' @param url_decode_segments URL-decode partition segments after splitting paths.
-#' Default is `TRUE`.
+#' @param segment_encoding Decode partition segments after splitting paths.
+#' Default is `"url"` (URL-decode segments). May also be `"none"` (leave as-is).
 #' @return A [HivePartitioning][Partitioning], or a `HivePartitioningFactory` if
 #' calling `hive_partition()` with no arguments.
 #' @examplesIf arrow_with_dataset()
 #' hive_partition(year = int16(), month = int8())
 #' @export
-hive_partition <- function(..., null_fallback = NULL, url_decode_segments = TRUE) {
+hive_partition <- function(..., null_fallback = NULL, segment_encoding = "url") {
   schm <- schema(...)
   if (length(schm) == 0) {
-    HivePartitioningFactory$create(null_fallback, url_decode_segments)
+    HivePartitioningFactory$create(null_fallback, segment_encoding)
   } else {
-    HivePartitioning$create(schm, null_fallback, url_decode_segments)
+    HivePartitioning$create(schm, null_fallback, segment_encoding)
   }
 }

@@ -113,8 +113,8 @@ PartitioningFactory <- R6Class("PartitioningFactory", inherit = ArrowObject)
 #' @rdname Partitioning
 #' @export
 DirectoryPartitioningFactory <- R6Class("DirectoryPartitioningFactory ", inherit = PartitioningFactory)
-DirectoryPartitioningFactory$create <- function(field_names, url_decode_segments = TRUE) {
-  dataset___DirectoryPartitioning__MakeFactory(field_names, url_decode_segments)
+DirectoryPartitioningFactory$create <- function(field_names, segment_encoding = "url") {
+  dataset___DirectoryPartitioning__MakeFactory(field_names, segment_encoding)
 }

 #' @usage NULL
@@ -122,8 +122,8 @@ DirectoryPartitioningFactory$create <- function(field_names, url_decode_segments
 #' @rdname Partitioning
 #' @export
 HivePartitioningFactory <- R6Class("HivePartitioningFactory", inherit = PartitioningFactory)
-HivePartitioningFactory$create <- function(null_fallback = NULL, url_decode_segments = TRUE) {
-  dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback), url_decode_segments)
+HivePartitioningFactory$create <- function(null_fallback = NULL, segment_encoding = "url") {
+  dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback), segment_encoding)
 }

 null_fallback_or_default <- function(null_fallback) {
diff --git a/r/man/hive_partition.Rd b/r/man/hive_partition.Rd
index d46a0629b65..3d88ac6dba7 100644
--- a/r/man/hive_partition.Rd
+++ b/r/man/hive_partition.Rd
@@ -4,7 +4,7 @@
 \alias{hive_partition}
 \title{Construct Hive partitioning}
 \usage{
-hive_partition(..., null_fallback = NULL, url_decode_segments = TRUE)
+hive_partition(..., null_fallback = NULL, segment_encoding = "url")
 }
 \arguments{
 \item{...}{named list of \link[=data-type]{data types}, passed to \code{\link[=schema]{schema()}}}
@@ -13,8 +13,8 @@ hive_partition(..., null_fallback = NULL, url_decode_segments = TRUE)
 in partition columns. Default is \code{"__HIVE_DEFAULT_PARTITION__"},
 which is what Hive uses.}

-\item{url_decode_segments}{URL-decode partition segments after splitting paths.
-Default is \code{TRUE}.}
+\item{segment_encoding}{Decode partition segments after splitting paths.
+Default is \code{"url"} (URL-decode segments). May also be \code{"none"} (leave as-is).}
 }
 \value{
 A \link[=Partitioning]{HivePartitioning}, or a \code{HivePartitioningFactory} if
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 1f4f431b98f..2f0b036cd80 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1856,65 +1856,65 @@ extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffe

 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::DirectoryPartitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm, bool url_decode_segments);
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp, SEXP url_decode_segments_sexp){
+std::shared_ptr<ds::DirectoryPartitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm, const std::string& segment_encoding);
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp, SEXP segment_encoding_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schm(schm_sexp);
-	arrow::r::Input<bool>::type url_decode_segments(url_decode_segments_sexp);
-	return cpp11::as_sexp(dataset___DirectoryPartitioning(schm, url_decode_segments));
+	arrow::r::Input<const std::string&>::type segment_encoding(segment_encoding_sexp);
+	return cpp11::as_sexp(dataset___DirectoryPartitioning(schm, segment_encoding));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp, SEXP url_decode_segments_sexp){
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp, SEXP segment_encoding_sexp){
 	Rf_error("Cannot call dataset___DirectoryPartitioning(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif

 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(const std::vector<std::string>& field_names, bool url_decode_segments);
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp, SEXP url_decode_segments_sexp){
+std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(const std::vector<std::string>& field_names, const std::string& segment_encoding);
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp, SEXP segment_encoding_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::vector<std::string>&>::type field_names(field_names_sexp);
-	arrow::r::Input<bool>::type url_decode_segments(url_decode_segments_sexp);
-	return cpp11::as_sexp(dataset___DirectoryPartitioning__MakeFactory(field_names, url_decode_segments));
+	arrow::r::Input<const std::string&>::type segment_encoding(segment_encoding_sexp);
+	return cpp11::as_sexp(dataset___DirectoryPartitioning__MakeFactory(field_names, segment_encoding));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp, SEXP url_decode_segments_sexp){
+extern "C" SEXP _arrow_dataset___DirectoryPartitioning__MakeFactory(SEXP field_names_sexp, SEXP segment_encoding_sexp){
 	Rf_error("Cannot call dataset___DirectoryPartitioning__MakeFactory(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif

 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::HivePartitioning> dataset___HivePartitioning(const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback, bool url_decode_segments);
-extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp, SEXP url_decode_segments_sexp){
+std::shared_ptr<ds::HivePartitioning> dataset___HivePartitioning(const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback, const std::string& segment_encoding);
+extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp, SEXP segment_encoding_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schm(schm_sexp);
 	arrow::r::Input<const std::string&>::type null_fallback(null_fallback_sexp);
-	arrow::r::Input<bool>::type url_decode_segments(url_decode_segments_sexp);
-	return cpp11::as_sexp(dataset___HivePartitioning(schm, null_fallback, url_decode_segments));
+	arrow::r::Input<const std::string&>::type segment_encoding(segment_encoding_sexp);
+	return cpp11::as_sexp(dataset___HivePartitioning(schm, null_fallback, segment_encoding));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp, SEXP url_decode_segments_sexp){
+extern "C" SEXP _arrow_dataset___HivePartitioning(SEXP schm_sexp, SEXP null_fallback_sexp, SEXP segment_encoding_sexp){
 	Rf_error("Cannot call dataset___HivePartitioning(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif

 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::PartitioningFactory> dataset___HivePartitioning__MakeFactory(const std::string& null_fallback, bool url_decode_segments);
-extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp, SEXP url_decode_segments_sexp){
+std::shared_ptr<ds::PartitioningFactory> dataset___HivePartitioning__MakeFactory(const std::string& null_fallback, const std::string& segment_encoding);
+extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp, SEXP segment_encoding_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::string&>::type null_fallback(null_fallback_sexp);
-	arrow::r::Input<bool>::type url_decode_segments(url_decode_segments_sexp);
-	return cpp11::as_sexp(dataset___HivePartitioning__MakeFactory(null_fallback, url_decode_segments));
+	arrow::r::Input<const std::string&>::type segment_encoding(segment_encoding_sexp);
+	return cpp11::as_sexp(dataset___HivePartitioning__MakeFactory(null_fallback, segment_encoding));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp, SEXP url_decode_segments_sexp){
+extern "C" SEXP _arrow_dataset___HivePartitioning__MakeFactory(SEXP null_fallback_sexp, SEXP segment_encoding_sexp){
 	Rf_error("Cannot call dataset___HivePartitioning__MakeFactory(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 898875f81fa..9fb0395e6a1 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -333,40 +333,50 @@ dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buf

 // DirectoryPartitioning, HivePartitioning

+ds::SegmentEncoding GetSegmentEncoding(const std::string& segment_encoding) {
+  if (segment_encoding == "none") {
+    return ds::SegmentEncoding::None;
+  } else if (segment_encoding == "url") {
+    return ds::SegmentEncoding::Url;
+  }
+  cpp11::stop("invalid segment encoding: " + segment_encoding);
+  return ds::SegmentEncoding::None;
+}
+
 // [[dataset::export]]
 std::shared_ptr<ds::DirectoryPartitioning> dataset___DirectoryPartitioning(
-    const std::shared_ptr<arrow::Schema>& schm, bool url_decode_segments) {
+    const std::shared_ptr<arrow::Schema>& schm, const std::string& segment_encoding) {
   ds::KeyValuePartitioningOptions options;
-  options.url_decode_segments = url_decode_segments;
+  options.segment_encoding = GetSegmentEncoding(segment_encoding);
   std::vector<std::shared_ptr<arrow::Array>> dictionaries;
   return std::make_shared<ds::DirectoryPartitioning>(schm, dictionaries, options);
 }

 // [[dataset::export]]
 std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFactory(
-    const std::vector<std::string>& field_names, bool url_decode_segments) {
+    const std::vector<std::string>& field_names, const std::string& segment_encoding) {
   ds::PartitioningFactoryOptions options;
-  options.url_decode_segments = url_decode_segments;
+  options.segment_encoding = GetSegmentEncoding(segment_encoding);
   return ds::DirectoryPartitioning::MakeFactory(field_names, options);
 }

 // [[dataset::export]]
 std::shared_ptr<ds::HivePartitioning> dataset___HivePartitioning(
     const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback,
-    bool url_decode_segments) {
+    const std::string& segment_encoding) {
   ds::HivePartitioningOptions options;
   options.null_fallback = null_fallback;
-  options.url_decode_segments = url_decode_segments;
+  options.segment_encoding = GetSegmentEncoding(segment_encoding);
   std::vector<std::shared_ptr<arrow::Array>> dictionaries;
   return std::make_shared<ds::HivePartitioning>(schm, dictionaries, options);
 }

 // [[dataset::export]]
 std::shared_ptr<ds::PartitioningFactory> dataset___HivePartitioning__MakeFactory(
-    const std::string& null_fallback, bool url_decode_segments) {
+    const std::string& null_fallback, const std::string& segment_encoding) {
   ds::HivePartitioningFactoryOptions options;
   options.null_fallback = null_fallback;
-  options.url_decode_segments = url_decode_segments;
+  options.segment_encoding = GetSegmentEncoding(segment_encoding);
   return ds::HivePartitioning::MakeFactory(options);
 }
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 9664aa537a7..ed9b4c3ffb1 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -1127,7 +1127,7 @@ test_that("URL-decoding with directory partitioning", {
   partitioning <- DirectoryPartitioning$create(
     schema(date = timestamp(unit = "s"), string = utf8()),
-    url_decode_segments = FALSE)
+    segment_encoding = "none")
   factory <- FileSystemDatasetFactory$create(
     fs, selector, NULL, fmt, partitioning = partitioning)
   schm <- factory$Inspect()
@@ -1149,7 +1149,7 @@ test_that("URL-decoding with directory partitioning", {
   )

   partitioning_factory <- DirectoryPartitioningFactory$create(
-    c("date", "string"), url_decode_segments = FALSE)
+    c("date", "string"), segment_encoding = "none")
   factory <- FileSystemDatasetFactory$create(
     fs, selector, NULL, fmt, partitioning_factory)
   schm <- factory$Inspect()
@@ -1180,7 +1180,7 @@ test_that("URL-decoding with hive partitioning", {
   expect_scan_result(ds, schm)

   partitioning <- hive_partition(
-    date = timestamp(unit = "s"), string = utf8(), url_decode_segments = FALSE)
+    date = timestamp(unit = "s"), string = utf8(), segment_encoding = "none")
   factory <- FileSystemDatasetFactory$create(
     fs, selector, NULL, fmt, partitioning = partitioning)
   expect_error(factory$Finish(schm), "Invalid: error parsing")
@@ -1199,7 +1199,7 @@ test_that("URL-decoding with hive partitioning", {
     df1 %>% select(int) %>% collect()
   )

-  partitioning_factory <- hive_partition(url_decode_segments = FALSE)
+  partitioning_factory <- hive_partition(segment_encoding = "none")
   factory <- FileSystemDatasetFactory$create(
     fs, selector, NULL, fmt, partitioning_factory)
   schm <- factory$Inspect()

From 719913ef0a9ea9471a39c62800b91ae5269b8ff5 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 2 Jun 2021 09:15:11 -0500
Subject: [PATCH 07/10] ARROW-12644: [C++][Dataset] Rename URL to URI

---
 cpp/src/arrow/dataset/partition.cc           | 10 +++----
 cpp/src/arrow/dataset/partition.h            |  6 ++---
 python/pyarrow/_dataset.pyx                  | 28 ++++++++++----------
 python/pyarrow/includes/libarrow_dataset.pxd |  2 +-
 r/R/dataset-partition.R                      | 12 ++++-----
 r/man/hive_partition.Rd                      |  4 +--
 r/src/dataset.cpp                            |  4 +--
 r/tests/testthat/test-dataset.R              |  4 +--
 8 files changed, 35 insertions(+), 35 deletions(-)

diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index b26f433685f..1f6f0d548aa 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -177,8 +177,8 @@ std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding) {
     case SegmentEncoding::None:
       os << "SegmentEncoding::None";
       break;
-    case SegmentEncoding::Url:
-      os << "SegmentEncoding::Url";
+    case SegmentEncoding::Uri:
+      os << "SegmentEncoding::Uri";
       break;
     default:
       os << "(invalid SegmentEncoding " << static_cast<int8_t>(segment_encoding) << ")";
@@ -310,7 +310,7 @@ Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
       case SegmentEncoding::None:
         keys.push_back({schema_->field(i++)->name(), std::move(segment)});
         break;
-      case SegmentEncoding::Url: {
+      case SegmentEncoding::Uri: {
         ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
         keys.push_back({schema_->field(i++)->name(), std::move(decoded)});
         break;
@@ -513,7 +513,7 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
         case SegmentEncoding::None:
           RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
           break;
-        case SegmentEncoding::Url: {
+        case SegmentEncoding::Uri: {
           ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
           RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), decoded));
           break;
@@ -576,7 +576,7 @@ Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
     case SegmentEncoding::None:
       value = segment.substr(name_end + 1);
       break;
-    case SegmentEncoding::Url: {
+    case SegmentEncoding::Uri: {
       // Static method, so we have no better place for it
       util::InitializeUTF8();
       auto raw_value = util::string_view(segment).substr(name_end + 1);
       ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
       break;
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index af624d361ff..024dd5d1aeb 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -95,7 +95,7 @@ enum class SegmentEncoding : int8_t {
   /// No encoding.
   None = 0,
   /// Segment values are URL-encoded.
-  Url = 1,
+  Uri = 1,
 };

 ARROW_EXPORT
@@ -105,7 +105,7 @@ std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding);
 /// \brief Options for key-value based partitioning (hive/directory).
 struct ARROW_DS_EXPORT KeyValuePartitioningOptions {
   /// After splitting a path into components, decode the path components
   /// before parsing according to this scheme.
-  SegmentEncoding segment_encoding = SegmentEncoding::Url;
+  SegmentEncoding segment_encoding = SegmentEncoding::Uri;
 };

 /// \brief Options for inferring a partitioning.
@@ -121,7 +121,7 @@ struct ARROW_DS_EXPORT PartitioningFactoryOptions {
   std::shared_ptr<Schema> schema;
   /// After splitting a path into components, decode the path components
   /// before parsing according to this scheme.
-  SegmentEncoding segment_encoding = SegmentEncoding::Url;
+  SegmentEncoding segment_encoding = SegmentEncoding::Uri;

   KeyValuePartitioningOptions AsPartitioningOptions() const;
 };
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 15803c89972..6b2289e087d 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -88,8 +88,8 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
 cdef SegmentEncoding _get_segment_encoding(str segment_encoding):
     if segment_encoding == "none":
         return SegmentEncoding_NONE
-    elif segment_encoding == "url":
-        return SegmentEncoding_URL
+    elif segment_encoding == "uri":
+        return SegmentEncoding_URI
     raise ValueError(f"Unknown segment encoding: {segment_encoding}")

@@ -1938,9 +1938,9 @@ cdef class DirectoryPartitioning(Partitioning):
         corresponding entry of `dictionaries` must be an array containing
         every value which may be taken by the corresponding column or an
         error will be raised in parsing.
-    segment_encoding : str, default "url"
+    segment_encoding : str, default "uri"
         After splitting paths into segments, decode the segments. Valid
-        values are "url" (URL-decode segments) and "none" (leave as-is).
+        values are "uri" (URI-decode segments) and "none" (leave as-is).

     Returns
     -------
@@ -1959,7 +1959,7 @@ cdef class DirectoryPartitioning(Partitioning):
         CDirectoryPartitioning* directory_partitioning

     def __init__(self, Schema schema not None, dictionaries=None,
-                 segment_encoding="url"):
+                 segment_encoding="uri"):
         cdef:
             shared_ptr[CDirectoryPartitioning] c_partitioning
             CKeyValuePartitioningOptions c_options
@@ -1979,7 +1979,7 @@ cdef class DirectoryPartitioning(Partitioning):
     @staticmethod
     def discover(field_names=None, infer_dictionary=False,
                  max_partition_dictionary_size=0,
-                 schema=None, segment_encoding="url"):
+                 schema=None, segment_encoding="uri"):
         """
         Discover a DirectoryPartitioning.

@@ -2002,9 +2002,9 @@ cdef class DirectoryPartitioning(Partitioning):
-        segment_encoding : str, default "url"
+        segment_encoding : str, default "uri"
             After splitting paths into segments, decode the segments. Valid
-            values are "url" (URL-decode segments) and "none" (leave as-is).
+            values are "uri" (URI-decode segments) and "none" (leave as-is).

         Returns
         -------
@@ -2065,9 +2065,9 @@ cdef class HivePartitioning(Partitioning):
         error will be raised in parsing.
     null_fallback : str, default "__HIVE_DEFAULT_PARTITION__"
         If any field is None then this fallback will be used as a label
-    segment_encoding : str, default "url"
+    segment_encoding : str, default "uri"
         After splitting paths into segments, decode the segments. Valid
-        values are "url" (URL-decode segments) and "none" (leave as-is).
+        values are "uri" (URI-decode segments) and "none" (leave as-is).

     Returns
     -------
@@ -2090,7 +2090,7 @@ cdef class HivePartitioning(Partitioning):
             Schema schema not None,
             dictionaries=None,
             null_fallback="__HIVE_DEFAULT_PARTITION__",
-            segment_encoding="url"):
+            segment_encoding="uri"):

         cdef:
             shared_ptr[CHivePartitioning] c_partitioning
@@ -2115,7 +2115,7 @@ cdef class HivePartitioning(Partitioning):
                  max_partition_dictionary_size=0,
                  null_fallback="__HIVE_DEFAULT_PARTITION__",
                  schema=None,
-                 segment_encoding="url"):
+                 segment_encoding="uri"):
         """
         Discover a HivePartitioning.

@@ -2139,9 +2139,9 @@ cdef class HivePartitioning(Partitioning):
             Use this schema instead of inferring a schema from partition
             values. Partition values will be validated against this
             schema before accumulation into the Partitioning's dictionary.
-        segment_encoding : str, default "url"
+        segment_encoding : str, default "uri"
             After splitting paths into segments, decode the segments. Valid
-            values are "url" (URL-decode segments) and "none" (leave as-is).
+            values are "uri" (URI-decode segments) and "none" (leave as-is).

         Returns
         -------
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 9335a5e1fd3..4883691ee51 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -307,7 +307,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
     enum SegmentEncoding" arrow::dataset::SegmentEncoding":
         SegmentEncoding_NONE" arrow::dataset::SegmentEncoding::None"
-        SegmentEncoding_URL" arrow::dataset::SegmentEncoding::Url"
+        SegmentEncoding_URI" arrow::dataset::SegmentEncoding::Uri"

     cdef cppclass CKeyValuePartitioningOptions \
             "arrow::dataset::KeyValuePartitioningOptions":
diff --git a/r/R/dataset-partition.R b/r/R/dataset-partition.R
index 3345f4ff1bd..6e29e4ea31c 100644
--- a/r/R/dataset-partition.R
+++ b/r/R/dataset-partition.R
@@ -64,7 +64,7 @@ Partitioning <- R6Class("Partitioning", inherit = ArrowObject)
 #' @rdname Partitioning
 #' @export
 DirectoryPartitioning <- R6Class("DirectoryPartitioning", inherit = Partitioning)
-DirectoryPartitioning$create <- function(schm, segment_encoding = "url") {
+DirectoryPartitioning$create <- function(schm, segment_encoding = "uri") {
   dataset___DirectoryPartitioning(schm, segment_encoding = segment_encoding)
 }

@@ -73,7 +73,7 @@ DirectoryPartitioning$create <- function(schm, segment_encoding = "url") {
 #' @rdname Partitioning
 #' @export
 HivePartitioning <- R6Class("HivePartitioning", inherit = Partitioning)
-HivePartitioning$create <- function(schm, null_fallback = NULL, segment_encoding = "url") {
+HivePartitioning$create <- function(schm, null_fallback = NULL, segment_encoding = "uri") {
   dataset___HivePartitioning(schm,
     null_fallback = null_fallback_or_default(null_fallback),
     segment_encoding = segment_encoding)
@@ -91,13 +91,13 @@ HivePartitioning$create <- function(schm, null_fallback = NULL, segment_encoding
 #' in partition columns. Default is `"__HIVE_DEFAULT_PARTITION__"`,
 #' which is what Hive uses.
 #' @param segment_encoding Decode partition segments after splitting paths.
-#' Default is `"url"` (URL-decode segments). May also be `"none"` (leave as-is).
+#' Default is `"uri"` (URI-decode segments). May also be `"none"` (leave as-is).
 #' @return A [HivePartitioning][Partitioning], or a `HivePartitioningFactory` if
 #' calling `hive_partition()` with no arguments.
 #' @examplesIf arrow_with_dataset()
 #' hive_partition(year = int16(), month = int8())
 #' @export
-hive_partition <- function(..., null_fallback = NULL, segment_encoding = "url") {
+hive_partition <- function(..., null_fallback = NULL, segment_encoding = "uri") {
   schm <- schema(...)
   if (length(schm) == 0) {
     HivePartitioningFactory$create(null_fallback, segment_encoding)
@@ -113,7 +113,7 @@ PartitioningFactory <- R6Class("PartitioningFactory", inherit = ArrowObject)
 #' @rdname Partitioning
 #' @export
 DirectoryPartitioningFactory <- R6Class("DirectoryPartitioningFactory ", inherit = PartitioningFactory)
-DirectoryPartitioningFactory$create <- function(field_names, segment_encoding = "url") {
+DirectoryPartitioningFactory$create <- function(field_names, segment_encoding = "uri") {
   dataset___DirectoryPartitioning__MakeFactory(field_names, segment_encoding)
 }

@@ -122,7 +122,7 @@ DirectoryPartitioningFactory$create <- function(field_names, segment_encoding =
 #' @rdname Partitioning
 #' @export
 HivePartitioningFactory <- R6Class("HivePartitioningFactory", inherit = PartitioningFactory)
-HivePartitioningFactory$create <- function(null_fallback = NULL, segment_encoding = "url") {
+HivePartitioningFactory$create <- function(null_fallback = NULL, segment_encoding = "uri") {
   dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback), segment_encoding)
 }
diff --git a/r/man/hive_partition.Rd b/r/man/hive_partition.Rd
index 3d88ac6dba7..eef9f9157ea 100644
--- a/r/man/hive_partition.Rd
+++ b/r/man/hive_partition.Rd
@@ -4,7 +4,7 @@
 \alias{hive_partition}
 \title{Construct Hive partitioning}
 \usage{
-hive_partition(..., null_fallback = NULL, segment_encoding = "url")
+hive_partition(..., null_fallback = NULL, segment_encoding = "uri")
 }
 \arguments{
 \item{...}{named list of \link[=data-type]{data types}, passed to \code{\link[=schema]{schema()}}}
@@ -14,7 +14,7 @@ hive_partition(..., null_fallback = NULL, segment_encoding = "uri")
 which is what Hive uses.}

 \item{segment_encoding}{Decode partition segments after splitting paths.
-Default is \code{"url"} (URL-decode segments). May also be \code{"none"} (leave as-is).}
+Default is \code{"uri"} (URI-decode segments). May also be \code{"none"} (leave as-is).}
 }
 \value{
 A \link[=Partitioning]{HivePartitioning}, or a \code{HivePartitioningFactory} if
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 9fb0395e6a1..24c1a1343ea 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -336,8 +336,8 @@ dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buf
 ds::SegmentEncoding GetSegmentEncoding(const std::string& segment_encoding) {
   if (segment_encoding == "none") {
     return ds::SegmentEncoding::None;
-  } else if (segment_encoding == "url") {
-    return ds::SegmentEncoding::Url;
+  } else if (segment_encoding == "uri") {
+    return ds::SegmentEncoding::Uri;
   }
   cpp11::stop("invalid segment encoding: " + segment_encoding);
   return ds::SegmentEncoding::None;
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index ed9b4c3ffb1..e7b9322fd50 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -1108,7 +1108,7 @@ test_that("Assembling a Dataset manually and getting a Table", {
   expect_scan_result(ds, schm)
 })

-test_that("URL-decoding with directory partitioning", {
+test_that("URI-decoding with directory partitioning", {
   root <- make_temp_dir()
   fmt <- FileFormat$create("feather")
   fs <- LocalFileSystem$create()
@@ -1163,7 +1163,7 @@ test_that("URL-decoding with directory partitioning", {
   )
 })

-test_that("URL-decoding with hive partitioning", {
+test_that("URI-decoding with hive partitioning", {
   root <- make_temp_dir()
   fmt <- FileFormat$create("feather")
   fs <- LocalFileSystem$create()

From bc3a2cb5cfabd80eed59498f016d428ccf4e8280 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 2 Jun 2021 09:24:35 -0500
Subject: [PATCH 08/10] ARROW-12644: [C++][Dataset] Always validate UTF-8

---
 cpp/src/arrow/dataset/partition.cc      | 31 ++++++++++++++++++-------
 cpp/src/arrow/dataset/partition_test.cc |  7 ++++++
 2 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 1f6f0d548aa..5c390b6b487 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -293,9 +293,7 @@ DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr<Schema> schema,
                                              ArrayVector dictionaries,
                                              KeyValuePartitioningOptions options)
     : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options) {
-  if (options.segment_encoding != SegmentEncoding::None) {
-    util::InitializeUTF8();
-  }
+  util::InitializeUTF8();
 }

 Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
@@ -307,9 +305,13 @@ Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
     if (i >= schema_->num_fields()) break;

     switch (options_.segment_encoding) {
-      case SegmentEncoding::None:
+      case SegmentEncoding::None: {
+        if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
+          return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
+        }
         keys.push_back({schema_->field(i++)->name(), std::move(segment)});
         break;
+      }
       case SegmentEncoding::Uri: {
         ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
         keys.push_back({schema_->field(i++)->name(), std::move(decoded)});
@@ -495,9 +497,7 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
       PartitioningFactoryOptions options)
       : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
     Reset();
-    if (options.segment_encoding != SegmentEncoding::None) {
-      util::InitializeUTF8();
-    }
+    util::InitializeUTF8();
   }

   std::string type_name() const override { return "schema"; }

@@ -510,9 +510,13 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
       if (field_index == field_names_.size()) break;

       switch (options_.segment_encoding) {
-        case SegmentEncoding::None:
+        case SegmentEncoding::None: {
+          if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
+            return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
+          }
           RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
           break;
+        }
         case SegmentEncoding::Uri: {
           ARROW_ASSIGN_OR_RAISE(auto decoded, SafeUriUnescape(segment));
           RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), decoded));
@@ -570,15 +574,20 @@ Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
     return util::nullopt;
   }

+  // Static method, so we have no better place for it
+  util::InitializeUTF8();
+
   auto name = segment.substr(0, name_end);
   std::string value;
   switch (options.segment_encoding) {
-    case SegmentEncoding::None:
+    case SegmentEncoding::None: {
       value = segment.substr(name_end + 1);
+      if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(value))) {
+        return Status::Invalid("Partition segment was not valid UTF-8: ", value);
+      }
       break;
+    }
     case SegmentEncoding::Uri: {
-      // Static method, so we have no better place for it
-      util::InitializeUTF8();
       auto raw_value = util::string_view(segment).substr(name_end + 1);
       ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
       break;
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 6d431c22dfc..d8e5198f21d 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -646,6 +646,13 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
       and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")),
             equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")),
             equal(field_ref("str"), literal("%24"))}));
+
+  // Invalid UTF-8
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+                                  factory_->Inspect({"/date=\xAF/time=\xBF/str=\xCF"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+      partitioning_->Parse({"/date=\xAF/time=\xBF/str=\xCF"}));
 }

 TEST_F(TestPartitioning, EtlThenHive) {

From 6dff21fbcf6af8d4b10dc05557d29bc3bcf0093e Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 2 Jun 2021 11:38:38 -0500
Subject: [PATCH 09/10] ARROW-12644: [C++][Dataset] Fix MSVC build error

---
 cpp/src/arrow/dataset/partition.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 024dd5d1aeb..db3008f1d67 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -98,7 +98,7 @@ enum class SegmentEncoding : int8_t {
   Uri = 1,
 };

-ARROW_EXPORT
+ARROW_DS_EXPORT
 std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding);

 /// \brief Options for key-value based partitioning (hive/directory).
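For reference, the C++ surface that patches 5 through 9 converge on can be exercised as in the following sketch. It is not taken from the patches themselves: the "str" field and the example path are hypothetical, and error handling is elided.

    // Parse a URI-encoded Hive-style path with the reworked API.
    #include "arrow/api.h"
    #include "arrow/dataset/partition.h"

    arrow::Result<arrow::compute::Expression> ParseEncodedSegment() {
      namespace ds = arrow::dataset;
      ds::HivePartitioningOptions options;
      options.segment_encoding = ds::SegmentEncoding::Uri;  // the default
      auto partitioning = std::make_shared<ds::HivePartitioning>(
          arrow::schema({arrow::field("str", arrow::utf8())}),
          arrow::ArrayVector{}, options);
      // "%24" decodes to "$"; a segment whose decoded bytes are not valid
      // UTF-8 (e.g. "str=%AF") yields Status::Invalid instead.
      return partitioning->Parse("/str=%24");
    }

Using an enum rather than the earlier boolean keeps the door open for other encodings without another round of API churn, which is what motivated the rename in patch 6.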
From 6147fb54d94f1d2aab4d8d6fd48cd25031c8df7b Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 3 Jun 2021 10:01:02 -0500
Subject: [PATCH 10/10] ARROW-12644: [Python][Dataset] Fix MSVC/Cython build error

---
 python/pyarrow/_dataset.pyx                  |  6 +++---
 python/pyarrow/includes/libarrow_dataset.pxd | 18 +++++++++++-------
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 6b2289e087d..78620b25942 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -85,11 +85,11 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):

     return c_source

-cdef SegmentEncoding _get_segment_encoding(str segment_encoding):
+cdef CSegmentEncoding _get_segment_encoding(str segment_encoding):
     if segment_encoding == "none":
-        return SegmentEncoding_NONE
+        return CSegmentEncodingNone
     elif segment_encoding == "uri":
-        return SegmentEncoding_URI
+        return CSegmentEncodingUri
     raise ValueError(f"Unknown segment encoding: {segment_encoding}")

diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 4883691ee51..8cab5536647 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -305,31 +305,35 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CResult[CExpression] Parse(const c_string & path) const
         const shared_ptr[CSchema] & schema()

-    enum SegmentEncoding" arrow::dataset::SegmentEncoding":
-        SegmentEncoding_NONE" arrow::dataset::SegmentEncoding::None"
-        SegmentEncoding_URI" arrow::dataset::SegmentEncoding::Uri"
+    cdef cppclass CSegmentEncoding" arrow::dataset::SegmentEncoding":
+        pass
+
+    CSegmentEncoding CSegmentEncodingNone\
+        " arrow::dataset::SegmentEncoding::None"
+    CSegmentEncoding CSegmentEncodingUri\
+        " arrow::dataset::SegmentEncoding::Uri"

     cdef cppclass CKeyValuePartitioningOptions \
             "arrow::dataset::KeyValuePartitioningOptions":
-        SegmentEncoding segment_encoding
+        CSegmentEncoding segment_encoding

     cdef cppclass CHivePartitioningOptions \
             "arrow::dataset::HivePartitioningOptions":
-        SegmentEncoding segment_encoding
+        CSegmentEncoding segment_encoding
         c_string null_fallback

     cdef cppclass CPartitioningFactoryOptions \
             "arrow::dataset::PartitioningFactoryOptions":
         c_bool infer_dictionary
         shared_ptr[CSchema] schema
-        SegmentEncoding segment_encoding
+        CSegmentEncoding segment_encoding

     cdef cppclass CHivePartitioningFactoryOptions \
             "arrow::dataset::HivePartitioningFactoryOptions":
         c_bool infer_dictionary
         c_string null_fallback
         shared_ptr[CSchema] schema
-        SegmentEncoding segment_encoding
+        CSegmentEncoding segment_encoding

     cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory":
         pass
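The behavior that patch 8 pins down (segments taken verbatim under SegmentEncoding::None are still validated as UTF-8) can be seen from C++ in the following sketch. Again this is illustrative rather than part of the series: the "date" field is hypothetical, and the encoded path mirrors the one used in the tests above.

    // SegmentEncoding::None: no %-decoding, but still UTF-8-checked.
    #include "arrow/api.h"
    #include "arrow/dataset/partition.h"

    arrow::Status ParseVerbatimSegment() {
      namespace ds = arrow::dataset;
      ds::KeyValuePartitioningOptions options;
      options.segment_encoding = ds::SegmentEncoding::None;
      ds::DirectoryPartitioning partitioning(
          arrow::schema({arrow::field("date", arrow::utf8())}),
          /*dictionaries=*/{}, options);
      // Produces date == "2021-05-04 00%3A00%3A00" (escapes preserved);
      // a raw 0xAF byte in a segment would return Status::Invalid.
      return partitioning.Parse("/2021-05-04 00%3A00%3A00").status();
    }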