Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@
StagedTable,
Table,
TableIdentifier,
TableProperties,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.table.update import (
TableRequirement,
TableUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties, TableVersion
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool
Expand Down Expand Up @@ -480,10 +481,11 @@ def _create_table(
properties: Properties = EMPTY_DICT,
stage_create: bool = False,
) -> TableResponse:
format_version: TableVersion = self.properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)
iceberg_schema = self._convert_schema_if_needed(schema)
fresh_schema = assign_fresh_schema_ids(iceberg_schema)
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(format_version, sort_order, iceberg_schema, fresh_schema)

namespace_and_table = self._split_identifier_for_path(identifier)
if location:
Expand Down
11 changes: 11 additions & 0 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class PartitionField(IcebergBaseModel):

def __init__(
self,
format_version: Optional[int] = None,
Comment thread
rambleraptor marked this conversation as resolved.
source_id: Optional[int] = None,
field_id: Optional[int] = None,
transform: Optional[Transform[Any, Any]] = None,
Expand All @@ -100,6 +101,8 @@ def __init__(
data["transform"] = transform
if name is not None:
data["name"] = name
if format_version is not None:
data["format-version"] = format_version

super().__init__(**data)

Expand All @@ -116,6 +119,14 @@ def map_source_ids_onto_source_id(cls, data: Any) -> Any:
data["source-id"] = source_ids[0]
return data

@model_validator(mode="before")
@classmethod
def check_source_ids_against_version(cls, data: Any) -> Any:
    """Disallow the `source-ids` key when the table format version is 1 or 2.

    Partition fields on v1/v2 tables must carry a single `source-id`;
    `source-ids` is reserved for newer format versions.

    Args:
        data: The raw payload passed to the model, before field validation.

    Raises:
        ValueError: If `format-version` is 1 or 2 and `source-ids` is present.

    Returns:
        The payload unchanged, for the next validation step.
    """
    if "source-ids" in data and "format-version" in data:
        if data["format-version"] in (1, 2):
            raise ValueError("source-ids is not allowed on Iceberg v1 and v2")
    return data

def __str__(self) -> str:
"""Return the string representation of the PartitionField class."""
return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
Expand Down
5 changes: 3 additions & 2 deletions pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
IcebergBaseModel,
IcebergRootModel,
Properties,
TableVersion,
)
from pyiceberg.types import NestedField, StructType, transform_dict_value_to_str
from pyiceberg.utils.config import Config
Expand Down Expand Up @@ -572,13 +573,13 @@ def new_table_metadata(
from pyiceberg.table import TableProperties

# Remove format-version so it does not get persisted
format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
format_version: TableVersion = properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)

schema.check_format_version_compatibility(format_version)

fresh_schema = assign_fresh_schema_ids(schema)
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(format_version, sort_order, schema, fresh_schema)

if table_uuid is None:
table_uuid = uuid.uuid4()
Expand Down
19 changes: 17 additions & 2 deletions pyiceberg/table/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform, Transform, parse_transform
from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.typedef import IcebergBaseModel, TableVersion
from pyiceberg.types import IcebergType


Expand Down Expand Up @@ -71,6 +71,7 @@ class SortField(IcebergBaseModel):

def __init__(
self,
format_version: Optional[int] = None,
source_id: Optional[int] = None,
transform: Optional[Union[Transform[Any, Any], Callable[[IcebergType], Transform[Any, Any]]]] = None,
direction: Optional[SortDirection] = None,
Expand All @@ -85,6 +86,9 @@ def __init__(
data["direction"] = direction
if null_order is not None:
data["null-order"] = null_order
if format_version is not None:
data["format-version"] = format_version

super().__init__(**data)

@model_validator(mode="before")
Expand All @@ -94,6 +98,12 @@ def set_null_order(cls, values: Dict[str, Any]) -> Dict[str, Any]:
values["null-order"] = NullOrder.NULLS_FIRST if values["direction"] == SortDirection.ASC else NullOrder.NULLS_LAST
return values

@model_validator(mode="before")
@classmethod
def check_source_ids_against_version(cls, data: Any) -> Any:
    """Reject the multi-arg `source-ids` key on table format versions 1 and 2.

    `source-ids` (multiple sort-transform source columns) is only valid on
    newer format versions; v1/v2 sort fields must use the single `source-id`.

    Args:
        data: The raw payload passed to the model, before field validation.

    Raises:
        ValueError: If `format-version` is 1 or 2 and `source-ids` is present.

    Returns:
        The unmodified payload, so validation can continue.
    """
    if "format-version" in data and data["format-version"] in [1, 2] and "source-ids" in data:
        raise ValueError("source-ids is not allowed on Iceberg v1 and v2")

    # A pydantic mode="before" validator must return the payload; falling off
    # the end would hand `None` to the remaining validators and wipe the data.
    # This also matches the sibling validator on PartitionField.
    return data

@model_validator(mode="before")
@classmethod
def map_source_ids_onto_source_id(cls, data: Any) -> Any:
Expand Down Expand Up @@ -174,7 +184,11 @@ def __repr__(self) -> str:


def assign_fresh_sort_order_ids(
sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema, sort_order_id: int = INITIAL_SORT_ORDER_ID
format_version: TableVersion,
sort_order: SortOrder,
old_schema: Schema,
fresh_schema: Schema,
sort_order_id: int = INITIAL_SORT_ORDER_ID,
) -> SortOrder:
if sort_order.is_unsorted:
return UNSORTED_SORT_ORDER
Expand All @@ -189,6 +203,7 @@ def assign_fresh_sort_order_ids(
raise ValueError(f"Could not find field in fresh schema: {original_field}")
fresh_fields.append(
SortField(
format_version=format_version,
source_id=fresh_field.field_id,
transform=field.transform,
direction=field.direction,
Expand Down
26 changes: 20 additions & 6 deletions pyiceberg/table/update/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
UpdateTableMetadata,
)
from pyiceberg.transforms import IdentityTransform, TimeTransform, Transform, VoidTransform, parse_transform
from pyiceberg.typedef import TableVersion

if TYPE_CHECKING:
from pyiceberg.table import Transaction
Expand Down Expand Up @@ -191,13 +192,20 @@ def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, par
partition_names.add(name)

def _add_new_field(
schema: Schema, source_id: int, field_id: int, name: str, transform: Transform[Any, Any], partition_names: Set[str]
schema: Schema,
source_id: int,
field_id: int,
name: str,
transform: Transform[Any, Any],
partition_names: Set[str],
format_version: TableVersion,
) -> PartitionField:
_check_and_add_partition_name(schema, name, source_id, partition_names)
return PartitionField(source_id, field_id, transform, name)
return PartitionField(format_version, source_id, field_id, transform, name)

partition_fields = []
partition_names: Set[str] = set()
format_version = self._transaction.table_metadata.format_version
for field in self._transaction.table_metadata.spec().fields:
if field.field_id not in self._deletes:
renamed = self._renames.get(field.name)
Expand All @@ -209,6 +217,7 @@ def _add_new_field(
renamed,
field.transform,
partition_names,
format_version,
)
else:
new_field = _add_new_field(
Expand All @@ -218,6 +227,7 @@ def _add_new_field(
field.name,
field.transform,
partition_names,
format_version,
)
partition_fields.append(new_field)
elif self._transaction.table_metadata.format_version == 1:
Expand All @@ -230,6 +240,7 @@ def _add_new_field(
renamed,
VoidTransform(),
partition_names,
format_version,
)
else:
new_field = _add_new_field(
Expand All @@ -239,6 +250,7 @@ def _add_new_field(
field.name,
VoidTransform(),
partition_names,
format_version,
)

partition_fields.append(new_field)
Expand All @@ -249,6 +261,7 @@ def _add_new_field(
field_id=added_field.field_id,
transform=added_field.transform,
name=added_field.name,
format_version=format_version,
)
partition_fields.append(new_field)

Expand All @@ -264,7 +277,8 @@ def _add_new_field(
return PartitionSpec(*partition_fields, spec_id=new_spec_id)

def _partition_field(self, transform_key: Tuple[int, Transform[Any, Any]], name: Optional[str]) -> PartitionField:
if self._transaction.table_metadata.format_version == 2:
format_version = self._transaction.table_metadata.format_version
if format_version == 2:
source_id, transform = transform_key
historical_fields = []
for spec in self._transaction.table_metadata.specs().values():
Expand All @@ -274,13 +288,13 @@ def _partition_field(self, transform_key: Tuple[int, Transform[Any, Any]], name:
for field in historical_fields:
if field.source_id == source_id and repr(field.transform) == repr(transform):
if name is None or field.name == name:
return PartitionField(source_id, field.field_id, transform, field.name)
return PartitionField(format_version, source_id, field.field_id, transform, field.name)

new_field_id = self._new_field_id()
if name is None:
tmp_field = PartitionField(transform_key[0], new_field_id, transform_key[1], "unassigned_field_name")
tmp_field = PartitionField(format_version, transform_key[0], new_field_id, transform_key[1], "unassigned_field_name")
name = _visit_partition_field(self._transaction.table_metadata.schema(), tmp_field, _PartitionNameGenerator())
return PartitionField(transform_key[0], new_field_id, transform_key[1], name)
return PartitionField(format_version, transform_key[0], new_field_id, transform_key[1], name)

def _new_field_id(self) -> int:
self._last_assigned_partition_id += 1
Expand Down
14 changes: 7 additions & 7 deletions tests/expressions/test_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,38 +70,38 @@ def empty_spec() -> PartitionSpec:

@pytest.fixture
def id_spec() -> PartitionSpec:
return PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "id_part"))
return PartitionSpec(PartitionField(2, 1, 1000, IdentityTransform(), "id_part"))


@pytest.fixture
def bucket_spec() -> PartitionSpec:
return PartitionSpec(PartitionField(2, 1000, BucketTransform(16), "data_bucket"))
return PartitionSpec(PartitionField(2, 2, 1000, BucketTransform(16), "data_bucket"))


@pytest.fixture
def day_spec() -> PartitionSpec:
return PartitionSpec(PartitionField(4, 1000, DayTransform(), "date"), PartitionField(3, 1000, DayTransform(), "ddate"))
return PartitionSpec(PartitionField(2, 4, 1000, DayTransform(), "date"), PartitionField(2, 3, 1000, DayTransform(), "ddate"))


@pytest.fixture
def hour_spec() -> PartitionSpec:
return PartitionSpec(PartitionField(4, 1000, HourTransform(), "hour"))
return PartitionSpec(PartitionField(2, 4, 1000, HourTransform(), "hour"))


@pytest.fixture
def truncate_str_spec() -> PartitionSpec:
return PartitionSpec(PartitionField(2, 1000, TruncateTransform(2), "data_trunc"))
return PartitionSpec(PartitionField(2, 2, 1000, TruncateTransform(2), "data_trunc"))


@pytest.fixture
def truncate_int_spec() -> PartitionSpec:
return PartitionSpec(PartitionField(1, 1000, TruncateTransform(10), "id_trunc"))
return PartitionSpec(PartitionField(2, 1, 1000, TruncateTransform(10), "id_trunc"))


@pytest.fixture
def id_and_bucket_spec() -> PartitionSpec:
return PartitionSpec(
PartitionField(1, 1000, IdentityTransform(), "id_part"), PartitionField(2, 1001, BucketTransform(16), "data_bucket")
PartitionField(2, 1, 1000, IdentityTransform(), "id_part"), PartitionField(2, 2, 1001, BucketTransform(16), "data_bucket")
)


Expand Down
18 changes: 9 additions & 9 deletions tests/expressions/test_residual_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
def test_identity_transform_residual() -> None:
schema = Schema(NestedField(50, "dateint", IntegerType()), NestedField(51, "hour", IntegerType()))

spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "dateint_part"))
spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "dateint_part"))

predicate = Or(
Or(
Expand Down Expand Up @@ -91,7 +91,7 @@ def test_identity_transform_residual() -> None:
def test_case_insensitive_identity_transform_residuals() -> None:
schema = Schema(NestedField(50, "dateint", IntegerType()), NestedField(51, "hour", IntegerType()))

spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "dateint_part"))
spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "dateint_part"))

predicate = Or(
Or(
Expand Down Expand Up @@ -136,7 +136,7 @@ def test_unpartitioned_residuals() -> None:
def test_in() -> None:
schema = Schema(NestedField(50, "dateint", IntegerType()), NestedField(51, "hour", IntegerType()))

spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "dateint_part"))
spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "dateint_part"))

