diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 8db9d7c8427..db871a511b0 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -580,19 +580,22 @@ Result<util::optional<HivePartitioning::Key>> HivePartitioning::ParseKey(
   // Static method, so we have no better place for it
   util::InitializeUTF8();
 
-  auto name = segment.substr(0, name_end);
+  std::string name;
   std::string value;
   switch (options.segment_encoding) {
     case SegmentEncoding::None: {
+      name = segment.substr(0, name_end);
       value = segment.substr(name_end + 1);
-      if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(value))) {
-        return Status::Invalid("Partition segment was not valid UTF-8: ", value);
+      if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
+        return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
       }
       break;
     }
     case SegmentEncoding::Uri: {
       auto raw_value = util::string_view(segment).substr(name_end + 1);
       ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
+      auto raw_key = util::string_view(segment).substr(0, name_end);
+      ARROW_ASSIGN_OR_RAISE(name, SafeUriUnescape(raw_key));
       break;
     }
     default:
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index c4cd528c2b7..0897a4ed629 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -656,6 +656,53 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
                            partitioning_->Parse({"/date=\xAF/time=\xBF/str=\xCF"}));
 }
 
+TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) {
+  HivePartitioningFactoryOptions options;
+  auto ts = timestamp(TimeUnit::type::SECOND);
+  options.schema =
+      schema({field("test'; date", ts), field("test'; time", ts), field("str", utf8())});
+  options.null_fallback = "$";
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  AssertInspect({"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
+                 "07:27:00/str=$",
+                 "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
+                 "07:27:00/str=%E3%81%8F%E3%81%BE",
+                 "/test%27%3B%20date=2021-05-04 "
+                 "00%3A00%3A00/test%27%3B%20time=2021-05-04 07%3A27%3A00/str=%24"},
+                options.schema->fields());
+
+  auto date = std::make_shared<TimestampScalar>(1620086400, ts);
+  auto time = std::make_shared<TimestampScalar>(1620113220, ts);
+  partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
+                                                     options.AsHivePartitioningOptions());
+  AssertParse(
+      "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
+      "07:27:00/str=$",
+      and_({equal(field_ref("test'; date"), literal(date)),
+            equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
+  AssertParse(
+      "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
+      "07:27:00/str=%E3%81%8F%E3%81%BE",
+      and_({equal(field_ref("test'; date"), literal(date)),
+            equal(field_ref("test'; time"), literal(time)),
+            equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
+  // URL-encoded null fallback value
+  AssertParse(
+      "/test%27%3B%20date=2021-05-04 00%3A00%3A00/test%27%3B%20time=2021-05-04 "
+      "07%3A27%3A00/str=%24",
+      and_({equal(field_ref("test'; date"), literal(date)),
+            equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
+
+  // Invalid UTF-8
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+      factory_->Inspect({"/%AF=2021-05-04/time=2021-05-04 07%3A27%3A00/str=%24"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+      partitioning_->Parse({"/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24"}));
+}
+
 TEST_F(TestPartitioning, EtlThenHive) {
   FieldVector etl_fields{field("year", int16()), field("month", int8()),
                          field("day", int8()), field("hour", int8())};
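Note on the core change above: in SegmentEncoding::Uri mode, the key (left of the first `=`) is now URL-decoded with the same SafeUriUnescape helper already used for the value, and in SegmentEncoding::None mode the UTF-8 validation now covers the whole segment (key included) rather than only the value. A rough Python sketch of the resulting ParseKey behavior, for illustration only: urllib's unquote stands in for Arrow's SafeUriUnescape (which additionally rejects invalid UTF-8 in the decoded result), and parse_key here is a hypothetical helper, not part of any API.

# Illustration only: mirrors the C++ ParseKey logic in the hunk above.
from urllib.parse import unquote


def parse_key(segment, segment_encoding):
    name_end = segment.find("=")
    if name_end == -1:
        return None  # not a round-trippable key=value segment
    if segment_encoding == "none":
        # After this patch, the whole segment (key included) must be valid UTF-8.
        return segment[:name_end], segment[name_end + 1:]
    if segment_encoding == "uri":
        # New behavior: the key is URL-decoded too, not just the value.
        return unquote(segment[:name_end]), unquote(segment[name_end + 1:])
    raise NotImplementedError(segment_encoding)


print(parse_key("test%27%3B%20date=2021-05-04 00%3A00%3A00", "uri"))
# -> ("test'; date", "2021-05-04 00:00:00")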
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index b993a3a6eee..f1a9f356088 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1580,6 +1580,85 @@ def test_partitioning_factory_segment_encoding():
         inferred_schema = factory.inspect()
 
 
+def test_partitioning_factory_hive_segment_encoding_key_encoded():
+    mockfs = fs._MockFileSystem()
+    format = ds.IpcFileFormat()
+    schema = pa.schema([("i64", pa.int64())])
+    table = pa.table([pa.array(range(10))], schema=schema)
+    partition_schema = pa.schema(
+        [("test'; date", pa.timestamp("s")), ("test';[ string'", pa.string())])
+    string_partition_schema = pa.schema(
+        [("test'; date", pa.string()), ("test';[ string'", pa.string())])
+    full_schema = pa.schema(list(schema) + list(partition_schema))
+
+    partition_schema_en = pa.schema(
+        [("test%27%3B%20date", pa.timestamp("s")),
+         ("test%27%3B%5B%20string%27", pa.string())])
+    string_partition_schema_en = pa.schema(
+        [("test%27%3B%20date", pa.string()),
+         ("test%27%3B%5B%20string%27", pa.string())])
+
+    directory = ("hive/test%27%3B%20date=2021-05-04 00%3A00%3A00/"
+                 "test%27%3B%5B%20string%27=%24")
+    mockfs.create_dir(directory)
+    with mockfs.open_output_stream(directory + "/0.feather") as sink:
+        with pa.ipc.new_file(sink, schema) as writer:
+            writer.write_table(table)
+            writer.close()
+
+    # Hive
+    selector = fs.FileSelector("hive", recursive=True)
+    options = ds.FileSystemFactoryOptions("hive")
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        schema=partition_schema)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    inferred_schema = factory.inspect()
+    assert inferred_schema == full_schema
+    actual = factory.finish().to_table(columns={
+        "date_int": ds.field("test'; date").cast(pa.int64()),
+    })
+    assert actual[0][0].as_py() == 1620086400
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        segment_encoding="uri")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("test'; date") == "2021-05-04 00:00:00") &
+        (ds.field("test';[ string'") == "$"))
+
+    options.partitioning = ds.HivePartitioning(
+        string_partition_schema, segment_encoding="uri")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("test'; date") == "2021-05-04 00:00:00") &
+        (ds.field("test';[ string'") == "$"))
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("test%27%3B%20date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("test%27%3B%5B%20string%27") == "%24"))
+
+    options.partitioning = ds.HivePartitioning(
+        string_partition_schema_en, segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("test%27%3B%20date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("test%27%3B%5B%20string%27") == "%24"))
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        schema=partition_schema_en, segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    with pytest.raises(pa.ArrowInvalid,
+                       match="Could not cast segments for partition field"):
+        inferred_schema = factory.inspect()
+
+
 def test_dictionary_partitioning_outer_nulls_raises(tempdir):
     table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']})
     part = ds.partitioning(
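For context, a minimal end-user sketch of what the Python test exercises (the "data/" path is hypothetical): with segment_encoding="uri" (the default for HivePartitioning.discover), percent-encoded directory names are now decoded in both the key and the value, so a directory like test%27%3B%20date=... surfaces as a partition field named "test'; date".

# Hypothetical usage sketch; assumes a feather dataset under "data/" with
# directories like "data/test%27%3B%20date=2021-05-04 00%3A00%3A00/...".
import pyarrow.dataset as ds

part = ds.HivePartitioning.discover(segment_encoding="uri")  # "uri" is the default
dataset = ds.dataset("data", format="feather", partitioning=part)
print(dataset.schema)  # partition field now appears as "test'; date"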
(ds.field("test%27%3B%5B%20string%27") == "%24")) + + options.partitioning_factory = ds.HivePartitioning.discover( + schema=partition_schema_en, segment_encoding="none") + factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) + with pytest.raises(pa.ArrowInvalid, + match="Could not cast segments for partition field"): + inferred_schema = factory.inspect() + + def test_dictionary_partitioning_outer_nulls_raises(tempdir): table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']}) part = ds.partitioning( diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 4403b479acb..8465e93bed5 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -215,6 +215,57 @@ test_that("URI-decoding with hive partitioning", { ) }) +test_that("URI-decoding with hive partitioning with key encoded", { + root <- make_temp_dir() + fmt <- FileFormat$create("feather") + fs <- LocalFileSystem$create() + selector <- FileSelector$create(root, recursive = TRUE) + dir1 <- file.path(root, "test%20key=2021-05-04 00%3A00%3A00", "test%20key1=%24") + dir.create(dir1, recursive = TRUE) + write_feather(df1, file.path(dir1, "data.feather")) + + partitioning <- hive_partition( + `test key` = timestamp(unit = "s"), `test key1` = utf8(), segment_encoding = "uri" + ) + factory <- FileSystemDatasetFactory$create( + fs, selector, NULL, fmt, + partitioning = partitioning + ) + schm <- factory$Inspect() + ds <- factory$Finish(schm) + expect_scan_result(ds, schm) + + # segment encoding for both key and values + partitioning_factory <- hive_partition(segment_encoding = "uri") + factory <- FileSystemDatasetFactory$create( + fs, selector, NULL, fmt, partitioning_factory + ) + schm <- factory$Inspect() + ds <- factory$Finish(schm) + expect_equal( + ds %>% + filter(`test key` == "2021-05-04 00:00:00", `test key1` == "$") %>% + select(int) %>% + collect(), + df1 %>% select(int) %>% collect() + ) + + # no segment encoding + partitioning_factory <- hive_partition(segment_encoding = "none") + factory <- FileSystemDatasetFactory$create( + fs, selector, NULL, fmt, partitioning_factory + ) + schm <- factory$Inspect() + ds <- factory$Finish(schm) + expect_equal( + ds %>% + filter(`test%20key` == "2021-05-04 00%3A00%3A00", `test%20key1` == "%24") %>% + select(int) %>% + collect(), + df1 %>% select(int) %>% collect() + ) +}) + # Everything else below here is using parquet files skip_if_not_available("parquet")