-
Notifications
You must be signed in to change notification settings - Fork 468
Expand file tree
/
Copy pathtest_locations.py
More file actions
191 lines (141 loc) · 8.01 KB
/
test_locations.py
File metadata and controls
191 lines (141 loc) · 8.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any
import pytest
from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT
from pyiceberg.types import NestedField, StringType
# Identity-transform partition field over source column 1 ("string_field"); reused by the partitioned-path tests.
PARTITION_FIELD = PartitionField(
    source_id=1,
    field_id=1002,
    transform=IdentityTransform(),
    name="string_field",
)
# Partition key holding the single value "example_string" for PARTITION_FIELD, with a one-column string schema.
PARTITION_KEY = PartitionKey(
    field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")],
    partition_spec=PartitionSpec(PARTITION_FIELD),
    schema=Schema(
        NestedField(
            field_id=1,
            name="string_field",
            field_type=StringType(),
            required=False,
        )
    ),
)
class CustomLocationProvider(LocationProvider):
    """Minimal ``LocationProvider`` subclass used to exercise loading a provider by its qualified name."""

    def new_data_location(self, data_file_name: str, partition_key: PartitionKey | None = None) -> str:
        """Return ``data_file_name`` under a fixed ``custom_location_provider/`` prefix.

        The ``partition_key`` argument is accepted but not used.
        """
        location = f"custom_location_provider/{data_file_name}"
        return location
def test_simple_location_provider_no_partition() -> None:
    """With object storage disabled, an unpartitioned file lands directly under ``<table_location>/data``."""
    properties = {"write.object-storage.enabled": "false"}
    simple_provider = load_location_provider(table_location="table_location", table_properties=properties)

    data_location = simple_provider.new_data_location("my_file")

    assert data_location == "table_location/data/my_file"
def test_simple_location_provider_with_partition() -> None:
    """With object storage disabled, the partition path is inserted between ``data`` and the file name."""
    properties = {"write.object-storage.enabled": "false"}
    simple_provider = load_location_provider(table_location="table_location", table_properties=properties)

    data_location = simple_provider.new_data_location("my_file", PARTITION_KEY)

    assert data_location == "table_location/data/string_field=example_string/my_file"
def test_custom_location_provider() -> None:
    """A provider named by its fully qualified class path is loaded and used for data locations."""
    # Build "<module>.<class>" for the helper provider declared in this test module.
    qualified_name = f"{CustomLocationProvider.__module__}.{CustomLocationProvider.__name__}"
    properties = {"write.py-location-provider.impl": qualified_name}

    custom_provider = load_location_provider(table_location="table_location", table_properties=properties)

    assert custom_provider.new_data_location("my_file") == "custom_location_provider/my_file"
def test_custom_location_provider_single_path() -> None:
    """An implementation name without a module component is rejected with a ``ValueError``."""
    properties = {"write.py-location-provider.impl": "not_found"}
    expected_error = r"write\.py-location-provider\.impl should be full path"

    with pytest.raises(ValueError, match=expected_error):
        load_location_provider(table_location="table_location", table_properties=properties)
def test_custom_location_provider_not_found(caplog: Any) -> None:
    """An unimportable implementation raises ``ValueError`` and the import failure appears in the captured log."""
    import logging

    # Capture from DEBUG upwards so the messages asserted below are recorded.
    caplog.set_level(logging.DEBUG)
    properties = {"write.py-location-provider.impl": "module.not_found"}

    with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"):
        load_location_provider(table_location="table_location", table_properties=properties)

    expected_log_fragments = (
        "Could not initialize LocationProvider: module.not_found",
        "ModuleNotFoundError: No module named 'module'",
    )
    for fragment in expected_log_fragments:
        assert fragment in caplog.text
def test_object_storage_no_partition() -> None:
    """With object storage enabled, binary entropy directories sit between ``data`` and the file name."""
    properties = {"write.object-storage.enabled": "true"}
    object_provider = load_location_provider(table_location="table_location", table_properties=properties)

    segments = object_provider.new_data_location("test.parquet").split("/")

    # Checked before unpacking so a wrong segment count fails as an assertion.
    assert len(segments) == 7
    root, data_dir, *entropy_dirs, file_name = segments
    assert root == "table_location"
    assert data_dir == "data"
    assert file_name == "test.parquet"

    # Entropy directories in the middle: each one is non-empty and made up only of "0" and "1".
    for entropy_dir in entropy_dirs:
        assert entropy_dir
        assert set(entropy_dir) <= {"0", "1"}
