From 704771c60cd5446688bf12b11fcfb2a283aaed46 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 16 Jun 2025 13:06:33 -0700 Subject: [PATCH 1/5] Add deprecation message for source-id --- pyiceberg/partitioning.py | 8 ++++++++ pyiceberg/table/sorting.py | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index df07f94342..ba4b3c5ded 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -59,6 +59,7 @@ UUIDType, ) from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros +from pyiceberg.utils.deprecated import deprecation_message INITIAL_PARTITION_SPEC_ID = 0 PARTITION_FIELD_ID_START: int = 1000 @@ -101,6 +102,13 @@ def __init__( if name is not None: data["name"] = name + if data["source-id"]: + deprecation_message( + deprecated_in="0.10.0", + removed_in="0.11.0", + help_message="source-id is not allowed for Iceberg v3. Please use source-ids instead.", + ) + super().__init__(**data) @model_validator(mode="before") diff --git a/pyiceberg/table/sorting.py b/pyiceberg/table/sorting.py index 244c8ba867..e3543f2abc 100644 --- a/pyiceberg/table/sorting.py +++ b/pyiceberg/table/sorting.py @@ -30,6 +30,7 @@ from pyiceberg.transforms import IdentityTransform, Transform, parse_transform from pyiceberg.typedef import IcebergBaseModel from pyiceberg.types import IcebergType +from pyiceberg.utils.deprecated import deprecation_message class SortDirection(Enum): @@ -85,6 +86,14 @@ def __init__( data["direction"] = direction if null_order is not None: data["null-order"] = null_order + + if data["source-id"]: + deprecation_message( + deprecated_in="0.10.0", + removed_in="0.11.0", + help_message="source-id is not allowed for Iceberg v3. Please use source-ids instead.", + ) + super().__init__(**data) @model_validator(mode="before") From 93c1ff14e798d9fc5771df2a247a2dccb65a8e3e Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 17 Jun 2025 14:08:46 -0700 Subject: [PATCH 2/5] error messages added --- pyiceberg/partitioning.py | 13 +++++++------ pyiceberg/table/sorting.py | 12 ++++++------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index ba4b3c5ded..95a0e62981 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -87,6 +87,7 @@ class PartitionField(IcebergBaseModel): def __init__( self, + format_version: int, source_id: Optional[int] = None, field_id: Optional[int] = None, transform: Optional[Transform[Any, Any]] = None, @@ -102,12 +103,12 @@ def __init__( if name is not None: data["name"] = name - if data["source-id"]: - deprecation_message( - deprecated_in="0.10.0", - removed_in="0.11.0", - help_message="source-id is not allowed for Iceberg v3. Please use source-ids instead.", - ) + if format_version == 3 and data["source-id"]: + raise ValueError("source-id is not allowed on Iceberg v3") + + if format_version in [1,2] and data["source-ids"]: + raise ValueError("source-ids is not allowed on Iceberg v1 and v2") + super().__init__(**data) diff --git a/pyiceberg/table/sorting.py b/pyiceberg/table/sorting.py index e3543f2abc..13e4de5bd4 100644 --- a/pyiceberg/table/sorting.py +++ b/pyiceberg/table/sorting.py @@ -72,6 +72,7 @@ class SortField(IcebergBaseModel): def __init__( self, + format_version: int, source_id: Optional[int] = None, transform: Optional[Union[Transform[Any, Any], Callable[[IcebergType], Transform[Any, Any]]]] = None, direction: Optional[SortDirection] = None, @@ -87,12 +88,11 @@ def __init__( if null_order is not None: data["null-order"] = null_order - if data["source-id"]: - deprecation_message( - deprecated_in="0.10.0", - removed_in="0.11.0", - help_message="source-id is not allowed for Iceberg v3. Please use source-ids instead.", - ) + if format_version == 3 and data["source-id"]: + raise ValueError("source-id is not allowed on Iceberg v3") + + if format_version in [1,2] and data["source-ids"]: + raise ValueError("source-ids is not allowed on Iceberg v1 and v2") super().__init__(**data) From 4b0db8f03584b465325fbff96e6d8bef792811f1 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 17 Jun 2025 14:20:49 -0700 Subject: [PATCH 3/5] wip --- pyiceberg/partitioning.py | 15 ++++++++------- pyiceberg/table/sorting.py | 13 ++++++++----- tests/table/test_partitioning.py | 10 ++++++++-- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 95a0e62981..4cd49ea85f 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -87,7 +87,7 @@ class PartitionField(IcebergBaseModel): def __init__( self, - format_version: int, + format_version: int = 2, source_id: Optional[int] = None, field_id: Optional[int] = None, transform: Optional[Transform[Any, Any]] = None, @@ -103,12 +103,7 @@ def __init__( if name is not None: data["name"] = name - if format_version == 3 and data["source-id"]: - raise ValueError("source-id is not allowed on Iceberg v3") - - if format_version in [1,2] and data["source-ids"]: - raise ValueError("source-ids is not allowed on Iceberg v1 and v2") - + data["format-version"] = format_version super().__init__(**data) @@ -125,6 +120,12 @@ def map_source_ids_onto_source_id(cls, data: Any) -> Any: data["source-id"] = source_ids[0] return data + @model_validator(mode="after") + @classmethod + def check_source_ids_against_version(cls, data: Any) -> Any: + if data["format-version"] in [1,2] and data["source-ids"]: + raise ValueError("source-ids is not allowed on Iceberg v1 and v2") + def __str__(self) -> str: """Return the string representation of the PartitionField class.""" return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})" diff --git a/pyiceberg/table/sorting.py b/pyiceberg/table/sorting.py index 13e4de5bd4..4184e0f201 100644 --- a/pyiceberg/table/sorting.py +++ b/pyiceberg/table/sorting.py @@ -88,11 +88,7 @@ def __init__( if null_order is not None: data["null-order"] = null_order - if format_version == 3 and data["source-id"]: - raise ValueError("source-id is not allowed on Iceberg v3") - - if format_version in [1,2] and data["source-ids"]: - raise ValueError("source-ids is not allowed on Iceberg v1 and v2") + data["format-version"] = format_version super().__init__(**data) @@ -103,6 +99,13 @@ def set_null_order(cls, values: Dict[str, Any]) -> Dict[str, Any]: values["null-order"] = NullOrder.NULLS_FIRST if values["direction"] == SortDirection.ASC else NullOrder.NULLS_LAST return values + @model_validator(mode="after") + @classmethod + def check_source_ids_against_version(cls, data: Any) -> Any: + if data["format-version"] in [1,2] and data["source-ids"]: + raise ValueError("source-ids is not allowed on Iceberg v1 and v2") + + @model_validator(mode="before") @classmethod def map_source_ids_onto_source_id(cls, data: Any) -> Any: diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py index 0fe22391c0..e5ccd0e9a1 100644 --- a/tests/table/test_partitioning.py +++ b/tests/table/test_partitioning.py @@ -215,11 +215,17 @@ def test_deserialize_partition_field_v2() -> None: json_partition_spec = """{"source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" field = PartitionField.model_validate_json(json_partition_spec) - assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") + assert field == PartitionField(format_version=2, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") def test_deserialize_partition_field_v3() -> None: json_partition_spec = """{"source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" field = PartitionField.model_validate_json(json_partition_spec) - assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") + assert field == PartitionField(format_version=3, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") + +def test_v3_does_not_allow_source_id() -> None: + json_partition_spec = """{"format-version": 3, "source-id": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + + with pytest.raises(ValueError, match=r"source-id is not allowed"): + PartitionField.model_validate_json(json_partition_spec) \ No newline at end of file From 3f758eb6b241eedd665a1562b975c7b911d8cc06 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 17 Jun 2025 15:49:18 -0700 Subject: [PATCH 4/5] plumbing done, time to fix the tests --- pyiceberg/catalog/rest/__init__.py | 3 ++- pyiceberg/partitioning.py | 12 +++++++----- pyiceberg/table/metadata.py | 2 +- pyiceberg/table/sorting.py | 15 ++++++++------- pyiceberg/table/update/spec.py | 20 ++++++++++++++------ tests/table/test_partitioning.py | 8 ++++---- tests/table/test_sorting.py | 5 +++++ 7 files changed, 41 insertions(+), 24 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 3f59a196ea..718f2442ff 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -480,10 +480,11 @@ def _create_table( properties: Properties = EMPTY_DICT, stage_create: bool = False, ) -> TableResponse: + format_version = self.properties.get(ICEBERG_REST_SPEC_VERSION) iceberg_schema = self._convert_schema_if_needed(schema) fresh_schema = assign_fresh_schema_ids(iceberg_schema) fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema) - fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema) + fresh_sort_order = assign_fresh_sort_order_ids(format_version, sort_order, iceberg_schema, fresh_schema) namespace_and_table = self._split_identifier_for_path(identifier) if location: diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 4cd49ea85f..94ea8613ac 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -87,7 +87,7 @@ class PartitionField(IcebergBaseModel): def __init__( self, - format_version: int = 2, + format_version: Optional[int] = None, source_id: Optional[int] = None, field_id: Optional[int] = None, transform: Optional[Transform[Any, Any]] = None, @@ -102,8 +102,8 @@ def __init__( data["transform"] = transform if name is not None: data["name"] = name - - data["format-version"] = format_version + if format_version is not None: + data["format-version"] = format_version super().__init__(**data) @@ -120,12 +120,14 @@ def map_source_ids_onto_source_id(cls, data: Any) -> Any: data["source-id"] = source_ids[0] return data - @model_validator(mode="after") + @model_validator(mode="before") @classmethod def check_source_ids_against_version(cls, data: Any) -> Any: - if data["format-version"] in [1,2] and data["source-ids"]: + if "format-version" in data and data["format-version"] in [1,2] and "source-ids" in data: raise ValueError("source-ids is not allowed on Iceberg v1 and v2") + return data + def __str__(self) -> str: """Return the string representation of the PartitionField class.""" return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})" diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index f248700c02..f36dc3f9ca 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -578,7 +578,7 @@ def new_table_metadata( fresh_schema = assign_fresh_schema_ids(schema) fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema) - fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema) + fresh_sort_order = assign_fresh_sort_order_ids(format_version, sort_order, schema, fresh_schema) if table_uuid is None: table_uuid = uuid.uuid4() diff --git a/pyiceberg/table/sorting.py b/pyiceberg/table/sorting.py index 4184e0f201..e75d203678 100644 --- a/pyiceberg/table/sorting.py +++ b/pyiceberg/table/sorting.py @@ -28,7 +28,7 @@ from pyiceberg.schema import Schema from pyiceberg.transforms import IdentityTransform, Transform, parse_transform -from pyiceberg.typedef import IcebergBaseModel +from pyiceberg.typedef import IcebergBaseModel, TableVersion from pyiceberg.types import IcebergType from pyiceberg.utils.deprecated import deprecation_message @@ -72,7 +72,7 @@ class SortField(IcebergBaseModel): def __init__( self, - format_version: int, + format_version: Optional[int] = None, source_id: Optional[int] = None, transform: Optional[Union[Transform[Any, Any], Callable[[IcebergType], Transform[Any, Any]]]] = None, direction: Optional[SortDirection] = None, @@ -87,8 +87,8 @@ def __init__( data["direction"] = direction if null_order is not None: data["null-order"] = null_order - - data["format-version"] = format_version + if format_version is not None: + data["format-version"] = format_version super().__init__(**data) @@ -99,10 +99,10 @@ def set_null_order(cls, values: Dict[str, Any]) -> Dict[str, Any]: values["null-order"] = NullOrder.NULLS_FIRST if values["direction"] == SortDirection.ASC else NullOrder.NULLS_LAST return values - @model_validator(mode="after") + @model_validator(mode="before") @classmethod def check_source_ids_against_version(cls, data: Any) -> Any: - if data["format-version"] in [1,2] and data["source-ids"]: + if "format-version" in data and data["format-version"] in [1,2] and "source-ids" in data: raise ValueError("source-ids is not allowed on Iceberg v1 and v2") @@ -186,7 +186,7 @@ def __repr__(self) -> str: def assign_fresh_sort_order_ids( - sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema, sort_order_id: int = INITIAL_SORT_ORDER_ID + format_version: TableVersion, sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema, sort_order_id: int = INITIAL_SORT_ORDER_ID ) -> SortOrder: if sort_order.is_unsorted: return UNSORTED_SORT_ORDER @@ -201,6 +201,7 @@ def assign_fresh_sort_order_ids( raise ValueError(f"Could not find field in fresh schema: {original_field}") fresh_fields.append( SortField( + format_version=format_version, source_id=fresh_field.field_id, transform=field.transform, direction=field.direction, diff --git a/pyiceberg/table/update/spec.py b/pyiceberg/table/update/spec.py index 1f91aa5d17..cebbf8007c 100644 --- a/pyiceberg/table/update/spec.py +++ b/pyiceberg/table/update/spec.py @@ -40,6 +40,7 @@ UpdateTableMetadata, ) from pyiceberg.transforms import IdentityTransform, TimeTransform, Transform, VoidTransform, parse_transform +from pyiceberg.typedef import TableVersion if TYPE_CHECKING: from pyiceberg.table import Transaction @@ -191,13 +192,14 @@ def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, par partition_names.add(name) def _add_new_field( - schema: Schema, source_id: int, field_id: int, name: str, transform: Transform[Any, Any], partition_names: Set[str] + schema: Schema, source_id: int, field_id: int, name: str, transform: Transform[Any, Any], partition_names: Set[str], format_version: TableVersion ) -> PartitionField: _check_and_add_partition_name(schema, name, source_id, partition_names) - return PartitionField(source_id, field_id, transform, name) + return PartitionField(format_version, source_id, field_id, transform, name) partition_fields = [] partition_names: Set[str] = set() + format_version = self._transaction.table_metadata.format_version for field in self._transaction.table_metadata.spec().fields: if field.field_id not in self._deletes: renamed = self._renames.get(field.name) @@ -209,6 +211,7 @@ def _add_new_field( renamed, field.transform, partition_names, + format_version, ) else: new_field = _add_new_field( @@ -218,6 +221,7 @@ def _add_new_field( field.name, field.transform, partition_names, + format_version, ) partition_fields.append(new_field) elif self._transaction.table_metadata.format_version == 1: @@ -230,6 +234,7 @@ def _add_new_field( renamed, VoidTransform(), partition_names, + format_version, ) else: new_field = _add_new_field( @@ -239,6 +244,7 @@ def _add_new_field( field.name, VoidTransform(), partition_names, + format_version, ) partition_fields.append(new_field) @@ -249,6 +255,7 @@ def _add_new_field( field_id=added_field.field_id, transform=added_field.transform, name=added_field.name, + format_version=format_version, ) partition_fields.append(new_field) @@ -264,7 +271,8 @@ def _add_new_field( return PartitionSpec(*partition_fields, spec_id=new_spec_id) def _partition_field(self, transform_key: Tuple[int, Transform[Any, Any]], name: Optional[str]) -> PartitionField: - if self._transaction.table_metadata.format_version == 2: + format_version = self._transaction.table_metadata.format_version + if format_version == 2: source_id, transform = transform_key historical_fields = [] for spec in self._transaction.table_metadata.specs().values(): @@ -274,13 +282,13 @@ def _partition_field(self, transform_key: Tuple[int, Transform[Any, Any]], name: for field in historical_fields: if field.source_id == source_id and repr(field.transform) == repr(transform): if name is None or field.name == name: - return PartitionField(source_id, field.field_id, transform, field.name) + return PartitionField(format_version, source_id, field.field_id, transform, field.name) new_field_id = self._new_field_id() if name is None: - tmp_field = PartitionField(transform_key[0], new_field_id, transform_key[1], "unassigned_field_name") + tmp_field = PartitionField(format_version, transform_key[0], new_field_id, transform_key[1], "unassigned_field_name") name = _visit_partition_field(self._transaction.table_metadata.schema(), tmp_field, _PartitionNameGenerator()) - return PartitionField(transform_key[0], new_field_id, transform_key[1], name) + return PartitionField(format_version, transform_key[0], new_field_id, transform_key[1], name) def _new_field_id(self) -> int: self._last_assigned_partition_id += 1 diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py index e5ccd0e9a1..fca28a37bd 100644 --- a/tests/table/test_partitioning.py +++ b/tests/table/test_partitioning.py @@ -212,20 +212,20 @@ def test_transform_consistency_with_pyarrow_transform(source_type: PrimitiveType def test_deserialize_partition_field_v2() -> None: - json_partition_spec = """{"source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + json_partition_spec = """{"format-version": 2, "source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" field = PartitionField.model_validate_json(json_partition_spec) assert field == PartitionField(format_version=2, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") def test_deserialize_partition_field_v3() -> None: - json_partition_spec = """{"source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + json_partition_spec = """{"format-version": 3, "source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" field = PartitionField.model_validate_json(json_partition_spec) assert field == PartitionField(format_version=3, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") def test_v3_does_not_allow_source_id() -> None: - json_partition_spec = """{"format-version": 3, "source-id": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + json_partition_spec = """{"format-version": 2, "source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" - with pytest.raises(ValueError, match=r"source-id is not allowed"): + with pytest.raises(ValueError, match=r"source-ids is not allowed"): PartitionField.model_validate_json(json_partition_spec) \ No newline at end of file diff --git a/tests/table/test_sorting.py b/tests/table/test_sorting.py index 3efda56509..d8d452014e 100644 --- a/tests/table/test_sorting.py +++ b/tests/table/test_sorting.py @@ -114,3 +114,8 @@ def test_serialize_sort_field_v3() -> None: expected = SortField(source_id=19, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST) payload = '{"source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}' assert SortField.model_validate_json(payload) == expected + +def test_v2_does_not_support_source_ids() -> None: + payload = '{"format-version": 2, "source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}' + with pytest.raises(ValueError, match=r"source-ids is not allowed"): + SortField.model_validate_json(payload) \ No newline at end of file From 81db77244d489a6ee300d50aedecb4f33929419b Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 17 Jun 2025 16:10:47 -0700 Subject: [PATCH 5/5] plumbing should be done according to linter --- pyiceberg/catalog/rest/__init__.py | 5 +- pyiceberg/partitioning.py | 3 +- pyiceberg/table/metadata.py | 3 +- pyiceberg/table/sorting.py | 10 +- pyiceberg/table/update/spec.py | 8 +- tests/expressions/test_projection.py | 14 +-- tests/expressions/test_residual_evaluator.py | 18 ++-- tests/integration/test_partition_evolution.py | 92 ++++++++++--------- tests/io/test_pyarrow.py | 6 +- tests/table/test_partitioning.py | 39 +++++--- tests/table/test_sorting.py | 6 +- 11 files changed, 119 insertions(+), 85 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 718f2442ff..72cd7413c0 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -64,6 +64,7 @@ StagedTable, Table, TableIdentifier, + TableProperties, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids @@ -71,7 +72,7 @@ TableRequirement, TableUpdate, ) -from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties +from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties, TableVersion from pyiceberg.types import transform_dict_value_to_str from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool @@ -480,7 +481,7 @@ def _create_table( properties: Properties = EMPTY_DICT, stage_create: bool = False, ) -> TableResponse: - format_version = self.properties.get(ICEBERG_REST_SPEC_VERSION) + format_version: TableVersion = self.properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION) iceberg_schema = self._convert_schema_if_needed(schema) fresh_schema = assign_fresh_schema_ids(iceberg_schema) fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 94ea8613ac..e04501d732 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -59,7 +59,6 @@ UUIDType, ) from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros -from pyiceberg.utils.deprecated import deprecation_message INITIAL_PARTITION_SPEC_ID = 0 PARTITION_FIELD_ID_START: int = 1000 @@ -123,7 +122,7 @@ def map_source_ids_onto_source_id(cls, data: Any) -> Any: @model_validator(mode="before") @classmethod def check_source_ids_against_version(cls, data: Any) -> Any: - if "format-version" in data and data["format-version"] in [1,2] and "source-ids" in data: + if "format-version" in data and data["format-version"] in [1, 2] and "source-ids" in data: raise ValueError("source-ids is not allowed on Iceberg v1 and v2") return data diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index f36dc3f9ca..875623becd 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -42,6 +42,7 @@ IcebergBaseModel, IcebergRootModel, Properties, + TableVersion, ) from pyiceberg.types import NestedField, StructType, transform_dict_value_to_str from pyiceberg.utils.config import Config @@ -572,7 +573,7 @@ def new_table_metadata( from pyiceberg.table import TableProperties # Remove format-version so it does not get persisted - format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) + format_version: TableVersion = properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION) schema.check_format_version_compatibility(format_version) diff --git a/pyiceberg/table/sorting.py b/pyiceberg/table/sorting.py index e75d203678..0a9ae767b4 100644 --- a/pyiceberg/table/sorting.py +++ b/pyiceberg/table/sorting.py @@ -30,7 +30,6 @@ from pyiceberg.transforms import IdentityTransform, Transform, parse_transform from pyiceberg.typedef import IcebergBaseModel, TableVersion from pyiceberg.types import IcebergType -from pyiceberg.utils.deprecated import deprecation_message class SortDirection(Enum): @@ -102,10 +101,9 @@ def set_null_order(cls, values: Dict[str, Any]) -> Dict[str, Any]: @model_validator(mode="before") @classmethod def check_source_ids_against_version(cls, data: Any) -> Any: - if "format-version" in data and data["format-version"] in [1,2] and "source-ids" in data: + if "format-version" in data and data["format-version"] in [1, 2] and "source-ids" in data: raise ValueError("source-ids is not allowed on Iceberg v1 and v2") - @model_validator(mode="before") @classmethod def map_source_ids_onto_source_id(cls, data: Any) -> Any: @@ -186,7 +184,11 @@ def __repr__(self) -> str: def assign_fresh_sort_order_ids( - format_version: TableVersion, sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema, sort_order_id: int = INITIAL_SORT_ORDER_ID + format_version: TableVersion, + sort_order: SortOrder, + old_schema: Schema, + fresh_schema: Schema, + sort_order_id: int = INITIAL_SORT_ORDER_ID, ) -> SortOrder: if sort_order.is_unsorted: return UNSORTED_SORT_ORDER diff --git a/pyiceberg/table/update/spec.py b/pyiceberg/table/update/spec.py index cebbf8007c..1a65fcb9d4 100644 --- a/pyiceberg/table/update/spec.py +++ b/pyiceberg/table/update/spec.py @@ -192,7 +192,13 @@ def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, par partition_names.add(name) def _add_new_field( - schema: Schema, source_id: int, field_id: int, name: str, transform: Transform[Any, Any], partition_names: Set[str], format_version: TableVersion + schema: Schema, + source_id: int, + field_id: int, + name: str, + transform: Transform[Any, Any], + partition_names: Set[str], + format_version: TableVersion, ) -> PartitionField: _check_and_add_partition_name(schema, name, source_id, partition_names) return PartitionField(format_version, source_id, field_id, transform, name) diff --git a/tests/expressions/test_projection.py b/tests/expressions/test_projection.py index 4d0c2c1346..a8ffa5a015 100644 --- a/tests/expressions/test_projection.py +++ b/tests/expressions/test_projection.py @@ -70,38 +70,38 @@ def empty_spec() -> PartitionSpec: @pytest.fixture def id_spec() -> PartitionSpec: - return PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "id_part")) + return PartitionSpec(PartitionField(2, 1, 1000, IdentityTransform(), "id_part")) @pytest.fixture def bucket_spec() -> PartitionSpec: - return PartitionSpec(PartitionField(2, 1000, BucketTransform(16), "data_bucket")) + return PartitionSpec(PartitionField(2, 2, 1000, BucketTransform(16), "data_bucket")) @pytest.fixture def day_spec() -> PartitionSpec: - return PartitionSpec(PartitionField(4, 1000, DayTransform(), "date"), PartitionField(3, 1000, DayTransform(), "ddate")) + return PartitionSpec(PartitionField(2, 4, 1000, DayTransform(), "date"), PartitionField(2, 3, 1000, DayTransform(), "ddate")) @pytest.fixture def hour_spec() -> PartitionSpec: - return PartitionSpec(PartitionField(4, 1000, HourTransform(), "hour")) + return PartitionSpec(PartitionField(2, 4, 1000, HourTransform(), "hour")) @pytest.fixture def truncate_str_spec() -> PartitionSpec: - return PartitionSpec(PartitionField(2, 1000, TruncateTransform(2), "data_trunc")) + return PartitionSpec(PartitionField(2, 2, 1000, TruncateTransform(2), "data_trunc")) @pytest.fixture def truncate_int_spec() -> PartitionSpec: - return PartitionSpec(PartitionField(1, 1000, TruncateTransform(10), "id_trunc")) + return PartitionSpec(PartitionField(2, 1, 1000, TruncateTransform(10), "id_trunc")) @pytest.fixture def id_and_bucket_spec() -> PartitionSpec: return PartitionSpec( - PartitionField(1, 1000, IdentityTransform(), "id_part"), PartitionField(2, 1001, BucketTransform(16), "data_bucket") + PartitionField(2, 1, 1000, IdentityTransform(), "id_part"), PartitionField(2, 2, 1001, BucketTransform(16), "data_bucket") ) diff --git a/tests/expressions/test_residual_evaluator.py b/tests/expressions/test_residual_evaluator.py index ba0a0da2e5..d0af717c15 100644 --- a/tests/expressions/test_residual_evaluator.py +++ b/tests/expressions/test_residual_evaluator.py @@ -47,7 +47,7 @@ def test_identity_transform_residual() -> None: schema = Schema(NestedField(50, "dateint", IntegerType()), NestedField(51, "hour", IntegerType())) - spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "dateint_part")) + spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "dateint_part")) predicate = Or( Or( @@ -91,7 +91,7 @@ def test_identity_transform_residual() -> None: def test_case_insensitive_identity_transform_residuals() -> None: schema = Schema(NestedField(50, "dateint", IntegerType()), NestedField(51, "hour", IntegerType())) - spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "dateint_part")) + spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "dateint_part")) predicate = Or( Or( @@ -136,7 +136,7 @@ def test_unpartitioned_residuals() -> None: def test_in() -> None: schema = Schema(NestedField(50, "dateint", IntegerType()), NestedField(51, "hour", IntegerType())) - spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "dateint_part")) + spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "dateint_part")) predicate = In("dateint", [20170815, 20170816, 20170817]) @@ -150,7 +150,7 @@ def test_in() -> None: def test_in_timestamp() -> None: schema = Schema(NestedField(50, "ts", TimestampType()), NestedField(51, "hour", IntegerType())) - spec = PartitionSpec(PartitionField(50, 1000, DayTransform(), "ts_part")) + spec = PartitionSpec(PartitionField(2, 50, 1000, DayTransform(), "ts_part")) date_20191201 = literal("2019-12-01T00:00:00").to(TimestampType()).value date_20191202 = literal("2019-12-02T00:00:00").to(TimestampType()).value @@ -172,7 +172,7 @@ def test_in_timestamp() -> None: def test_not_in() -> None: schema = Schema(NestedField(50, "dateint", IntegerType()), NestedField(51, "hour", IntegerType())) - spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "dateint_part")) + spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "dateint_part")) predicate = NotIn("dateint", [20170815, 20170816, 20170817]) @@ -188,7 +188,7 @@ def test_not_in() -> None: def test_is_nan() -> None: schema = Schema(NestedField(50, "double", DoubleType()), NestedField(51, "hour", IntegerType())) - spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "double_part")) + spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "double_part")) predicate = IsNaN("double") @@ -204,7 +204,7 @@ def test_is_nan() -> None: def test_is_not_nan() -> None: schema = Schema(NestedField(50, "double", DoubleType()), NestedField(51, "float", FloatType())) - spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "double_part")) + spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "double_part")) predicate = NotNaN("double") @@ -216,7 +216,7 @@ def test_is_not_nan() -> None: residual = res_eval.residual_for(Record(2)) assert residual == AlwaysTrue() - spec = PartitionSpec(PartitionField(51, 1051, IdentityTransform(), "float_part")) + spec = PartitionSpec(PartitionField(2, 51, 1051, IdentityTransform(), "float_part")) predicate = NotNaN("float") @@ -232,7 +232,7 @@ def test_is_not_nan() -> None: def test_not_in_timestamp() -> None: schema = Schema(NestedField(50, "ts", TimestampType()), NestedField(51, "dateint", IntegerType())) - spec = PartitionSpec(PartitionField(50, 1000, DayTransform(), "ts_part")) + spec = PartitionSpec(PartitionField(2, 50, 1000, DayTransform(), "ts_part")) date_20191201 = literal("2019-12-01T00:00:00").to(TimestampType()).value date_20191202 = literal("2019-12-02T00:00:00").to(TimestampType()).value diff --git a/tests/integration/test_partition_evolution.py b/tests/integration/test_partition_evolution.py index d489d6a5d0..9d68c5efb3 100644 --- a/tests/integration/test_partition_evolution.py +++ b/tests/integration/test_partition_evolution.py @@ -89,7 +89,7 @@ def test_add_identity_partition(catalog: Catalog, table_schema_simple: Schema) - def test_add_year(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", YearTransform(), "year_transform").commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, YearTransform(), "year_transform")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, YearTransform(), "year_transform")) @pytest.mark.integration @@ -97,7 +97,7 @@ def test_add_year(catalog: Catalog) -> None: def test_add_year_generates_default_name(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", YearTransform()).commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, YearTransform(), "event_ts_year")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, YearTransform(), "event_ts_year")) @pytest.mark.integration @@ -105,7 +105,7 @@ def test_add_year_generates_default_name(catalog: Catalog) -> None: def test_add_month(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", MonthTransform(), "month_transform").commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, MonthTransform(), "month_transform")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, MonthTransform(), "month_transform")) @pytest.mark.integration @@ -113,7 +113,7 @@ def test_add_month(catalog: Catalog) -> None: def test_add_month_generates_default_name(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", MonthTransform()).commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, MonthTransform(), "event_ts_month")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, MonthTransform(), "event_ts_month")) @pytest.mark.integration @@ -121,7 +121,7 @@ def test_add_month_generates_default_name(catalog: Catalog) -> None: def test_add_day(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", DayTransform(), "day_transform").commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, DayTransform(), "day_transform")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, DayTransform(), "day_transform")) @pytest.mark.integration @@ -129,7 +129,7 @@ def test_add_day(catalog: Catalog) -> None: def test_add_day_generates_default_name(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", DayTransform()).commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, DayTransform(), "event_ts_day")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, DayTransform(), "event_ts_day")) @pytest.mark.integration @@ -137,7 +137,7 @@ def test_add_day_generates_default_name(catalog: Catalog) -> None: def test_add_hour(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", HourTransform(), "hour_transform").commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, HourTransform(), "hour_transform")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, HourTransform(), "hour_transform")) @pytest.mark.integration @@ -145,7 +145,7 @@ def test_add_hour(catalog: Catalog) -> None: def test_add_hour_string_transform(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", "hour", "str_hour_transform").commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, HourTransform(), "str_hour_transform")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, HourTransform(), "str_hour_transform")) @pytest.mark.integration @@ -153,7 +153,7 @@ def test_add_hour_string_transform(catalog: Catalog) -> None: def test_add_hour_generates_default_name(catalog: Catalog) -> None: table = _table(catalog) table.update_spec().add_field("event_ts", HourTransform()).commit() - _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, HourTransform(), "event_ts_hour")) + _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 2, 1000, HourTransform(), "event_ts_hour")) @pytest.mark.integration @@ -161,7 +161,9 @@ def test_add_hour_generates_default_name(catalog: Catalog) -> None: def test_add_bucket(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") simple_table.update_spec().add_field("foo", BucketTransform(12), "bucket_transform").commit() - _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, BucketTransform(12), "bucket_transform")) + _validate_new_partition_fields( + simple_table, 1000, 1, 1000, PartitionField(2, 1, 1000, BucketTransform(12), "bucket_transform") + ) @pytest.mark.integration @@ -169,7 +171,7 @@ def test_add_bucket(catalog: Catalog, table_schema_simple: Schema) -> None: def test_add_bucket_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") simple_table.update_spec().add_field("foo", BucketTransform(12)).commit() - _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, BucketTransform(12), "foo_bucket_12")) + _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(2, 1, 1000, BucketTransform(12), "foo_bucket_12")) @pytest.mark.integration @@ -178,7 +180,7 @@ def test_add_truncate(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") simple_table.update_spec().add_field("foo", TruncateTransform(1), "truncate_transform").commit() _validate_new_partition_fields( - simple_table, 1000, 1, 1000, PartitionField(1, 1000, TruncateTransform(1), "truncate_transform") + simple_table, 1000, 1, 1000, PartitionField(2, 1, 1000, TruncateTransform(1), "truncate_transform") ) @@ -187,7 +189,7 @@ def test_add_truncate(catalog: Catalog, table_schema_simple: Schema) -> None: def test_add_truncate_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") simple_table.update_spec().add_field("foo", TruncateTransform(1)).commit() - _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, TruncateTransform(1), "foo_trunc_1")) + _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(2, 1, 1000, TruncateTransform(1), "foo_trunc_1")) @pytest.mark.integration @@ -202,9 +204,9 @@ def test_multiple_adds(catalog: Catalog) -> None: 1002, 1, 1002, - PartitionField(1, 1000, IdentityTransform(), "id"), - PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), - PartitionField(3, 1002, TruncateTransform(2), "truncate_str"), + PartitionField(2, 1, 1000, IdentityTransform(), "id"), + PartitionField(2, 2, 1001, HourTransform(), "hourly_partitioned"), + PartitionField(2, 3, 1002, TruncateTransform(2), "truncate_str"), ) @@ -213,7 +215,7 @@ def test_multiple_adds(catalog: Catalog) -> None: def test_add_void(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") simple_table.update_spec().add_field("foo", VoidTransform(), "void_transform").commit() - _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, VoidTransform(), "void_transform")) + _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(2, 1, 1000, VoidTransform(), "void_transform")) @pytest.mark.integration @@ -221,7 +223,7 @@ def test_add_void(catalog: Catalog, table_schema_simple: Schema) -> None: def test_add_void_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") simple_table.update_spec().add_field("foo", VoidTransform()).commit() - _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, VoidTransform(), "foo_null")) + _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(2, 1, 1000, VoidTransform(), "foo_null")) @pytest.mark.integration @@ -235,8 +237,8 @@ def test_add_hour_to_day(catalog: Catalog) -> None: 1001, 2, 1001, - PartitionField(2, 1000, DayTransform(), "daily_partitioned"), - PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), + PartitionField(2, 2, 1000, DayTransform(), "daily_partitioned"), + PartitionField(2, 2, 1001, HourTransform(), "hourly_partitioned"), ) @@ -250,8 +252,8 @@ def test_add_multiple_buckets(catalog: Catalog) -> None: 1001, 1, 1001, - PartitionField(1, 1000, BucketTransform(16), "id_bucket_16"), - PartitionField(1, 1001, BucketTransform(4), "id_bucket_4"), + PartitionField(2, 1, 1000, BucketTransform(16), "id_bucket_16"), + PartitionField(2, 1, 1001, BucketTransform(4), "id_bucket_4"), ) @@ -264,7 +266,7 @@ def test_remove_identity(catalog: Catalog) -> None: assert len(table.specs()) == 3 assert table.spec().spec_id == 2 assert table.spec() == PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="id"), spec_id=2 + PartitionField(format_version=2, source_id=1, field_id=1000, transform=VoidTransform(), name="id"), spec_id=2 ) @@ -290,8 +292,8 @@ def test_remove_and_add_identity(catalog: Catalog) -> None: assert len(table.specs()) == 4 assert table.spec().spec_id == 3 assert table.spec() == PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="id_1000"), - PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="id"), + PartitionField(format_version=2, source_id=1, field_id=1000, transform=VoidTransform(), name="id_1000"), + PartitionField(format_version=2, source_id=1, field_id=1001, transform=IdentityTransform(), name="id"), spec_id=3, ) @@ -307,7 +309,7 @@ def test_remove_and_add_identity_v2(catalog: Catalog) -> None: assert len(table_v2.specs()) == 2 assert table_v2.spec().spec_id == 1 assert table_v2.spec() == PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id"), spec_id=1 + PartitionField(format_version=2, source_id=1, field_id=1000, transform=IdentityTransform(), name="id"), spec_id=1 ) @@ -327,8 +329,8 @@ def test_remove_bucket(catalog: Catalog) -> None: 1001, 2, 1001, - PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="bucketed_id"), - PartitionField(source_id=2, field_id=1001, transform=DayTransform(), name="day_ts"), + PartitionField(format_version=2, source_id=1, field_id=1000, transform=VoidTransform(), name="bucketed_id"), + PartitionField(format_version=2, source_id=2, field_id=1001, transform=DayTransform(), name="day_ts"), ) @@ -343,7 +345,11 @@ def test_remove_bucket_v2(catalog: Catalog) -> None: remove.remove_field("bucketed_id") assert len(table_v2.specs()) == 3 _validate_new_partition_fields( - table_v2, 1001, 2, 1001, PartitionField(source_id=2, field_id=1001, transform=DayTransform(), name="day_ts") + table_v2, + 1001, + 2, + 1001, + PartitionField(format_version=2, source_id=2, field_id=1001, transform=DayTransform(), name="day_ts"), ) @@ -363,8 +369,8 @@ def test_remove_day(catalog: Catalog) -> None: 1001, 2, 1001, - PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="bucketed_id"), - PartitionField(source_id=2, field_id=1001, transform=VoidTransform(), name="day_ts"), + PartitionField(format_version=2, source_id=1, field_id=1000, transform=BucketTransform(16), name="bucketed_id"), + PartitionField(format_version=2, source_id=2, field_id=1001, transform=VoidTransform(), name="day_ts"), ) @@ -379,7 +385,11 @@ def test_remove_day_v2(catalog: Catalog) -> None: remove.remove_field("day_ts") assert len(table_v2.specs()) == 3 _validate_new_partition_fields( - table_v2, 1000, 2, 1001, PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="bucketed_id") + table_v2, + 1000, + 2, + 1001, + PartitionField(format_version=2, source_id=1, field_id=1000, transform=BucketTransform(16), name="bucketed_id"), ) @@ -391,7 +401,7 @@ def test_rename(catalog: Catalog) -> None: table.update_spec().rename_field("id", "sharded_id").commit() assert len(table.specs()) == 3 assert table.spec().spec_id == 2 - _validate_new_partition_fields(table, 1000, 2, 1000, PartitionField(1, 1000, IdentityTransform(), "sharded_id")) + _validate_new_partition_fields(table, 1000, 2, 1000, PartitionField(2, 1, 1000, IdentityTransform(), "sharded_id")) @pytest.mark.integration @@ -485,9 +495,9 @@ def test_change_specs_and_schema_transaction(catalog: Catalog) -> None: 1002, 1, 1002, - PartitionField(1, 1000, IdentityTransform(), "id"), - PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), - PartitionField(3, 1002, TruncateTransform(2), "truncate_str"), + PartitionField(2, 1, 1000, IdentityTransform(), "id"), + PartitionField(2, 2, 1001, HourTransform(), "hourly_partitioned"), + PartitionField(2, 3, 1002, TruncateTransform(2), "truncate_str"), ) assert table.schema() == Schema( @@ -516,9 +526,9 @@ def test_multiple_adds_and_remove_v1(catalog: Catalog) -> None: 1002, 3, 1002, - PartitionField(1, 1000, VoidTransform(), "bucketed_id"), - PartitionField(2, 1001, VoidTransform(), "day_ts"), - PartitionField(3, 1002, TruncateTransform(2), "truncated_str"), + PartitionField(2, 1, 1000, VoidTransform(), "bucketed_id"), + PartitionField(2, 2, 1001, VoidTransform(), "day_ts"), + PartitionField(2, 3, 1002, TruncateTransform(2), "truncated_str"), ) @@ -533,7 +543,7 @@ def test_multiple_adds_and_remove_v2(catalog: Catalog) -> None: update.remove_field("day_ts").remove_field("bucketed_id") with table_v2.update_spec() as update: update.add_field("str", TruncateTransform(2), "truncated_str") - _validate_new_partition_fields(table_v2, 1002, 2, 1002, PartitionField(3, 1002, TruncateTransform(2), "truncated_str")) + _validate_new_partition_fields(table_v2, 1002, 2, 1002, PartitionField(2, 3, 1002, TruncateTransform(2), "truncated_str")) @pytest.mark.integration @@ -547,7 +557,7 @@ def test_multiple_remove_and_add_reuses_v2(catalog: Catalog) -> None: update.remove_field("day_ts").remove_field("bucketed_id") with table_v2.update_spec() as update: update.add_field("id", BucketTransform(16), "bucketed_id") - _validate_new_partition_fields(table_v2, 1000, 2, 1001, PartitionField(1, 1000, BucketTransform(16), "bucketed_id")) + _validate_new_partition_fields(table_v2, 1000, 2, 1001, PartitionField(2, 1, 1000, BucketTransform(16), "bucketed_id")) def _validate_new_partition_fields( diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e90f3a46fc..ad47fac8f1 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1142,7 +1142,7 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa ) partition_spec = PartitionSpec( - PartitionField(2, 1000, IdentityTransform(), "partition_id"), + PartitionField(2, 2, 1000, IdentityTransform(), "partition_id"), ) catalog.create_namespace("default") @@ -1202,8 +1202,8 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC ) partition_spec = PartitionSpec( - PartitionField(2, 1000, IdentityTransform(), "field_2"), - PartitionField(3, 1001, IdentityTransform(), "field_3"), + PartitionField(2, 2, 1000, IdentityTransform(), "field_2"), + PartitionField(2, 3, 1001, IdentityTransform(), "field_3"), ) catalog.create_namespace("default") diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py index fca28a37bd..e6bacf6eb3 100644 --- a/tests/table/test_partitioning.py +++ b/tests/table/test_partitioning.py @@ -53,7 +53,7 @@ def test_partition_field_init() -> None: bucket_transform = BucketTransform(100) # type: ignore - partition_field = PartitionField(3, 1000, bucket_transform, "id") + partition_field = PartitionField(2, 3, 1000, bucket_transform, "id") assert partition_field.source_id == 3 assert partition_field.field_id == 1000 @@ -74,7 +74,7 @@ def test_unpartitioned_partition_spec_repr() -> None: def test_partition_spec_init() -> None: bucket_transform: BucketTransform = BucketTransform(4) # type: ignore - id_field1 = PartitionField(3, 1001, bucket_transform, "id") + id_field1 = PartitionField(2, 3, 1001, bucket_transform, "id") partition_spec1 = PartitionSpec(id_field1) assert partition_spec1.spec_id == 0 @@ -83,7 +83,7 @@ def test_partition_spec_init() -> None: assert str(partition_spec1) == f"[\n {str(id_field1)}\n]" assert not partition_spec1.is_unpartitioned() # only differ by PartitionField field_id - id_field2 = PartitionField(3, 1002, bucket_transform, "id") + id_field2 = PartitionField(2, 3, 1002, bucket_transform, "id") partition_spec2 = PartitionSpec(id_field2) assert partition_spec1 != partition_spec2 assert partition_spec1.compatible_with(partition_spec2) @@ -94,8 +94,8 @@ def test_partition_spec_init() -> None: def test_partition_compatible_with() -> None: bucket_transform: BucketTransform = BucketTransform(4) # type: ignore - field1 = PartitionField(3, 100, bucket_transform, "id") - field2 = PartitionField(3, 102, bucket_transform, "id") + field1 = PartitionField(2, 3, 100, bucket_transform, "id") + field2 = PartitionField(2, 3, 102, bucket_transform, "id") lhs = PartitionSpec( field1, ) @@ -115,8 +115,10 @@ def test_serialize_unpartitioned_spec() -> None: def test_serialize_partition_spec() -> None: partitioned = PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"), - PartitionField(source_id=2, field_id=1001, transform=BucketTransform(num_buckets=25), name="int_bucket"), + PartitionField(format_version=2, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"), + PartitionField( + format_version=2, source_id=2, field_id=1001, transform=BucketTransform(num_buckets=25), name="int_bucket" + ), spec_id=3, ) assert ( @@ -212,20 +214,31 @@ def test_transform_consistency_with_pyarrow_transform(source_type: PrimitiveType def test_deserialize_partition_field_v2() -> None: - json_partition_spec = """{"format-version": 2, "source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + json_partition_spec = ( + """{"format-version": 2, "source-id": 1, "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + ) field = PartitionField.model_validate_json(json_partition_spec) - assert field == PartitionField(format_version=2, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") + assert field == PartitionField( + format_version=2, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate" + ) def test_deserialize_partition_field_v3() -> None: - json_partition_spec = """{"format-version": 3, "source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + json_partition_spec = ( + """{"format-version": 3, "source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + ) field = PartitionField.model_validate_json(json_partition_spec) - assert field == PartitionField(format_version=3, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate") + assert field == PartitionField( + format_version=3, source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate" + ) + def test_v3_does_not_allow_source_id() -> None: - json_partition_spec = """{"format-version": 2, "source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + json_partition_spec = ( + """{"format-version": 2, "source-ids": [1], "field-id": 1000, "transform": "truncate[19]", "name": "str_truncate"}""" + ) with pytest.raises(ValueError, match=r"source-ids is not allowed"): - PartitionField.model_validate_json(json_partition_spec) \ No newline at end of file + PartitionField.model_validate_json(json_partition_spec) diff --git a/tests/table/test_sorting.py b/tests/table/test_sorting.py index d8d452014e..b60dd4aae5 100644 --- a/tests/table/test_sorting.py +++ b/tests/table/test_sorting.py @@ -68,8 +68,9 @@ def test_sorting_schema(example_table_metadata_v2: Dict[str, Any]) -> None: assert table_metadata.sort_orders == [ SortOrder( - SortField(2, IdentityTransform(), SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), + SortField(2, 2, IdentityTransform(), SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), SortField( + 2, 3, BucketTransform(4), direction=SortDirection.DESC, @@ -115,7 +116,8 @@ def test_serialize_sort_field_v3() -> None: payload = '{"source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}' assert SortField.model_validate_json(payload) == expected + def test_v2_does_not_support_source_ids() -> None: payload = '{"format-version": 2, "source-ids":[19],"transform":"identity","direction":"asc","null-order":"nulls-first"}' with pytest.raises(ValueError, match=r"source-ids is not allowed"): - SortField.model_validate_json(payload) \ No newline at end of file + SortField.model_validate_json(payload)