diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc
index f8382aa4052..25fa7ff2b70 100644
--- a/cpp/src/arrow/dataset/discovery.cc
+++ b/cpp/src/arrow/dataset/discovery.cc
@@ -279,7 +279,7 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
 
   std::vector<std::shared_ptr<FileFragment>> fragments;
   for (const auto& info : files_) {
-    auto fixed_path = StripPrefixAndFilename(info.path(), options_.partition_base_dir);
+    auto fixed_path = StripPrefix(info.path(), options_.partition_base_dir);
     ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
     ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({info, fs_}, partition));
     fragments.push_back(fragment);
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 10277810575..2e05706bbbc 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -270,11 +270,11 @@ Future<> FileWriter::Finish() {
 
 namespace {
 
-Status WriteBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guarantee,
-                  FileSystemDatasetWriteOptions write_options,
-                  std::function<Status(std::shared_ptr<RecordBatch>,
-                                       const Partitioning::PartitionPathFormat&)>
-                      write) {
+Status WriteBatch(
+    std::shared_ptr<RecordBatch> batch, compute::Expression guarantee,
+    FileSystemDatasetWriteOptions write_options,
+    std::function<Status(std::shared_ptr<RecordBatch>, const PartitionPathFormat&)>
+        write) {
   ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch));
   batch.reset();  // drop to hopefully conserve memory
 
@@ -292,7 +292,7 @@ Status WriteBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guaran
   for (std::size_t index = 0; index < groups.batches.size(); index++) {
     auto partition_expression = and_(groups.expressions[index], guarantee);
     auto next_batch = groups.batches[index];
-    Partitioning::PartitionPathFormat destination;
+    PartitionPathFormat destination;
     ARROW_ASSIGN_OR_RAISE(destination,
                           write_options.partitioning->Format(partition_expression));
     RETURN_NOT_OK(write(next_batch, destination));
@@ -337,10 +337,10 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
     return WriteBatch(
         batch, guarantee, write_options_,
         [this](std::shared_ptr<RecordBatch> next_batch,
-               const Partitioning::PartitionPathFormat& destination) {
+               const PartitionPathFormat& destination) {
          return task_group_.AddTask([this, next_batch, destination] {
            Future<> has_room = dataset_writer_->WriteRecordBatch(
-               next_batch, destination.directory, destination.prefix);
+               next_batch, destination.directory, destination.filename);
            if (!has_room.is_finished()) {
              // We don't have to worry about sequencing backpressure here since
              // task_group_ serves as our sequencer.
If batches continue to arrive after @@ -481,11 +481,11 @@ class TeeNode : public compute::MapNode { compute::Expression guarantee) { return WriteBatch(batch, guarantee, write_options_, [this](std::shared_ptr next_batch, - const Partitioning::PartitionPathFormat& destination) { + const PartitionPathFormat& destination) { return task_group_.AddTask([this, next_batch, destination] { util::tracing::Span span; Future<> has_room = dataset_writer_->WriteRecordBatch( - next_batch, destination.directory, destination.prefix); + next_batch, destination.directory, destination.filename); if (!has_room.is_finished()) { this->Pause(); return has_room.Then([this] { this->Resume(); }); diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 9f914b57f01..f2a3903208f 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -894,7 +894,7 @@ ParquetDatasetFactory::CollectParquetFragments(const Partitioning& partitioning) auto row_groups = Iota(metadata_subset->num_row_groups()); auto partition_expression = - partitioning.Parse(StripPrefixAndFilename(path, options_.partition_base_dir)) + partitioning.Parse(StripPrefix(path, options_.partition_base_dir)) .ValueOr(compute::literal(true)); ARROW_ASSIGN_OR_RAISE( diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 763ea0bc6ce..6b4d601db01 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -263,7 +263,7 @@ Result KeyValuePartitioning::Parse(const std::string& path) return and_(std::move(expressions)); } -Result KeyValuePartitioning::Format( +Result KeyValuePartitioning::Format( const compute::Expression& expr) const { ScalarVector values{static_cast(schema_->num_fields()), nullptr}; @@ -377,7 +377,8 @@ DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr schema, Result> DirectoryPartitioning::ParseKeys( const std::string& path) const { - std::vector segments = fs::internal::SplitAbstractPath(path); + std::vector segments = + fs::internal::SplitAbstractPath(fs::internal::GetAbstractPathParent(path).first); return ParsePartitionSegments(segments); } @@ -390,23 +391,24 @@ FilenamePartitioning::FilenamePartitioning(std::shared_ptr schema, Result> FilenamePartitioning::ParseKeys( const std::string& path) const { - std::vector segments = - fs::internal::SplitAbstractPath(StripNonPrefix(path), kFilenamePartitionSep); + std::vector segments = fs::internal::SplitAbstractPath( + StripNonPrefix(fs::internal::GetAbstractPathParent(path).second), + kFilenamePartitionSep); return ParsePartitionSegments(segments); } -Result DirectoryPartitioning::FormatValues( +Result DirectoryPartitioning::FormatValues( const ScalarVector& values) const { std::vector segments; ARROW_ASSIGN_OR_RAISE(segments, FormatPartitionSegments(values)); return PartitionPathFormat{fs::internal::JoinAbstractPath(std::move(segments)), ""}; } -Result FilenamePartitioning::FormatValues( +Result FilenamePartitioning::FormatValues( const ScalarVector& values) const { std::vector segments; ARROW_ASSIGN_OR_RAISE(segments, FormatPartitionSegments(values)); - return Partitioning::PartitionPathFormat{ + return PartitionPathFormat{ "", fs::internal::JoinAbstractPath(std::move(segments), kFilenamePartitionSep) + kFilenamePartitionSep}; } @@ -721,7 +723,8 @@ Result> HivePartitioning::ParseKeys( const std::string& path) const { std::vector keys; - for (const auto& segment : fs::internal::SplitAbstractPath(path)) { + for (const auto& segment : + 
+                 fs::internal::SplitAbstractPath(fs::internal::GetAbstractPathParent(path).first)) {
     ARROW_ASSIGN_OR_RAISE(auto maybe_key, ParseKey(segment, hive_options_));
     if (auto key = maybe_key) {
       keys.push_back(std::move(*key));
@@ -731,7 +734,7 @@ Result<std::vector<KeyValuePartitioning::Key>> HivePartitioning::ParseKeys(
   return keys;
 }
 
-Result<Partitioning::PartitionPathFormat> HivePartitioning::FormatValues(
+Result<PartitionPathFormat> HivePartitioning::FormatValues(
     const ScalarVector& values) const {
   std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));
 
@@ -749,8 +752,7 @@ Result<Partitioning::PartitionPathFormat> HivePartitioning::FormatValues(
     }
   }
 
-  return Partitioning::PartitionPathFormat{
-      fs::internal::JoinAbstractPath(std::move(segments)), ""};
+  return PartitionPathFormat{fs::internal::JoinAbstractPath(std::move(segments)), ""};
 }
 
 class HivePartitioningFactory : public KeyValuePartitioningFactory {
@@ -805,9 +807,14 @@ std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
   return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory(options));
 }
 
-std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
+std::string StripPrefix(const std::string& path, const std::string& prefix) {
   auto maybe_base_less = fs::internal::RemoveAncestor(prefix, path);
   auto base_less = maybe_base_less ? std::string(*maybe_base_less) : path;
+  return base_less;
+}
+
+std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
+  auto base_less = StripPrefix(path, prefix);
   auto basename_filename = fs::internal::GetAbstractPathParent(base_less);
   return basename_filename.first;
 }
@@ -837,7 +844,6 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
   if (auto part = partitioning()) {
     return part->schema();
   }
-
   return factory()->Inspect(paths);
 }
 
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 6a679f34822..cd225f13787 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -38,6 +38,10 @@ namespace dataset {
 
 constexpr char kFilenamePartitionSep = '_';
 
+struct ARROW_DS_EXPORT PartitionPathFormat {
+  std::string directory, filename;
+};
+
 // ----------------------------------------------------------------------
 // Partitioning
 
@@ -78,10 +82,6 @@ class ARROW_DS_EXPORT Partitioning {
   /// \brief Parse a path into a partition expression
   virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
 
-  struct PartitionPathFormat {
-    std::string directory, prefix;
-  };
-
   virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;
 
   /// \brief A default Partitioning which always yields scalar(true)
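Taken together, the hunks above move PartitionPathFormat to namespace scope, rename its second member to filename, and split the old StripPrefixAndFilename() into a prefix-only helper plus the existing directory-only behaviour. The following is a rough illustration only, not part of the patch: it assumes the arrow::dataset namespace and the usual schema/field/expression helpers are in scope, and the variable names are made up.

// Sketch of the reworked helpers (illustrative, not part of the patch).
#include "arrow/dataset/partition.h"

void PartitionPathFormatSketch() {
  // StripPrefix() keeps the filename; StripPrefixAndFilename() still drops it.
  std::string rel = StripPrefix("/data/year=2019/c.txt", "/data");
  // rel == "year=2019/c.txt"
  std::string dir = StripPrefixAndFilename("/data/year=2019/c.txt", "/data");
  // dir == "year=2019"

  // Format() now returns the namespace-level PartitionPathFormat.
  // Directory-based schemes fill `directory`; FilenamePartitioning fills
  // `filename` with a "<value>_<value>_..._" prefix for the written file.
  DirectoryPartitioning by_dir(schema({field("year", int32())}));
  PartitionPathFormat fmt =
      by_dir.Format(equal(field_ref("year"), literal(2019))).ValueOrDie();
  // fmt.directory == "2019", fmt.filename == ""

  FilenamePartitioning by_name(schema({field("year", int32())}));
  fmt = by_name.Format(equal(field_ref("year"), literal(2019))).ValueOrDie();
  // fmt.directory == "",     fmt.filename == "2019_"

  // Parse() is now handed the full relative path (directory plus file name),
  // so directory-based schemes drop the trailing segment internally.
  auto expr = by_dir.Parse("2019/part-0.parquet").ValueOrDie();
  // equivalent to parsing just the directory part "2019/"
}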
@@ -358,9 +358,13 @@ class ARROW_DS_EXPORT FilenamePartitioning : public KeyValuePartitioning {
   Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
 };
 
-/// \brief Remove a prefix and the filename of a path.
+ARROW_DS_EXPORT std::string StripPrefix(const std::string& path,
+                                        const std::string& prefix);
+
+/// \brief Remove a prefix and the filename of a path, keeping only the partition segments
 ///
-/// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") -> "year=2019"`
+/// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") ->
+/// "year=2019"`
 ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
                                                    const std::string& prefix);
 
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 87f6f89ccd9..86b8c4f0b9d 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -58,18 +58,17 @@ class TestPartitioning : public ::testing::Test {
   }
 
   void AssertFormat(compute::Expression expr, const std::string& expected_directory,
-                    const std::string& expected_prefix = "") {
+                    const std::string& expected_filename = "") {
     // formatted partition expressions are bound to the schema of the dataset being
     // written
     ASSERT_OK_AND_ASSIGN(auto formatted, partitioning_->Format(expr));
     ASSERT_EQ(formatted.directory, expected_directory);
-    ASSERT_EQ(formatted.prefix, expected_prefix);
+    ASSERT_EQ(formatted.filename, expected_filename);
 
     // ensure the formatted path round trips the relevant components of the partition
     // expression: roundtripped should be a subset of expr
     ASSERT_OK_AND_ASSIGN(compute::Expression roundtripped,
-                         partitioning_->Parse(formatted.directory));
-
+                         partitioning_->Parse(formatted.directory + formatted.filename));
     ASSERT_OK_AND_ASSIGN(roundtripped, roundtripped.Bind(*written_schema_));
     ASSERT_OK_AND_ASSIGN(auto simplified, SimplifyWithGuarantee(roundtripped, expr));
     ASSERT_EQ(simplified, literal(true));
@@ -188,22 +190,38 @@ TEST_F(TestPartitioning, DirectoryPartitioning) {
   partitioning_ = std::make_shared<DirectoryPartitioning>(
       schema({field("alpha", int32()), field("beta", utf8())}));
 
-  AssertParse("/0/hello", and_(equal(field_ref("alpha"), literal(0)),
-                               equal(field_ref("beta"), literal("hello"))));
-  AssertParse("/3", equal(field_ref("alpha"), literal(3)));
-  AssertParseError("/world/0");  // reversed order
-  AssertParseError("/0.0/foo");  // invalid alpha
-  AssertParseError("/3.25");     // invalid alpha with missing beta
+  AssertParse("/0/hello/", and_(equal(field_ref("alpha"), literal(0)),
+                                equal(field_ref("beta"), literal("hello"))));
+  AssertParse("/3/", equal(field_ref("alpha"), literal(3)));
+  AssertParseError("/world/0/");  // reversed order
+  AssertParseError("/0.0/foo/");  // invalid alpha
+  AssertParseError("/3.25/");     // invalid alpha with missing beta
   AssertParse("", literal(true));  // no segments to parse
 
   // gotcha someday:
-  AssertParse("/0/dat.parquet", and_(equal(field_ref("alpha"), literal(0)),
-                                     equal(field_ref("beta"), literal("dat.parquet"))));
+  AssertParse("/0/dat.parquet/", and_(equal(field_ref("alpha"), literal(0)),
+                                      equal(field_ref("beta"), literal("dat.parquet"))));
 
   AssertParse("/0/foo/ignored=2341", and_(equal(field_ref("alpha"), literal(0)),
                                           equal(field_ref("beta"), literal("foo"))));
 }
 
+TEST_F(TestPartitioning, FilenamePartitioning) {
+  partitioning_ = std::make_shared<FilenamePartitioning>(
+      schema({field("alpha", int32()), field("beta", utf8())}));
+
+  AssertParse("0_hello_", and_(equal(field_ref("alpha"), literal(0)),
+                               equal(field_ref("beta"), literal("hello"))));
+  AssertParse("0_", equal(field_ref("alpha"), literal(0)));
+  AssertParseError("world_0_");  // reversed order
AssertParseError("0.0_foo_"); // invalid alpha + AssertParseError("3.25_"); // invalid alpha with missing beta + AssertParse("", literal(true)); // no segments to parse + + AssertParse("0_foo_ignored=2341", and_(equal(field_ref("alpha"), literal(0)), + equal(field_ref("beta"), literal("foo")))); +} + TEST_F(TestPartitioning, DirectoryPartitioningFormat) { partitioning_ = std::make_shared( schema({field("alpha", int32()), field("beta", utf8())})); @@ -267,7 +285,7 @@ TEST_F(TestPartitioning, DirectoryPartitioningWithTemporal) { schema({field("year", int32()), field("month", int8()), field("day", temporal)})); ASSERT_OK_AND_ASSIGN(auto day, StringScalar("2020-06-08").CastTo(temporal)); - AssertParse("/2020/06/2020-06-08", + AssertParse("/2020/06/2020-06-08/", and_({equal(field_ref("year"), literal(2020)), equal(field_ref("month"), literal(6)), equal(field_ref("day"), literal(day))})); @@ -368,11 +386,11 @@ TEST_F(TestPartitioning, DictionaryHasUniqueValues) { auto dictionary_scalar = std::make_shared(index_and_dictionary, alpha->type()); - auto path = "/" + expected_dictionary->GetString(i); + auto path = "/" + expected_dictionary->GetString(i) + "/"; AssertParse(path, equal(field_ref("alpha"), literal(dictionary_scalar))); } - AssertParseError("/yosemite"); // not in inspected dictionary + AssertParseError("/yosemite/"); // not in inspected dictionary } TEST_F(TestPartitioning, DiscoverSchemaSegfault) { @@ -385,27 +403,27 @@ TEST_F(TestPartitioning, HivePartitioning) { partitioning_ = std::make_shared( schema({field("alpha", int32()), field("beta", float32())}), ArrayVector(), "xyz"); - AssertParse("/alpha=0/beta=3.25", and_(equal(field_ref("alpha"), literal(0)), - equal(field_ref("beta"), literal(3.25f)))); - AssertParse("/beta=3.25/alpha=0", and_(equal(field_ref("beta"), literal(3.25f)), - equal(field_ref("alpha"), literal(0)))); - AssertParse("/alpha=0", equal(field_ref("alpha"), literal(0))); - AssertParse("/alpha=xyz/beta=3.25", and_(is_null(field_ref("alpha")), - equal(field_ref("beta"), literal(3.25f)))); - AssertParse("/beta=3.25", equal(field_ref("beta"), literal(3.25f))); + AssertParse("/alpha=0/beta=3.25/", and_(equal(field_ref("alpha"), literal(0)), + equal(field_ref("beta"), literal(3.25f)))); + AssertParse("/beta=3.25/alpha=0/", and_(equal(field_ref("beta"), literal(3.25f)), + equal(field_ref("alpha"), literal(0)))); + AssertParse("/alpha=0/", equal(field_ref("alpha"), literal(0))); + AssertParse("/alpha=xyz/beta=3.25/", and_(is_null(field_ref("alpha")), + equal(field_ref("beta"), literal(3.25f)))); + AssertParse("/beta=3.25/", equal(field_ref("beta"), literal(3.25f))); AssertParse("", literal(true)); - AssertParse("/alpha=0/unexpected/beta=3.25", + AssertParse("/alpha=0/beta=3.25/ignored=2341/", and_(equal(field_ref("alpha"), literal(0)), equal(field_ref("beta"), literal(3.25f)))); - AssertParse("/alpha=0/beta=3.25/ignored=2341", + AssertParse("/alpha=0/beta=3.25/ignored=2341/", and_(equal(field_ref("alpha"), literal(0)), equal(field_ref("beta"), literal(3.25f)))); - AssertParse("/ignored=2341", literal(true)); + AssertParse("/ignored=2341/", literal(true)); - AssertParseError("/alpha=0.0/beta=3.25"); // conversion of "0.0" to int32 fails + AssertParseError("/alpha=0.0/beta=3.25/"); // conversion of "0.0" to int32 fails } TEST_F(TestPartitioning, HivePartitioningFormat) { @@ -542,11 +560,11 @@ TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) { auto dictionary_scalar = std::make_shared(index_and_dictionary, alpha->type()); - auto path = "/alpha=" + 
expected_dictionary->GetString(i); + auto path = "/alpha=" + expected_dictionary->GetString(i) + "/"; AssertParse(path, equal(field_ref("alpha"), literal(dictionary_scalar))); } - AssertParseError("/alpha=yosemite"); // not in inspected dictionary + AssertParseError("/alpha=yosemite/"); // not in inspected dictionary } TEST_F(TestPartitioning, ExistingSchemaDirectory) { @@ -647,7 +665,7 @@ TEST_F(TestPartitioning, UrlEncodedDirectory) { auto date = std::make_shared(1620086400, ts); auto time = std::make_shared(1620113220, ts); partitioning_ = std::make_shared(options.schema, ArrayVector()); - AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24", + AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24/", and_({equal(field_ref("date"), literal(date)), equal(field_ref("time"), literal(time)), equal(field_ref("str"), literal("$"))})); @@ -656,7 +674,7 @@ TEST_F(TestPartitioning, UrlEncodedDirectory) { EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"), factory_->Inspect({"/%AF/%BF/%CF"})); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"), - partitioning_->Parse({"/%AF/%BF/%CF"})); + partitioning_->Parse("/%AF/%BF/%CF")); options.segment_encoding = SegmentEncoding::None; options.schema = @@ -667,7 +685,7 @@ TEST_F(TestPartitioning, UrlEncodedDirectory) { options.schema->fields()); partitioning_ = std::make_shared( options.schema, ArrayVector(), options.AsPartitioningOptions()); - AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24", + AssertParse("/2021-05-04 00%3A00%3A00/2021-05-04 07%3A27%3A00/%24/", and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")), equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")), equal(field_ref("str"), literal("%24"))})); @@ -690,15 +708,16 @@ TEST_F(TestPartitioning, UrlEncodedHive) { auto time = std::make_shared(1620113220, ts); partitioning_ = std::make_shared(options.schema, ArrayVector(), options.AsHivePartitioningOptions()); - AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$", + AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=$/", and_({equal(field_ref("date"), literal(date)), equal(field_ref("time"), literal(time)), is_null(field_ref("str"))})); - AssertParse("/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE", - and_({equal(field_ref("date"), literal(date)), - equal(field_ref("time"), literal(time)), - equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))})); + AssertParse( + "/date=2021-05-04 00:00:00/time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE/", + and_({equal(field_ref("date"), literal(date)), + equal(field_ref("time"), literal(time)), + equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))})); // URL-encoded null fallback value - AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24", + AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24/", and_({equal(field_ref("date"), literal(date)), equal(field_ref("time"), literal(time)), is_null(field_ref("str"))})); @@ -706,7 +725,7 @@ TEST_F(TestPartitioning, UrlEncodedHive) { EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"), factory_->Inspect({"/date=%AF/time=%BF/str=%CF"})); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"), - partitioning_->Parse({"/date=%AF/time=%BF/str=%CF"})); + partitioning_->Parse("/date=%AF/time=%BF/str=%CF")); options.segment_encoding = 
SegmentEncoding::None; options.schema = @@ -719,7 +738,7 @@ TEST_F(TestPartitioning, UrlEncodedHive) { options.schema->fields()); partitioning_ = std::make_shared(options.schema, ArrayVector(), options.AsHivePartitioningOptions()); - AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24", + AssertParse("/date=2021-05-04 00%3A00%3A00/time=2021-05-04 07%3A27%3A00/str=%24/", and_({equal(field_ref("date"), literal("2021-05-04 00%3A00%3A00")), equal(field_ref("time"), literal("2021-05-04 07%3A27%3A00")), equal(field_ref("str"), literal("%24"))})); @@ -727,9 +746,8 @@ TEST_F(TestPartitioning, UrlEncodedHive) { // Invalid UTF-8 EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"), factory_->Inspect({"/date=\xAF/time=\xBF/str=\xCF"})); - EXPECT_RAISES_WITH_MESSAGE_THAT( - Invalid, ::testing::HasSubstr("was not valid UTF-8"), - partitioning_->Parse({"/date=\xAF/time=\xBF/str=\xCF"})); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"), + partitioning_->Parse("/date=\xAF/time=\xBF/str=\xCF")); } TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) { @@ -754,19 +772,19 @@ TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) { options.AsHivePartitioningOptions()); AssertParse( "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 " - "07:27:00/str=$", + "07:27:00/str=$/", and_({equal(field_ref("test'; date"), literal(date)), equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))})); AssertParse( "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 " - "07:27:00/str=%E3%81%8F%E3%81%BE", + "07:27:00/str=%E3%81%8F%E3%81%BE/", and_({equal(field_ref("test'; date"), literal(date)), equal(field_ref("test'; time"), literal(time)), equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))})); // URL-encoded null fallback value AssertParse( "/test%27%3B%20date=2021-05-04 00%3A00%3A00/test%27%3B%20time=2021-05-04 " - "07%3A27%3A00/str=%24", + "07%3A27%3A00/str=%24/", and_({equal(field_ref("test'; date"), literal(date)), equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))})); @@ -776,7 +794,7 @@ TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) { factory_->Inspect({"/%AF=2021-05-04/time=2021-05-04 07%3A27%3A00/str=%24"})); EXPECT_RAISES_WITH_MESSAGE_THAT( Invalid, ::testing::HasSubstr("was not valid UTF-8"), - partitioning_->Parse({"/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24"})); + partitioning_->Parse("/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24/")); } TEST_F(TestPartitioning, EtlThenHive) { @@ -801,17 +819,17 @@ TEST_F(TestPartitioning, EtlThenHive) { auto etl_segments_end = segments.begin() + etl_fields.size(); auto etl_path = fs::internal::JoinAbstractPath(segments.begin(), etl_segments_end); - ARROW_ASSIGN_OR_RAISE(auto etl_expr, etl_part.Parse(etl_path)); + ARROW_ASSIGN_OR_RAISE(auto etl_expr, etl_part.Parse(etl_path + "/")); auto alphabeta_segments_end = etl_segments_end + alphabeta_fields.size(); auto alphabeta_path = fs::internal::JoinAbstractPath(etl_segments_end, alphabeta_segments_end); - ARROW_ASSIGN_OR_RAISE(auto alphabeta_expr, alphabeta_part.Parse(alphabeta_path)); - + ARROW_ASSIGN_OR_RAISE(auto alphabeta_expr, + alphabeta_part.Parse(alphabeta_path + "/")); return and_(etl_expr, alphabeta_expr); }); - AssertParse("/1999/12/31/00/alpha=0/beta=3.25", + AssertParse("/1999/12/31/00/alpha=0/beta=3.25/", and_({equal(field_ref("year"), literal(1999)), equal(field_ref("month"), literal(12)), 
equal(field_ref("day"), literal(31)), @@ -819,7 +837,7 @@ TEST_F(TestPartitioning, EtlThenHive) { and_(equal(field_ref("alpha"), literal(0)), equal(field_ref("beta"), literal(3.25f)))})); - AssertParseError("/20X6/03/21/05/alpha=0/beta=3.25"); + AssertParseError("/20X6/03/21/05/alpha=0/beta=3.25/"); } TEST_F(TestPartitioning, Set) { @@ -919,9 +937,8 @@ class RangePartitioning : public Partitioning { return Status::OK(); } - Result Format( - const compute::Expression&) const override { - return Partitioning::PartitionPathFormat{"", ""}; + Result Format(const compute::Expression&) const override { + return PartitionPathFormat{"", ""}; } Result Partition( const std::shared_ptr&) const override { @@ -953,9 +970,9 @@ TEST(TestStripPrefixAndFilename, Basic) { std::vector input{"/data/year=2019/file.parquet", "/data/year=2019/month=12/file.parquet", "/data/year=2019/month=12/day=01/file.parquet"}; - EXPECT_THAT(StripPrefixAndFilename(input, "/data"), - testing::ElementsAre("year=2019", "year=2019/month=12", - "year=2019/month=12/day=01")); + auto paths = StripPrefixAndFilename(input, "/data"); + EXPECT_THAT(paths, testing::ElementsAre("year=2019", "year=2019/month=12", + "year=2019/month=12/day=01")); } } // namespace dataset diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc index 83ff069ecff..04484dd5bda 100644 --- a/cpp/src/arrow/filesystem/path_util.cc +++ b/cpp/src/arrow/filesystem/path_util.cc @@ -34,12 +34,12 @@ namespace internal { std::vector SplitAbstractPath(const std::string& path, char sep) { std::vector parts; auto v = util::string_view(path); - // Strip trailing slash - if (v.length() > 0 && v.back() == kSep) { + // Strip trailing separator + if (v.length() > 0 && v.back() == sep) { v = v.substr(0, v.length() - 1); } - // Strip leading slash - if (v.length() > 0 && v.front() == kSep) { + // Strip leading separator + if (v.length() > 0 && v.front() == sep) { v = v.substr(1); } if (v.length() == 0) { diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index b22a3d032f0..0498dc7ab6f 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -218,7 +218,6 @@ def partitioning(schema=None, field_names=None, flavor=None, "For the default directory flavor, need to specify " "a Schema or a list of field names") if flavor == "filename": - # default flavor if schema is not None: if field_names is not None: raise ValueError( diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 3bc905f2f97..288301fd4b4 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -582,7 +582,7 @@ def test_partitioning(): ) assert len(partitioning.dictionaries) == 2 assert all(x is None for x in partitioning.dictionaries) - expr = partitioning.parse('/3/3.14') + expr = partitioning.parse('/3/3.14/') assert isinstance(expr, ds.Expression) expected = (ds.field('group') == 3) & (ds.field('key') == 3.14) @@ -591,7 +591,7 @@ def test_partitioning(): with pytest.raises(pa.ArrowInvalid): partitioning.parse('/prefix/3/aaa') - expr = partitioning.parse('/3') + expr = partitioning.parse('/3/') expected = ds.field('group') == 3 assert expr.equals(expected) @@ -604,20 +604,20 @@ def test_partitioning(): ) assert len(partitioning.dictionaries) == 2 assert all(x is None for x in partitioning.dictionaries) - expr = partitioning.parse('/alpha=0/beta=3') + expr = partitioning.parse('/alpha=0/beta=3/') expected = ( (ds.field('alpha') == ds.scalar(0)) & (ds.field('beta') == ds.scalar(3)) 
) assert expr.equals(expected) - expr = partitioning.parse('/alpha=xyz/beta=3') + expr = partitioning.parse('/alpha=xyz/beta=3/') expected = ( (ds.field('alpha').is_null() & (ds.field('beta') == ds.scalar(3))) ) assert expr.equals(expected) - for shouldfail in ['/alpha=one/beta=2', '/alpha=one', '/beta=two']: + for shouldfail in ['/alpha=one/beta=2/', '/alpha=one/', '/beta=two/']: with pytest.raises(pa.ArrowInvalid): partitioning.parse(shouldfail) @@ -662,6 +662,24 @@ def test_partitioning(): assert partitioning.dictionaries[1].to_pylist() == [ "first", "second", "third"] + # test partitioning roundtrip + table = pa.table([ + pa.array(range(20)), pa.array(np.random.randn(20)), + pa.array(np.repeat(['a', 'b'], 10))], + names=["f1", "f2", "part"] + ) + partitioning_schema = pa.schema([("part", pa.string())]) + for klass in [ds.DirectoryPartitioning, ds.HivePartitioning, + ds.FilenamePartitioning]: + with tempfile.TemporaryDirectory() as tempdir: + partitioning = klass(partitioning_schema) + ds.write_dataset(table, tempdir, + format='ipc', partitioning=partitioning) + load_back = ds.dataset(tempdir, format='ipc', + partitioning=partitioning) + load_back_table = load_back.to_table() + assert load_back_table.equals(table) + def test_expression_arithmetic_operators(): dataset = ds.dataset(pa.table({'a': [1, 2, 3], 'b': [2, 2, 2]}))
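One detail worth calling out from the path_util.cc hunk above: SplitAbstractPath() used to strip a leading/trailing '/' (kSep) even when asked to split on a different separator, so a filename prefix such as "2019_12_" produced a spurious empty trailing segment. This is part of what lets the FilenamePartitioning round trip (in both the C++ and pyarrow tests above) hold. A rough before/after sketch, illustrative only, assuming arrow::fs::internal is in scope:

#include "arrow/filesystem/path_util.h"

void SplitAbstractPathSketch() {
  using arrow::fs::internal::SplitAbstractPath;

  // Default separator: behaviour unchanged.
  auto dirs = SplitAbstractPath("/year=2019/month=12/");
  // dirs == {"year=2019", "month=12"}

  // Custom separator, as used by FilenamePartitioning ('_'):
  auto keys = SplitAbstractPath("2019_12_", '_');
  // before this patch: {"2019", "12", ""}  (only '/' was stripped at the ends)
  // after this patch:  {"2019", "12"}
}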