def test_object_storage_with_partition() -> None:
    """With object storage enabled, a partitioned file gets both entropy directories and its partition path."""
    properties = {"write.object-storage.enabled": "true"}
    object_provider = load_location_provider(table_location="table_location", table_properties=properties)

    data_location = object_provider.new_data_location("test.parquet", PARTITION_KEY)

    # Both the partition values and the entropy are part of the path. The entropy here differs from the test
    # with partitioned paths disabled because the partition key AND the data file name are used as the hash
    # input. This matches Java behaviour; the hash below is what the Java implementation produces for this
    # input too.
    expected_location = "table_location/data/0001/0010/1001/00000011/string_field=example_string/test.parquet"
    assert data_location == expected_location
# NB: A None partition key is tested as well, because disabling partitioned paths still replaces the final "/" with
# "-" even in paths of un-partitioned files. This matches the behaviour of the Java implementation.
@pytest.mark.parametrize("partition_key", [PARTITION_KEY, None])
def test_object_storage_partitioned_paths_disabled(partition_key: PartitionKey | None) -> None:
    """With partitioned paths disabled, the location is the same with or without a partition key."""
    properties = {
        "write.object-storage.enabled": "true",
        "write.object-storage.partitioned-paths": "false",
    }
    object_provider = load_location_provider(table_location="table_location", table_properties=properties)

    data_location = object_provider.new_data_location("test.parquet", partition_key)

    # No partition values in the path, and the last entropy component is joined to the file name with "-".
    assert data_location == "table_location/data/0110/1010/0011/11101000-test.parquet"
@pytest.mark.parametrize(
    ["data_file_name", "expected_hash"],
    [
        ("a", "0101/0110/1001/10110010"),
        ("b", "1110/0111/1110/00000011"),
        ("c", "0010/1101/0110/01011111"),
        ("d", "1001/0001/0100/01110011"),
    ],
)
def test_hash_injection(data_file_name: str, expected_hash: str) -> None:
    """Each file name yields its expected entropy path between ``data`` and the file name."""
    properties = {"write.object-storage.enabled": "true"}
    object_provider = load_location_provider(table_location="table_location", table_properties=properties)

    expected_location = f"table_location/data/{expected_hash}/{data_file_name}"

    assert object_provider.new_data_location(data_file_name) == expected_location
def test_object_location_provider_write_data_path() -> None:
    """With object storage enabled, entropy directories are placed under the configured write data path."""
    properties = {
        "write.object-storage.enabled": "true",
        TableProperties.WRITE_DATA_PATH: "s3://table-location/custom/data/path",
    }
    object_provider = load_location_provider(table_location="s3://table-location/table", table_properties=properties)

    data_location = object_provider.new_data_location("file.parquet")

    assert data_location == "s3://table-location/custom/data/path/0010/1111/0101/11011101/file.parquet"
def test_simple_location_provider_write_data_path() -> None:
    """With object storage disabled, files are placed directly under the configured write data path."""
    properties = {
        TableProperties.WRITE_DATA_PATH: "s3://table-location/custom/data/path",
        "write.object-storage.enabled": "false",
    }
    simple_provider = load_location_provider(table_location="table_location", table_properties=properties)

    data_location = simple_provider.new_data_location("file.parquet")

    assert data_location == "s3://table-location/custom/data/path/file.parquet"
def test_location_provider_metadata_default_location() -> None:
    """With no table properties set, metadata files go under ``<table_location>/metadata``."""
    default_provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)

    metadata_location = default_provider.new_metadata_location("manifest.avro")

    assert metadata_location == "table_location/metadata/manifest.avro"
def test_location_provider_metadata_location_with_custom_path() -> None:
    """A configured write metadata path replaces the default metadata directory."""
    properties = {TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path"}
    provider = load_location_provider(table_location="table_location", table_properties=properties)

    metadata_location = provider.new_metadata_location("metadata.json")

    assert metadata_location == "s3://table-location/custom/path/metadata.json"
def test_metadata_location_with_trailing_slash() -> None:
    """A trailing slash on the configured metadata path does not produce a doubled separator."""
    properties = {TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path/"}
    provider = load_location_provider(table_location="table_location", table_properties=properties)

    metadata_location = provider.new_metadata_location("metadata.json")

    assert metadata_location == "s3://table-location/custom/path/metadata.json"