diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index 62f7a02fab..bd1164f599 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -19,6 +19,7 @@ from typing import ( Any, Callable, + Dict, Literal, Optional, Tuple, @@ -31,6 +32,10 @@ from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError +from pyiceberg.table.refs import SnapshotRef + +DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1 +DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000 def catch_exception() -> Callable: # type: ignore @@ -372,3 +377,50 @@ def table(ctx: Context, identifier: str, property_name: str) -> None: # noqa: F ctx.exit(1) else: raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}") + + +@run.command() +@click.argument("identifier") +@click.option("--type", required=False) +@click.option("--verbose", type=click.BOOL) +@click.pass_context +@catch_exception() +def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None: + """List all the refs in the provided table.""" + catalog, output = _catalog_and_output(ctx) + table = catalog.load_table(identifier) + refs = table.refs() + if type: + type = type.lower() + if type not in {"branch", "tag"}: + raise ValueError(f"Type must be either branch or tag, got: {type}") + + relevant_refs = [ + (ref_name, ref.snapshot_ref_type, _retention_properties(ref, table.properties)) + for (ref_name, ref) in refs.items() + if not type or ref.snapshot_ref_type == type + ] + + output.describe_refs(relevant_refs) + + +def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]: + retention_properties = {} + if ref.snapshot_ref_type == "branch": + default_min_snapshots_to_keep = table_properties.get( + "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP + ) + retention_properties["min_snapshots_to_keep"] = ( + str(ref.min_snapshots_to_keep) if ref.min_snapshots_to_keep else str(default_min_snapshots_to_keep) + ) + default_max_snapshot_age_ms = table_properties.get("history.expire.max-snapshot-age-ms", DEFAULT_MAX_SNAPSHOT_AGE_MS) + retention_properties["max_snapshot_age_ms"] = ( + str(ref.max_snapshot_age_ms) if ref.max_snapshot_age_ms else str(default_max_snapshot_age_ms) + ) + else: + retention_properties["min_snapshots_to_keep"] = "N/A" + retention_properties["max_snapshot_age_ms"] = "N/A" + + retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever" + + return retention_properties diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py index 299f84dafe..8e0ad2deee 100644 --- a/pyiceberg/cli/output.py +++ b/pyiceberg/cli/output.py @@ -16,7 +16,13 @@ # under the License. import json from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, +) from uuid import UUID from rich.console import Console @@ -26,6 +32,7 @@ from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table, TableMetadata +from pyiceberg.table.refs import SnapshotRefType from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties @@ -72,6 +79,10 @@ def uuid(self, uuid: Optional[UUID]) -> None: def version(self, version: str) -> None: ... + @abstractmethod + def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None: + ... + class ConsoleOutput(Output): """Writes to the console.""" @@ -174,6 +185,19 @@ def uuid(self, uuid: Optional[UUID]) -> None: def version(self, version: str) -> None: Console().print(version) + def describe_refs(self, ref_details: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None: + refs_table = RichTable(title="Snapshot Refs") + refs_table.add_column("Ref") + refs_table.add_column("Type") + refs_table.add_column("Max ref age ms") + refs_table.add_column("Min snapshots to keep") + refs_table.add_column("Max snapshot age ms") + for name, type, ref_detail in ref_details: + refs_table.add_row( + name, type, ref_detail["max_ref_age_ms"], ref_detail["min_snapshots_to_keep"], ref_detail["max_snapshot_age_ms"] + ) + Console().print(refs_table) + class JsonOutput(Output): """Writes json to stdout.""" @@ -226,3 +250,12 @@ def uuid(self, uuid: Optional[UUID]) -> None: def version(self, version: str) -> None: self._out({"version": version}) + + def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None: + self._out( + [ + {"name": name, "type": type, detail_key: detail_val} + for name, type, detail in refs + for detail_key, detail_val in detail.items() + ] + ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 21a3b7fc23..b0f2bcd1c4 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -70,6 +70,7 @@ visit, ) from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata +from pyiceberg.table.refs import SnapshotRef from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry from pyiceberg.table.sorting import SortOrder from pyiceberg.typedef import ( @@ -569,6 +570,10 @@ def history(self) -> List[SnapshotLogEntry]: def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive) + def refs(self) -> Dict[str, SnapshotRef]: + """Return the snapshot references in the table.""" + return self.metadata.refs + def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: response = self.catalog._commit_table( # pylint: disable=W0212 CommitTableRequest(