Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyiceberg/avro/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import InputStream
from pyiceberg.typedef import UTF8


class BinaryDecoder(ABC):
Expand Down Expand Up @@ -107,7 +108,7 @@ def read_utf8(self) -> str:
A string is encoded as a long followed by
that many bytes of UTF-8 encoded character data.
"""
return self.read_bytes().decode("utf-8")
return self.read_bytes().decode(UTF8)

def skip_boolean(self) -> None:
self.skip(1)
Expand Down
3 changes: 2 additions & 1 deletion pyiceberg/avro/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import OutputStream
from pyiceberg.typedef import UTF8


class BinaryEncoder:
Expand Down Expand Up @@ -62,7 +63,7 @@ def write_bytes(self, b: bytes) -> None:

def write_utf8(self, s: str) -> None:
"""Encode a string as a long followed by that many bytes of UTF-8 encoded character data."""
self.write_bytes(s.encode("utf-8"))
self.write_bytes(s.encode(UTF8))

def write_uuid(self, uuid: UUID) -> None:
"""Write UUID as a fixed[16].
Expand Down
10 changes: 5 additions & 5 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
TableMetadata,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel

ICEBERG_REST_SPEC_VERSION = "0.14.1"

Expand Down Expand Up @@ -110,7 +110,7 @@ class Endpoints:
SIGV4_REGION = "rest.signing-region"
SIGV4_SERVICE = "rest.signing-name"

NAMESPACE_SEPARATOR = b"\x1F".decode("UTF-8")
NAMESPACE_SEPARATOR = b"\x1F".decode(UTF8)