predicate = In("dateint", [20170815, 20170816, 20170817])

Expand All @@ -150,7 +150,7 @@ def test_in() -> None:
def test_in_timestamp() -> None:
schema = Schema(NestedField(50, "ts", TimestampType()), NestedField(51, "hour", IntegerType()))

spec = PartitionSpec(PartitionField(50, 1000, DayTransform(), "ts_part"))
spec = PartitionSpec(PartitionField(2, 50, 1000, DayTransform(), "ts_part"))

date_20191201 = literal("2019-12-01T00:00:00").to(TimestampType()).value
date_20191202 = literal("2019-12-02T00:00:00").to(TimestampType()).value
Expand All @@ -172,7 +172,7 @@ def test_in_timestamp() -> None:
def test_not_in() -> None:
schema = Schema(NestedField(50, "dateint", IntegerType()), NestedField(51, "hour", IntegerType()))

spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "dateint_part"))
spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "dateint_part"))

predicate = NotIn("dateint", [20170815, 20170816, 20170817])

Expand All @@ -188,7 +188,7 @@ def test_not_in() -> None:
def test_is_nan() -> None:
schema = Schema(NestedField(50, "double", DoubleType()), NestedField(51, "hour", IntegerType()))

spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "double_part"))
spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "double_part"))

