Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions pyiceberg/cli/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import (
Any,
Callable,
Dict,
Literal,
Optional,
Tuple,
Expand All @@ -31,6 +32,10 @@
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
from pyiceberg.table.refs import SnapshotRef

# Fallback shown for a branch ref when neither the ref itself nor the table property
# "history.expire.min-snapshots-to-keep" supplies a value (see _retention_properties).
DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1
# Fallback shown for a branch ref when neither the ref itself nor the table property
# "history.expire.max-snapshot-age-ms" supplies a value; 432,000,000 ms is 5 days.
DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000


def catch_exception() -> Callable: # type: ignore
Expand Down Expand Up @@ -372,3 +377,50 @@ def table(ctx: Context, identifier: str, property_name: str) -> None: # noqa: F
ctx.exit(1)
else:
raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")


@run.command()
@click.argument("identifier")
@click.option("--type", required=False)
@click.option("--verbose", type=click.BOOL)
@click.pass_context
@catch_exception()
def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
    """List the snapshot refs of a table, optionally restricted to one ref type.

    Args:
        ctx: Click context used to resolve the catalog and the output writer.
        identifier: Identifier of the table to load from the catalog.
        type: Optional ref type filter; matched case-insensitively against "branch" or "tag".
        verbose: Accepted as a CLI option but not used by this command.

    Raises:
        ValueError: If a type filter is given that is neither "branch" nor "tag".
    """
    catalog, output = _catalog_and_output(ctx)
    table = catalog.load_table(identifier)
    refs = table.refs()

    # The table is loaded before the filter is validated, so catalog errors surface first.
    if type:
        type = type.lower()
        if type not in {"branch", "tag"}:
            raise ValueError(f"Type must be either branch or tag, got: {type}")

    relevant_refs = []
    for ref_name, ref in refs.items():
        # With a filter in place, drop every ref of the other type.
        if type and ref.snapshot_ref_type != type:
            continue
        retention = _retention_properties(ref, table.properties)
        relevant_refs.append((ref_name, ref.snapshot_ref_type, retention))

    output.describe_refs(relevant_refs)


def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]:
    """Build the displayable retention settings for a single snapshot ref.

    For a branch, a value set on the ref wins; otherwise the table property
    ("history.expire.min-snapshots-to-keep" / "history.expire.max-snapshot-age-ms")
    is used, and finally the module-level default. For any other ref type the two
    snapshot-retention entries are reported as "N/A".

    Args:
        ref: The snapshot ref to describe.
        table_properties: Properties of the table that owns the ref.

    Returns:
        A dict with the keys "min_snapshots_to_keep", "max_snapshot_age_ms" and
        "max_ref_age_ms", all rendered as strings.
    """
    retention_properties: Dict[str, str] = {}

    if ref.snapshot_ref_type == "branch":
        default_min_snapshots_to_keep = table_properties.get(
            "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP
        )
        default_max_snapshot_age_ms = table_properties.get("history.expire.max-snapshot-age-ms", DEFAULT_MAX_SNAPSHOT_AGE_MS)

        # Compare against None rather than relying on truthiness, so an explicitly stored 0
        # is reported as-is instead of being silently replaced by the default.
        retention_properties["min_snapshots_to_keep"] = str(
            ref.min_snapshots_to_keep if ref.min_snapshots_to_keep is not None else default_min_snapshots_to_keep
        )
        retention_properties["max_snapshot_age_ms"] = str(
            ref.max_snapshot_age_ms if ref.max_snapshot_age_ms is not None else default_max_snapshot_age_ms
        )
    else:
        retention_properties["min_snapshots_to_keep"] = "N/A"
        retention_properties["max_snapshot_age_ms"] = "N/A"

    # An unset max ref age is displayed as "forever".
    retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms is not None else "forever"

    return retention_properties
35 changes: 34 additions & 1 deletion pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
# under the License.
import json
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from uuid import UUID

from rich.console import Console
Expand All @@ -26,6 +32,7 @@
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table, TableMetadata
from pyiceberg.table.refs import SnapshotRefType
from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties


Expand Down Expand Up @@ -72,6 +79,10 @@ def uuid(self, uuid: Optional[UUID]) -> None:
def version(self, version: str) -> None:
...

@abstractmethod
def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
    """Output a description of the given snapshot refs.

    Args:
        refs: One (name, ref type, details) tuple per ref, where details maps
            string keys to string values.
    """
    ...


class ConsoleOutput(Output):
"""Writes to the console."""
Expand Down Expand Up @@ -174,6 +185,19 @@ def uuid(self, uuid: Optional[UUID]) -> None:
def version(self, version: str) -> None:
Console().print(version)

def describe_refs(self, ref_details: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
    """Print the given snapshot refs as a table titled "Snapshot Refs".

    Args:
        ref_details: One (name, ref type, details) tuple per ref; details must contain
            the keys "max_ref_age_ms", "min_snapshots_to_keep" and "max_snapshot_age_ms".
    """
    column_headers = ("Ref", "Type", "Max ref age ms", "Min snapshots to keep", "Max snapshot age ms")
    # Detail keys listed in the same order as the last three column headers.
    detail_keys = ("max_ref_age_ms", "min_snapshots_to_keep", "max_snapshot_age_ms")

    refs_table = RichTable(title="Snapshot Refs")
    for header in column_headers:
        refs_table.add_column(header)

    for ref_name, ref_type, retention in ref_details:
        refs_table.add_row(ref_name, ref_type, *(retention[key] for key in detail_keys))

    Console().print(refs_table)


class JsonOutput(Output):
"""Writes json to stdout."""
Expand Down Expand Up @@ -226,3 +250,12 @@ def uuid(self, uuid: Optional[UUID]) -> None:

def version(self, version: str) -> None:
self._out({"version": version})

def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
    """Emit the given snapshot refs as a JSON list with one object per ref.

    Each object carries "name" and "type" plus every key/value pair from the ref's
    details dict.

    Args:
        refs: One (name, ref type, details) tuple per ref, where details maps
            string keys to string values.
    """
    # Merge all detail entries into a single object per ref; iterating over the detail
    # items in the comprehension would emit a separate, partial object for every
    # (ref, detail key) pair and repeat each ref once per key.
    self._out([{"name": name, "type": ref_type, **detail} for name, ref_type, detail in refs])
5 changes: 5 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
visit,
)
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
from pyiceberg.table.sorting import SortOrder
from pyiceberg.typedef import (
Expand Down Expand Up @@ -569,6 +570,10 @@ def history(self) -> List[SnapshotLogEntry]:
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive)

def refs(self) -> Dict[str, SnapshotRef]:
    """Return the snapshot references in the table.

    Returns:
        The refs mapping held by the table metadata, returned as-is (not copied).
    """
    return self.metadata.refs

def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(
Expand Down