class TableResponse(IcebergBaseModel):
Expand Down Expand Up @@ -444,7 +444,7 @@ def create_table(
write_order=sort_order,
properties=properties,
)
serialized_json = request.model_dump_json().encode("utf-8")
serialized_json = request.model_dump_json().encode(UTF8)
response = self._session.post(
self.url(Endpoints.create_table, namespace=namespace_and_table["namespace"]),
data=serialized_json,
Expand Down Expand Up @@ -475,7 +475,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
name=namespace_and_table["table"],
metadata_location=metadata_location,
)
serialized_json = request.model_dump_json().encode("utf-8")
serialized_json = request.model_dump_json().encode(UTF8)
response = self._session.post(
self.url(Endpoints.register_table, namespace=namespace_and_table["namespace"]),
data=serialized_json,
Expand Down Expand Up @@ -552,7 +552,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
"""
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
data=table_request.model_dump_json().encode("utf-8"),
data=table_request.model_dump_json().encode(UTF8),
)
try:
response.raise_for_status()
Expand Down
8 changes: 4 additions & 4 deletions pyiceberg/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
Union,
)

from pyiceberg.typedef import L
from pyiceberg.typedef import UTF8, L
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -143,7 +143,7 @@ def _(_: UUIDType, value_str: str) -> uuid.UUID:
@partition_to_py.register(BinaryType)
@handle_none
def _(_: PrimitiveType, value_str: str) -> bytes:
return bytes(value_str, "UTF-8")
return bytes(value_str, UTF8)


@partition_to_py.register(DecimalType)
Expand Down Expand Up @@ -223,7 +223,7 @@ def _(_: DoubleType, value: float) -> bytes:

@to_bytes.register(StringType)
def _(_: StringType, value: str) -> bytes:
return value.encode("UTF-8")
return value.encode(UTF8)


@to_bytes.register(UUIDType)
Expand Down Expand Up @@ -308,7 +308,7 @@ def _(_: DoubleType, b: bytes) -> float:

@from_bytes.register(StringType)
def _(_: StringType, b: bytes) -> str:
return bytes(b).decode("utf-8")
return bytes(b).decode(UTF8)


@from_bytes.register(BinaryType)
Expand Down
13 changes: 9 additions & 4 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,18 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
raise # pragma: no cover - If some other kind of OSError, raise the raw error


def schema_to_pyarrow(schema: Union[Schema, IcebergType]) -> pa.schema:
return visit(schema, _ConvertToArrowSchema())
def schema_to_pyarrow(schema: Union[Schema, IcebergType], metadata: Dict[bytes, bytes] = EMPTY_DICT) -> pa.schema:
return visit(schema, _ConvertToArrowSchema(metadata))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the visit() behavior with an empty dict?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes we use the visitor only to convert types. In that case we don't need to set any metadata, so defaulting to an empty dict makes things easier and less verbose.



class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType], Singleton):
class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType]):
_metadata: Dict[bytes, bytes]

def __init__(self, metadata: Dict[bytes, bytes] = EMPTY_DICT) -> None:
self._metadata = metadata

def schema(self, _: Schema, struct_result: pa.StructType) -> pa.schema:
return pa.schema(list(struct_result))
return pa.schema(list(struct_result), metadata=self._metadata)

def struct(self, _: StructType, field_results: List[pa.DataType]) -> pa.DataType:
return pa.struct(field_results)
Expand Down
7 changes: 4 additions & 3 deletions pyiceberg/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from pyiceberg.io import InputFile, InputStream, OutputFile
from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
from pyiceberg.typedef import UTF8

GZIP = "gzip"

Expand Down Expand Up @@ -76,7 +77,7 @@ class FromByteStream:

@staticmethod
def table_metadata(
byte_stream: InputStream, encoding: str = "utf-8", compression: Compressor = NOOP_COMPRESSOR
byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR
) -> TableMetadata:
"""Instantiate a TableMetadata object from a byte stream.

Expand All @@ -97,7 +98,7 @@ class FromInputFile:
"""A collection of methods that deserialize InputFiles into Iceberg objects."""

@staticmethod
def table_metadata(input_file: InputFile, encoding: str = "utf-8") -> TableMetadata:
def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata:
"""Create a TableMetadata instance from an input file.

Args:
Expand Down Expand Up @@ -126,6 +127,6 @@ def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite:
overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
"""
with output_file.create(overwrite=overwrite) as output_stream:
json_bytes = metadata.model_dump_json().encode("utf-8")
json_bytes = metadata.model_dump_json().encode(UTF8)
json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
output_stream.write(json_bytes)
2 changes: 2 additions & 0 deletions pyiceberg/typedef.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def update(self, *args: Any, **kwargs: Any) -> None:
raise AttributeError("FrozenDict does not support .update()")


UTF8 = 'utf-8'

EMPTY_DICT = FrozenDict()

K = TypeVar("K")
Expand Down
4 changes: 2 additions & 2 deletions pyiceberg/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import strictyaml

from pyiceberg.typedef import FrozenDict, RecursiveDict
from pyiceberg.typedef import UTF8, FrozenDict, RecursiveDict

PYICEBERG = "pyiceberg_"
DEFAULT = "default"
Expand Down Expand Up @@ -76,7 +76,7 @@ def _load_yaml(directory: Optional[str]) -> Optional[RecursiveDict]:
if directory:
path = os.path.join(directory, PYICEBERG_YML)
if os.path.isfile(path):
with open(path, encoding="utf-8") as f:
with open(path, encoding=UTF8) as f:
yml_str = f.read()
file_config = strictyaml.load(yml_str).data
file_config_lowercase = _lowercase_dictionary_keys(file_config)
Expand Down
3 changes: 2 additions & 1 deletion tests/catalog/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
SortOrder,
)
from pyiceberg.transforms import BucketTransform, IdentityTransform
from pyiceberg.typedef import UTF8
from pyiceberg.types import (
BooleanType,
IntegerType,
Expand Down Expand Up @@ -253,7 +254,7 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase,
)
)

with open(metadata_location, encoding="utf-8") as f:
with open(metadata_location, encoding=UTF8) as f:
payload = f.read()

metadata = TableMetadataUtil.parse_raw(payload)
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import FileScanTask, Table
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.typedef import UTF8
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -1456,7 +1457,7 @@ async def read(*_: Any) -> bytes:
@property
def raw_headers(self) -> aiohttp.typedefs.RawHeaders:
# Return the headers encoded the way that aiobotocore expects them
return {k.encode("utf-8"): str(v).encode("utf-8") for k, v in self.response.headers.items()}.items()
return {k.encode(UTF8): str(v).encode(UTF8) for k, v in self.response.headers.items()}.items()