predicate = IsNaN("double")

Expand All @@ -204,7 +204,7 @@ def test_is_nan() -> None:
def test_is_not_nan() -> None:
schema = Schema(NestedField(50, "double", DoubleType()), NestedField(51, "float", FloatType()))

spec = PartitionSpec(PartitionField(50, 1050, IdentityTransform(), "double_part"))
spec = PartitionSpec(PartitionField(2, 50, 1050, IdentityTransform(), "double_part"))

predicate = NotNaN("double")

Expand All @@ -216,7 +216,7 @@ def test_is_not_nan() -> None:
residual = res_eval.residual_for(Record(2))
assert residual == AlwaysTrue()

spec = PartitionSpec(PartitionField(51, 1051, IdentityTransform(), "float_part"))
spec = PartitionSpec(PartitionField(2, 51, 1051, IdentityTransform(), "float_part"))

predicate = NotNaN("float")

Expand All @@ -232,7 +232,7 @@ def test_is_not_nan() -> None:
def test_not_in_timestamp() -> None:
schema = Schema(NestedField(50, "ts", TimestampType()), NestedField(51, "dateint", IntegerType()))

spec = PartitionSpec(PartitionField(50, 1000, DayTransform(), "ts_part"))
spec = PartitionSpec(PartitionField(2, 50, 1000, DayTransform(), "ts_part"))

date_20191201 = literal("2019-12-01T00:00:00").to(TimestampType()).value
date_20191202 = literal("2019-12-02T00:00:00").to(TimestampType()).value
Expand Down
Loading