9 changes: 6 additions & 3 deletions cpp/src/arrow/dataset/partition.cc
@@ -580,19 +580,22 @@ Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
   // Static method, so we have no better place for it
   util::InitializeUTF8();
 
-  auto name = segment.substr(0, name_end);
+  std::string name;
   std::string value;
   switch (options.segment_encoding) {
     case SegmentEncoding::None: {
+      name = segment.substr(0, name_end);
       value = segment.substr(name_end + 1);
-      if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(value))) {
-        return Status::Invalid("Partition segment was not valid UTF-8: ", value);
+      if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
+        return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
       }
       break;
     }
     case SegmentEncoding::Uri: {
       auto raw_value = util::string_view(segment).substr(name_end + 1);
       ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
+      auto raw_key = util::string_view(segment).substr(0, name_end);
+      ARROW_ASSIGN_OR_RAISE(name, SafeUriUnescape(raw_key));
       break;
     }
     default:
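
In effect, HivePartitioning::ParseKey now URI-unescapes the key of a
"key=value" segment as well as the value when segment_encoding is
SegmentEncoding::Uri (previously only the value was decoded), and the
SegmentEncoding::None branch validates the whole segment as UTF-8 instead
of just the value. A minimal Python sketch of the parsing behavior (not
Arrow's implementation: parse_hive_segment is a hypothetical helper, and
urllib.parse.unquote stands in for SafeUriUnescape, which additionally
rejects invalid UTF-8):

from urllib.parse import unquote

def parse_hive_segment(segment, segment_encoding="uri"):
    # Split "key=value" on the first '=' (the name_end position in the C++ code)
    name, _, value = segment.partition("=")
    if segment_encoding == "uri":
        # Both the key and the value are unescaped, not just the value
        name = unquote(name)
        value = unquote(value)
    return name, value

# A field name containing "'", ";" and a space round-trips once encoded
assert parse_hive_segment(
    "test%27%3B%20date=2021-05-04 00%3A00%3A00"
) == ("test'; date", "2021-05-04 00:00:00")

This is what lets the tests below use field names like "test'; date" while
the directory names on disk remain filesystem-safe.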
47 changes: 47 additions & 0 deletions cpp/src/arrow/dataset/partition_test.cc
@@ -656,6 +656,53 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
      partitioning_->Parse({"/date=\xAF/time=\xBF/str=\xCF"}));
}

TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) {
  HivePartitioningFactoryOptions options;
  auto ts = timestamp(TimeUnit::type::SECOND);
  options.schema =
      schema({field("test'; date", ts), field("test'; time", ts), field("str", utf8())});
  options.null_fallback = "$";
  factory_ = HivePartitioning::MakeFactory(options);

  AssertInspect({"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
                 "07:27:00/str=$",
                 "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
                 "07:27:00/str=%E3%81%8F%E3%81%BE",
                 "/test%27%3B%20date=2021-05-04 "
                 "00%3A00%3A00/test%27%3B%20time=2021-05-04 07%3A27%3A00/str=%24"},
                options.schema->fields());

  // 1620086400 and 1620113220 seconds since the epoch are
  // 2021-05-04 00:00:00 UTC and 2021-05-04 07:27:00 UTC
  auto date = std::make_shared<TimestampScalar>(1620086400, ts);
  auto time = std::make_shared<TimestampScalar>(1620113220, ts);
  partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
                                                     options.AsHivePartitioningOptions());
  AssertParse(
      "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
      "07:27:00/str=$",
      and_({equal(field_ref("test'; date"), literal(date)),
            equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
  AssertParse(
      "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
      "07:27:00/str=%E3%81%8F%E3%81%BE",
      and_({equal(field_ref("test'; date"), literal(date)),
            equal(field_ref("test'; time"), literal(time)),
            equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
  // URL-encoded null fallback value
  AssertParse(
      "/test%27%3B%20date=2021-05-04 00%3A00%3A00/test%27%3B%20time=2021-05-04 "
      "07%3A27%3A00/str=%24",
      and_({equal(field_ref("test'; date"), literal(date)),
            equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));

  // Invalid UTF-8
  EXPECT_RAISES_WITH_MESSAGE_THAT(
      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
      factory_->Inspect({"/%AF=2021-05-04/time=2021-05-04 07%3A27%3A00/str=%24"}));
  EXPECT_RAISES_WITH_MESSAGE_THAT(
      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
      partitioning_->Parse({"/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24"}));
}

TEST_F(TestPartitioning, EtlThenHive) {
  FieldVector etl_fields{field("year", int16()), field("month", int8()),
                         field("day", int8()), field("hour", int8())};
79 changes: 79 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -1580,6 +1580,85 @@ def test_partitioning_factory_segment_encoding():
    inferred_schema = factory.inspect()


def test_partitioning_factory_hive_segment_encoding_key_encoded():
    mockfs = fs._MockFileSystem()
    format = ds.IpcFileFormat()
    schema = pa.schema([("i64", pa.int64())])
    table = pa.table([pa.array(range(10))], schema=schema)
    partition_schema = pa.schema(
        [("test'; date", pa.timestamp("s")), ("test';[ string'", pa.string())])
    string_partition_schema = pa.schema(
        [("test'; date", pa.string()), ("test';[ string'", pa.string())])
    full_schema = pa.schema(list(schema) + list(partition_schema))

    partition_schema_en = pa.schema(
        [("test%27%3B%20date", pa.timestamp("s")),
         ("test%27%3B%5B%20string%27", pa.string())])
    string_partition_schema_en = pa.schema(
        [("test%27%3B%20date", pa.string()),
         ("test%27%3B%5B%20string%27", pa.string())])

    directory = ("hive/test%27%3B%20date=2021-05-04 00%3A00%3A00/"
                 "test%27%3B%5B%20string%27=%24")
    mockfs.create_dir(directory)
    with mockfs.open_output_stream(directory + "/0.feather") as sink:
        with pa.ipc.new_file(sink, schema) as writer:
            writer.write_table(table)

    # Hive
    selector = fs.FileSelector("hive", recursive=True)
    options = ds.FileSystemFactoryOptions("hive")
    options.partitioning_factory = ds.HivePartitioning.discover(
        schema=partition_schema)
    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
    inferred_schema = factory.inspect()
    assert inferred_schema == full_schema
    actual = factory.finish().to_table(columns={
        "date_int": ds.field("test'; date").cast(pa.int64()),
    })
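    # 1620086400 s since the epoch == 2021-05-04 00:00:00 UTC,
    # i.e. the decoded "test'; date" partition value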
    assert actual[0][0].as_py() == 1620086400

    options.partitioning_factory = ds.HivePartitioning.discover(
        segment_encoding="uri")
    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
    fragments = list(factory.finish().get_fragments())
    assert fragments[0].partition_expression.equals(
        (ds.field("test'; date") == "2021-05-04 00:00:00") &
        (ds.field("test';[ string'") == "$"))

    options.partitioning = ds.HivePartitioning(
        string_partition_schema, segment_encoding="uri")
    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
    fragments = list(factory.finish().get_fragments())
    assert fragments[0].partition_expression.equals(
        (ds.field("test'; date") == "2021-05-04 00:00:00") &
        (ds.field("test';[ string'") == "$"))

    options.partitioning_factory = ds.HivePartitioning.discover(
        segment_encoding="none")
    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
    fragments = list(factory.finish().get_fragments())
    assert fragments[0].partition_expression.equals(
        (ds.field("test%27%3B%20date") == "2021-05-04 00%3A00%3A00") &
        (ds.field("test%27%3B%5B%20string%27") == "%24"))

    options.partitioning = ds.HivePartitioning(
        string_partition_schema_en, segment_encoding="none")
    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
    fragments = list(factory.finish().get_fragments())
    assert fragments[0].partition_expression.equals(
        (ds.field("test%27%3B%20date") == "2021-05-04 00%3A00%3A00") &
        (ds.field("test%27%3B%5B%20string%27") == "%24"))

    options.partitioning_factory = ds.HivePartitioning.discover(
        schema=partition_schema_en, segment_encoding="none")
    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
    with pytest.raises(pa.ArrowInvalid,
                       match="Could not cast segments for partition field"):
        inferred_schema = factory.inspect()


def test_dictionary_partitioning_outer_nulls_raises(tempdir):
    table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']})
    part = ds.partitioning(
51 changes: 51 additions & 0 deletions r/tests/testthat/test-dataset.R
@@ -215,6 +215,57 @@ test_that("URI-decoding with hive partitioning", {
  )
})

test_that("URI-decoding with hive partitioning with key encoded", {
root <- make_temp_dir()
fmt <- FileFormat$create("feather")
fs <- LocalFileSystem$create()
selector <- FileSelector$create(root, recursive = TRUE)
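  # Both the directory keys and values below are percent-encoded:
  # "test key" -> "test%20key" and "00:00:00" -> "00%3A00%3A00"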
  dir1 <- file.path(root, "test%20key=2021-05-04 00%3A00%3A00", "test%20key1=%24")
  dir.create(dir1, recursive = TRUE)
  write_feather(df1, file.path(dir1, "data.feather"))

  partitioning <- hive_partition(
    `test key` = timestamp(unit = "s"), `test key1` = utf8(), segment_encoding = "uri"
  )
  factory <- FileSystemDatasetFactory$create(
    fs, selector, NULL, fmt,
    partitioning = partitioning
  )
  schm <- factory$Inspect()
  ds <- factory$Finish(schm)
  expect_scan_result(ds, schm)

  # segment encoding for both keys and values
  partitioning_factory <- hive_partition(segment_encoding = "uri")
  factory <- FileSystemDatasetFactory$create(
    fs, selector, NULL, fmt, partitioning_factory
  )
  schm <- factory$Inspect()
  ds <- factory$Finish(schm)
  expect_equal(
    ds %>%
      filter(`test key` == "2021-05-04 00:00:00", `test key1` == "$") %>%
      select(int) %>%
      collect(),
    df1 %>% select(int) %>% collect()
  )

  # no segment encoding
  partitioning_factory <- hive_partition(segment_encoding = "none")
  factory <- FileSystemDatasetFactory$create(
    fs, selector, NULL, fmt, partitioning_factory
  )
  schm <- factory$Inspect()
  ds <- factory$Finish(schm)
  expect_equal(
    ds %>%
      filter(`test%20key` == "2021-05-04 00%3A00%3A00", `test%20key1` == "%24") %>%
      select(int) %>%
      collect(),
    df1 %>% select(int) %>% collect()
  )
})

# Everything else below here is using parquet files
skip_if_not_available("parquet")
