3 changes: 2 additions & 1 deletion pyiceberg/catalog/glue.py
@@ -406,7 +406,8 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: "
             raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name} (Glue table version {version_id})") from e
         except self.glue.exceptions.ConcurrentModificationException as e:
             raise CommitFailedException(
-                f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
+                f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update "
+                f"to table version {version_id}"
             ) from e

     def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef":
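For context on the commit path touched here: callers can catch the CommitFailedException and retry. A minimal sketch (not part of this PR), assuming a hypothetical Glue-backed catalog config and table name, and using pyiceberg's public catalog and transaction APIs:

import time

from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import CommitFailedException

catalog = load_catalog("glue_catalog", type="glue")  # hypothetical catalog config
table = catalog.load_table("db.events")  # hypothetical table

for attempt in range(3):
    try:
        with table.transaction() as txn:
            txn.set_properties(owner="analytics")  # any metadata commit can race
        break
    except CommitFailedException:
        table.refresh()  # pick up the new Glue table version, then retry
        time.sleep(2**attempt)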
6 changes: 4 additions & 2 deletions pyiceberg/catalog/rest/auth.py
@@ -181,7 +181,8 @@ def _refresh_token(self) -> None:
         expires_in = result.get("expires_in", self.expires_in)
         if expires_in is None:
             raise ValueError(
-                "The expiration time of the Token must be provided by the Server in the Access Token Response in `expires_in` field, or by the PyIceberg Client."
+                "The expiration time of the Token must be provided by the Server in the Access Token Response "
+                "in `expires_in` field, or by the PyIceberg Client."
             )
         self._expires_at = time.monotonic() + expires_in - self.refresh_margin

@@ -249,8 +250,9 @@ def auth_header(self) -> str:


 class AuthManagerAdapter(AuthBase):
-    """A `requests.auth.AuthBase` adapter that integrates an `AuthManager` into a `requests.Session` to automatically attach the appropriate Authorization header to every request.
+    """A `requests.auth.AuthBase` adapter for integrating an `AuthManager` into a `requests.Session`.

+    This adapter automatically attaches the appropriate Authorization header to every request.
     This adapter is useful when working with `requests.Session.auth`
     and allows reuse of authentication strategies defined by `AuthManager`.
     This AuthManagerAdapter is only intended to be used against the REST Catalog
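For readers unfamiliar with the `requests` hook this adapter builds on, here is a minimal standalone sketch of the same pattern. This is not pyiceberg's implementation; the token callable is a stand-in for an `AuthManager`:

import requests
from requests.auth import AuthBase


class BearerAuthAdapter(AuthBase):
    """Attach an Authorization header to every request on a Session."""

    def __init__(self, fetch_token):
        self._fetch_token = fetch_token  # callable returning the current token

    def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
        # requests invokes this hook once per prepared request.
        request.headers["Authorization"] = f"Bearer {self._fetch_token()}"
        return request


session = requests.Session()
session.auth = BearerAuthAdapter(lambda: "example-token")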
3 changes: 2 additions & 1 deletion pyiceberg/catalog/sql.py
@@ -109,7 +109,8 @@ class SqlCatalog(MetastoreCatalog):
     And you can have as many levels as you want, but you need at least one. The `SqlCatalog` honors the same convention.

     In the `JDBCCatalog` implementation, a `TableIdentifier` is composed of an optional `Namespace` and a table name.
-    When a `Namespace` is present, the full name will be `'ns1.ns2.ns3.table'`. A valid `TableIdentifier` could be `'name'` (no namespace).
+    When a `Namespace` is present, the full name will be `'ns1.ns2.ns3.table'`.
+    A valid `TableIdentifier` could be `'name'` (no namespace).
     The `SqlCatalog` has a different convention where a `TableIdentifier` requires a `Namespace`.
     """

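A short sketch of the convention this docstring describes, with illustrative paths; under `SqlCatalog` the identifier always carries a namespace:

from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

catalog = SqlCatalog(
    "default",
    uri="sqlite:////tmp/pyiceberg.db",  # illustrative local URI
    warehouse="file:///tmp/warehouse",
)
catalog.create_namespace("ns1")
schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=True))
table = catalog.create_table("ns1.events", schema=schema)  # a bare "events" would be rejected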
5 changes: 3 additions & 2 deletions pyiceberg/conversions.py
@@ -188,8 +188,9 @@ def to_bytes(
 ) -> bytes:
     """Convert a built-in python value to bytes.

-    This conversion follows the serialization scheme for storing single values as individual binary values defined in the Iceberg specification that
-    can be found at https://iceberg.apache.org/spec/#appendix-d-single-value-serialization
+    This conversion follows the serialization scheme for storing single values as individual binary values
+    defined in the Iceberg specification that can be found at
+    https://iceberg.apache.org/spec/#appendix-d-single-value-serialization

     Args:
         primitive_type (PrimitiveType): An implementation of the PrimitiveType base class.
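A quick sketch of the serialization scheme the docstring points to; outputs follow the spec appendix (for example, a long is stored as 8 little-endian bytes):

from pyiceberg.conversions import to_bytes
from pyiceberg.types import LongType, StringType

assert to_bytes(LongType(), 123) == (123).to_bytes(8, "little")  # 8-byte little-endian
assert to_bytes(StringType(), "iceberg") == b"iceberg"  # UTF-8 bytes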
12 changes: 8 additions & 4 deletions pyiceberg/expressions/__init__.py
@@ -690,12 +690,14 @@ def bind(self, schema: Schema, case_sensitive: bool = True) -> BoundSetPredicate
     def __str__(self) -> str:
         """Return the string representation of the SetPredicate class."""
         # Sort to make it deterministic
-        return f"{str(self.__class__.__name__)}({str(self.term)}, {{{', '.join(sorted([str(literal) for literal in self.literals]))}}})"
+        literals_str = ", ".join(sorted([str(literal) for literal in self.literals]))
+        return f"{str(self.__class__.__name__)}({str(self.term)}, {{{literals_str}}})"

     def __repr__(self) -> str:
         """Return the string representation of the SetPredicate class."""
         # Sort to make it deterministic
-        return f"{str(self.__class__.__name__)}({repr(self.term)}, {{{', '.join(sorted([repr(literal) for literal in self.literals]))}}})"
+        literals_repr = ", ".join(sorted([repr(literal) for literal in self.literals]))
+        return f"{str(self.__class__.__name__)}({repr(self.term)}, {{{literals_repr}}})"

     def __eq__(self, other: Any) -> bool:
         """Return the equality of two instances of the SetPredicate class."""
@@ -725,12 +727,14 @@ def value_set(self) -> set[Any]:
     def __str__(self) -> str:
         """Return the string representation of the BoundSetPredicate class."""
         # Sort to make it deterministic
-        return f"{str(self.__class__.__name__)}({str(self.term)}, {{{', '.join(sorted([str(literal) for literal in self.literals]))}}})"
+        literals_str = ", ".join(sorted([str(literal) for literal in self.literals]))
+        return f"{str(self.__class__.__name__)}({str(self.term)}, {{{literals_str}}})"

     def __repr__(self) -> str:
         """Return the string representation of the BoundSetPredicate class."""
         # Sort to make it deterministic
-        return f"{str(self.__class__.__name__)}({repr(self.term)}, {{{', '.join(sorted([repr(literal) for literal in self.literals]))}}})"
+        literals_repr = ", ".join(sorted([repr(literal) for literal in self.literals]))
+        return f"{str(self.__class__.__name__)}({repr(self.term)}, {{{literals_repr}}})"

     def __eq__(self, other: Any) -> bool:
         """Return the equality of two instances of the BoundSetPredicate class."""
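The sorting these methods perform is what makes the rendering stable: Python set iteration order can vary, so joining unsorted literals could produce different strings for equal predicates. A small sketch with the public `In` predicate:

from pyiceberg.expressions import In

# The literal set is sorted before joining, so equal predicates always
# render identically, regardless of set iteration order.
assert str(In("id", {3, 1, 2})) == str(In("id", {2, 3, 1}))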
3 changes: 2 additions & 1 deletion pyiceberg/expressions/visitors.py
@@ -139,7 +139,8 @@ def visit(obj: BooleanExpression, visitor: BooleanExpressionVisitor[T]) -> T:

     Args:
         obj (BooleanExpression): An instance of a BooleanExpression.
-        visitor (BooleanExpressionVisitor[T]): An instance of an implementation of the generic BooleanExpressionVisitor base class.
+        visitor (BooleanExpressionVisitor[T]): An instance of an implementation of the generic
+            BooleanExpressionVisitor base class.

     Raises:
         NotImplementedError: If attempting to visit an unsupported expression.
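A hedged sketch of implementing the visitor interface that `visit` dispatches to; the abstract method names below follow my reading of `BooleanExpressionVisitor` and may need adjusting against the actual base class:

from pyiceberg.expressions import And, In, Not
from pyiceberg.expressions.visitors import BooleanExpressionVisitor, visit


class CountPredicates(BooleanExpressionVisitor[int]):
    """Count leaf predicates in an expression tree."""

    def visit_true(self) -> int:
        return 0

    def visit_false(self) -> int:
        return 0

    def visit_not(self, child_result: int) -> int:
        return child_result

    def visit_and(self, left_result: int, right_result: int) -> int:
        return left_result + right_result

    def visit_or(self, left_result: int, right_result: int) -> int:
        return left_result + right_result

    def visit_unbound_predicate(self, predicate) -> int:
        return 1

    def visit_bound_predicate(self, predicate) -> int:
        return 1


expr = And(In("a", {1, 2}), Not(In("b", {3})))
assert visit(expr, CountPredicates()) == 2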
3 changes: 2 additions & 1 deletion pyiceberg/io/__init__.py
@@ -368,5 +368,6 @@ def load_file_io(properties: Properties = EMPTY_DICT, location: str | None = Non
         return PyArrowFileIO(properties)
     except ModuleNotFoundError as e:
         raise ModuleNotFoundError(
-            'Could not load a FileIO, please consider installing one: pip3 install "pyiceberg[pyarrow]", for more options refer to the docs.'
+            "Could not load a FileIO, please consider installing one: "
+            'pip3 install "pyiceberg[pyarrow]", for more options refer to the docs.'
         ) from e
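Typical resolution, sketched with an illustrative location (assumes the pyarrow extra is installed so the fallback above succeeds):

from pyiceberg.io import load_file_io

io = load_file_io(properties={}, location="s3://bucket/warehouse/data.parquet")
input_file = io.new_input("s3://bucket/warehouse/data.parquet")  # returns an InputFile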
33 changes: 23 additions & 10 deletions pyiceberg/io/pyarrow.py
@@ -248,7 +248,9 @@ def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.Na


 class PyArrowFile(InputFile, OutputFile):
-    """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.
+    """A combined InputFile and OutputFile implementation using pyarrow filesystem.
+
+    This class generates pyarrow.lib.NativeFile instances.

     Args:
         location (str): A URI or a path to a local file.
@@ -645,8 +647,9 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
         """Delete the file at the given location.

         Args:
-            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
-                the location attribute for that instance is used as the location to delete.
+            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or
+                an OutputFile instance is provided, the location attribute for that instance is used as
+                the location to delete.

         Raises:
             FileNotFoundError: When the file at the provided location does not exist.
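A small local-filesystem sketch of the contract described above, using an illustrative path; a plain string, an InputFile, or an OutputFile may identify the target:

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()
out = io.new_output("/tmp/example.txt")  # illustrative path
with out.create(overwrite=True) as stream:
    stream.write(b"hello")
io.delete(out)  # the OutputFile's location attribute is used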
@@ -1014,7 +1017,10 @@ def collect(
         self,
         expr: BooleanExpression,
     ) -> None:
-        """Collect the bound references categorized by having at least one is_null or is_not_null in the expr and the remaining."""
+        """Collect bound references categorized by null predicates.
+
+        Categorizes by having at least one is_null or is_not_null in the expr and the remaining.
+        """
         boolean_expression_visit(expr, self)


@@ -1035,7 +1041,8 @@ def expression_to_pyarrow(expr: BooleanExpression, schema: Schema | None = None) -> pc.Expression:
 def _expression_to_complementary_pyarrow(expr: BooleanExpression, schema: Schema | None = None) -> pc.Expression:
     """Complementary filter conversion function of expression_to_pyarrow.

-    Could not use expression_to_pyarrow(Not(expr)) to achieve this complementary effect because ~ in pyarrow.compute.Expression does not handle null.
+    Could not use expression_to_pyarrow(Not(expr)) to achieve this complementary effect because
+    ~ in pyarrow.compute.Expression does not handle null.
     """
     collector = _NullNaNUnmentionedTermsCollector()
     collector.collect(expr)
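A hedged sketch of the plain (non-complementary) conversion for readers following this code; the schema and data are illustrative, and the expression is bound explicitly before conversion:

import pyarrow as pa

from pyiceberg.expressions import GreaterThanOrEqual
from pyiceberg.io.pyarrow import expression_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=False))
bound = GreaterThanOrEqual("id", 10).bind(schema)
pc_filter = expression_to_pyarrow(bound)

data = pa.table({"id": pa.array([5, 10, 15, None], type=pa.int64())})
print(data.filter(pc_filter))  # keeps id >= 10; the null row is dropped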
@@ -1417,7 +1424,9 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
                 return TimestampNanoType()
             else:
                 raise TypeError(
-                    "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
+                    "Iceberg does not yet support 'ns' timestamp precision. "
+                    "Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically "
+                    "downcast 'ns' to 'us' on write.",
                 )
         else:
             raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")
@@ -1580,7 +1589,8 @@ def _task_to_record_batches(
     fragment = arrow_format.make_fragment(fin)
     physical_schema = fragment.physical_schema

-    # For V1 and V2, we only support Timestamp 'us' in Iceberg Schema, therefore it is reasonable to always cast 'ns' timestamp to 'us' on read.
+    # For V1 and V2, we only support Timestamp 'us' in Iceberg Schema,
+    # therefore it is reasonable to always cast 'ns' timestamp to 'us' on read.
     # For V3 this has to set explicitly to avoid nanosecond timestamp to be down-casted by default
     downcast_ns_timestamp_to_us = (
         downcast_ns_timestamp_to_us if downcast_ns_timestamp_to_us is not None else format_version <= 2
@@ -2441,7 +2451,8 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A

         if not iceberg_transform.preserves_order:
             raise ValueError(
-                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
+                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: "
+                f"{partition_field.name} with transform {partition_field.transform}"
             )

         transform_func = iceberg_transform.transform(source_field.field_type)
@@ -2462,7 +2473,8 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A
         )
         if lower_value != upper_value:
             raise ValueError(
-                f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
+                f"Cannot infer partition value from parquet metadata as there are more than one partition values "
+                f"for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
             )

         return lower_value
@@ -2729,7 +2741,8 @@ def _check_pyarrow_schema_compatible(
         )
     additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
     raise ValueError(
-        f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
+        f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. "
+        "Update the schema first (hint, use union_by_name)."
     ) from e
 _check_schema_compatible(requested_schema, provided_schema)

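A hedged sketch of the "hint, use union_by_name" remediation this error suggests; `tbl` is an already-loaded pyiceberg table and `arrow_table` the PyArrow data carrying the extra column, both assumed:

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

extra = Schema(NestedField(field_id=1, name="extra_col", field_type=StringType(), required=False))
with tbl.update_schema() as update:
    update.union_by_name(extra)  # add columns present in `extra` but missing from the table
tbl.append(arrow_table)  # the write no longer reports extra columns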
3 changes: 2 additions & 1 deletion pyiceberg/manifest.py
@@ -1319,7 +1319,8 @@ def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
         # To validate this, check that the snapshot id matches the current commit
         if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id:
             raise ValueError(
-                f"Found unassigned sequence number for a manifest from snapshot: {self._commit_snapshot_id} != {wrapped_manifest_file.added_snapshot_id}"
+                f"Found unassigned sequence number for a manifest from snapshot: "
+                f"{self._commit_snapshot_id} != {wrapped_manifest_file.added_snapshot_id}"
             )
         wrapped_manifest_file.sequence_number = self._sequence_number

27 changes: 20 additions & 7 deletions pyiceberg/schema.py
@@ -104,7 +104,8 @@ def __str__(self) -> str:

     def __repr__(self) -> str:
         """Return the string representation of the Schema class."""
-        return f"Schema({', '.join(repr(column) for column in self.columns)}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"
+        columns_repr = ", ".join(repr(column) for column in self.columns)
+        return f"Schema({columns_repr}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"

     def __len__(self) -> int:
         """Return the length of an instance of the Literal class."""
@@ -374,7 +375,8 @@ def check_format_version_compatibility(self, format_version: int) -> None:
     for field in self._lazy_id_to_field.values():
         if format_version < field.field_type.minimum_format_version():
             raise ValueError(
-                f"{field.field_type} is only supported in {field.field_type.minimum_format_version()} or higher. Current format version is: {format_version}"
+                f"{field.field_type} is only supported in {field.field_type.minimum_format_version()} or higher. "
+                f"Current format version is: {format_version}"
             )


@@ -1530,7 +1532,8 @@ def field(self, field: NestedField, field_result: IcebergType | None) -> Iceberg
         else:
             if not field.field_type.is_primitive:
                 raise ValueError(
-                    f"Cannot explicitly project List or Map types, {field.field_id}:{field.name} of type {field.field_type} was selected"
+                    f"Cannot explicitly project List or Map types, "
+                    f"{field.field_id}:{field.name} of type {field.field_type} was selected"
                 )
             # Selected non-struct field
             return field.field_type
@@ -1550,7 +1553,8 @@ def list(self, list_type: ListType, element_result: IcebergType | None) -> Icebe
         else:
             if not list_type.element_type.is_primitive:
                 raise ValueError(
-                    f"Cannot explicitly project List or Map types, {list_type.element_id} of type {list_type.element_type} was selected"
+                    f"Cannot explicitly project List or Map types, "
+                    f"{list_type.element_id} of type {list_type.element_type} was selected"
                 )
             return list_type
         elif element_result is not None:
@@ -1567,7 +1571,8 @@ def map(self, map_type: MapType, key_result: IcebergType | None, value_result: I
             return self._project_map(map_type, projected_struct)
         if not map_type.value_type.is_primitive:
             raise ValueError(
-                f"Cannot explicitly project List or Map types, Map value {map_type.value_id} of type {map_type.value_type} was selected"
+                f"Cannot explicitly project List or Map types, "
+                f"Map value {map_type.value_id} of type {map_type.value_type} was selected"
             )
         return map_type
     elif value_result is not None:
@@ -1764,9 +1769,17 @@ def _is_field_compatible(self, lhs: NestedField) -> bool:
         # UnknownType can only be promoted to Primitive types
         if isinstance(rhs.field_type, UnknownType):
             if not isinstance(lhs.field_type, PrimitiveType):
-                error_msg = f"Null type (UnknownType) cannot be promoted to non-primitive type {lhs.field_type}. UnknownType can only be promoted to primitive types (string, int, boolean, etc.) in V3+ tables."
+                error_msg = (
+                    f"Null type (UnknownType) cannot be promoted to non-primitive type {lhs.field_type}. "
+                    "UnknownType can only be promoted to primitive types (string, int, boolean, etc.) "
+                    "in V3+ tables."
+                )
             else:
-                error_msg = f"Null type (UnknownType) cannot be promoted to {lhs.field_type}. This may be due to table format version limitations (V1/V2 tables don't support UnknownType promotion)."
+                error_msg = (
+                    f"Null type (UnknownType) cannot be promoted to {lhs.field_type}. "
+                    "This may be due to table format version limitations "
+                    "(V1/V2 tables don't support UnknownType promotion)."
+                )
             self.rich_table.add_row("❌", str(lhs), f"{str(rhs)} - {error_msg}")
         else:
             self.rich_table.add_row("❌", str(lhs), str(rhs))
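A hedged sketch of the format-version check touched above, assuming `UnknownType` (referenced in the promotion messages) is a V3-only type so that a V2 format version fails:

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, UnknownType

schema = Schema(NestedField(field_id=1, name="maybe_null", field_type=UnknownType(), required=False))
try:
    schema.check_format_version_compatibility(2)
except ValueError as err:
    print(err)  # e.g. "unknown is only supported in 3 or higher. Current format version is: 2"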