def patch_aiobotocore() -> None:
Expand Down
20 changes: 11 additions & 9 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
)
from pyiceberg.io import InputStream, OutputStream, load_file_io
from pyiceberg.io.pyarrow import (
ICEBERG_SCHEMA,
PyArrowFile,
PyArrowFileIO,
_ConvertToArrowSchema,
Expand All @@ -69,6 +70,7 @@
from pyiceberg.schema import Schema, make_compatible_name, visit
from pyiceberg.table import FileScanTask, Table
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.typedef import UTF8
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -708,15 +710,15 @@ def _write_table_to_file(filepath: str, schema: pa.Schema, table: pa.Table) -> s

@pytest.fixture
def file_int(schema_int: Schema, tmpdir: str) -> str:
pyarrow_schema = pa.schema(schema_to_pyarrow(schema_int), metadata={"iceberg.schema": schema_int.model_dump_json()})
pyarrow_schema = schema_to_pyarrow(schema_int, metadata={ICEBERG_SCHEMA: bytes(schema_int.model_dump_json(), UTF8)})
return _write_table_to_file(
f"file:{tmpdir}/a.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array([0, 1, 2])], schema=pyarrow_schema)
)


@pytest.fixture
def file_int_str(schema_int_str: Schema, tmpdir: str) -> str:
pyarrow_schema = pa.schema(schema_to_pyarrow(schema_int_str), metadata={"iceberg.schema": schema_int_str.model_dump_json()})
pyarrow_schema = schema_to_pyarrow(schema_int_str, metadata={ICEBERG_SCHEMA: bytes(schema_int_str.model_dump_json(), UTF8)})
return _write_table_to_file(
f"file:{tmpdir}/a.parquet",
pyarrow_schema,
Expand All @@ -726,23 +728,23 @@ def file_int_str(schema_int_str: Schema, tmpdir: str) -> str:

@pytest.fixture
def file_string(schema_str: Schema, tmpdir: str) -> str:
pyarrow_schema = pa.schema(schema_to_pyarrow(schema_str), metadata={"iceberg.schema": schema_str.model_dump_json()})
pyarrow_schema = schema_to_pyarrow(schema_str, metadata={ICEBERG_SCHEMA: bytes(schema_str.model_dump_json(), UTF8)})
return _write_table_to_file(
f"file:{tmpdir}/b.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array(["0", "1", "2"])], schema=pyarrow_schema)
)


@pytest.fixture
def file_long(schema_long: Schema, tmpdir: str) -> str:
pyarrow_schema = pa.schema(schema_to_pyarrow(schema_long), metadata={"iceberg.schema": schema_long.model_dump_json()})
pyarrow_schema = schema_to_pyarrow(schema_long, metadata={ICEBERG_SCHEMA: bytes(schema_long.model_dump_json(), UTF8)})
return _write_table_to_file(
f"file:{tmpdir}/c.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array([0, 1, 2])], schema=pyarrow_schema)
)


@pytest.fixture
def file_struct(schema_struct: Schema, tmpdir: str) -> str:
pyarrow_schema = pa.schema(schema_to_pyarrow(schema_struct), metadata={"iceberg.schema": schema_struct.model_dump_json()})
pyarrow_schema = schema_to_pyarrow(schema_struct, metadata={ICEBERG_SCHEMA: bytes(schema_struct.model_dump_json(), UTF8)})
return _write_table_to_file(
f"file:{tmpdir}/d.parquet",
pyarrow_schema,
Expand All @@ -759,7 +761,7 @@ def file_struct(schema_struct: Schema, tmpdir: str) -> str:

@pytest.fixture
def file_list(schema_list: Schema, tmpdir: str) -> str:
pyarrow_schema = pa.schema(schema_to_pyarrow(schema_list), metadata={"iceberg.schema": schema_list.model_dump_json()})
pyarrow_schema = schema_to_pyarrow(schema_list, metadata={ICEBERG_SCHEMA: bytes(schema_list.model_dump_json(), UTF8)})
return _write_table_to_file(
f"file:{tmpdir}/e.parquet",
pyarrow_schema,
Expand All @@ -776,8 +778,8 @@ def file_list(schema_list: Schema, tmpdir: str) -> str:

@pytest.fixture
def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str:
pyarrow_schema = pa.schema(
schema_to_pyarrow(schema_list_of_structs), metadata={"iceberg.schema": schema_list_of_structs.model_dump_json()}
pyarrow_schema = schema_to_pyarrow(
schema_list_of_structs, metadata={ICEBERG_SCHEMA: bytes(schema_list_of_structs.model_dump_json(), UTF8)}
)
return _write_table_to_file(
f"file:{tmpdir}/e.parquet",
Expand All @@ -795,7 +797,7 @@ def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str:

@pytest.fixture
def file_map(schema_map: Schema, tmpdir: str) -> str:
pyarrow_schema = pa.schema(schema_to_pyarrow(schema_map), metadata={"iceberg.schema": schema_map.model_dump_json()})
pyarrow_schema = schema_to_pyarrow(schema_map, metadata={ICEBERG_SCHEMA: bytes(schema_map.model_dump_json(), UTF8)})
return _write_table_to_file(
f"file:{tmpdir}/e.parquet",
pyarrow_schema,
Expand Down
3 changes: 2 additions & 1 deletion tests/table/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
from pyiceberg.table.sorting import NullOrder, SortDirection, SortField
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import UTF8
from pyiceberg.types import (
BooleanType,
FloatType,
Expand Down Expand Up @@ -99,7 +100,7 @@ def test_from_dict_v2_parse_raw(example_table_metadata_v2: Dict[str, Any]) -> No

def test_from_byte_stream(example_table_metadata_v2: Dict[str, Any]) -> None:
"""Test generating a TableMetadata instance from a file-like byte stream"""
data = bytes(json.dumps(example_table_metadata_v2), encoding="utf-8")
data = bytes(json.dumps(example_table_metadata_v2), encoding=UTF8)
byte_stream = io.BytesIO(data)
FromByteStream.table_metadata(byte_stream=byte_stream)

Expand Down
5 changes: 3 additions & 2 deletions tests/test_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
YearTransform,
parse_transform,
)
from pyiceberg.typedef import UTF8
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -187,7 +188,7 @@ def test_bucket_method(type_var: PrimitiveType) -> None:

def test_string_with_surrogate_pair() -> None:
string_with_surrogate_pair = "string with a surrogate pair: 💰"
as_bytes = bytes(string_with_surrogate_pair, "UTF-8")
as_bytes = bytes(string_with_surrogate_pair, UTF8)
bucket_transform = BucketTransform(100).transform(StringType(), bucket=False)
assert bucket_transform(string_with_surrogate_pair) == mmh3.hash(as_bytes)

Expand Down Expand Up @@ -392,7 +393,7 @@ def test_truncate_string(input_var: str, expected: str) -> None:
"type_var,value,expected_human_str,expected",
[
(BinaryType(), b"\x00\x01\x02\x03", "AAECAw==", b"\x00"),
(BinaryType(), bytes("\u2603de", "utf-8"), "4piDZGU=", b"\xe2"),
(BinaryType(), bytes("\u2603de", UTF8), "4piDZGU=", b"\xe2"),
(DecimalType(8, 5), Decimal("14.21"), "14.21", Decimal("14.21")),
(IntegerType(), 123, "123", 123),
(LongType(), 123, "123", 123),
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import pytest
from strictyaml import as_document

from pyiceberg.typedef import RecursiveDict
from pyiceberg.typedef import UTF8, RecursiveDict
from pyiceberg.utils.config import Config, _lowercase_dictionary_keys, merge_config

EXAMPLE_ENV = {"PYICEBERG_CATALOG__PRODUCTION__URI": "https://service.io/api"}
Expand All @@ -43,7 +43,7 @@ def test_from_environment_variables_uppercase() -> None:

def test_from_configuration_files(tmp_path_factory: pytest.TempPathFactory) -> None:
config_path = str(tmp_path_factory.mktemp("config"))
with open(f"{config_path}/.pyiceberg.yaml", "w", encoding="utf-8") as file:
with open(f"{config_path}/.pyiceberg.yaml", "w", encoding=UTF8) as file:
yaml_str = as_document({"catalog": {"production": {"uri": "https://service.io/api"}}}).as_yaml()
file.write(yaml_str)

Expand Down