21 changes: 15 additions & 6 deletions pyiceberg/table/__init__.py
@@ -19,6 +19,7 @@
 import datetime
 import itertools
 import uuid
+import warnings
 from abc import ABC, abstractmethod
 from copy import copy
 from dataclasses import dataclass
@@ -942,15 +943,23 @@ def snapshot(self) -> Optional[Snapshot]:
         return self.table.current_snapshot()

     def projection(self) -> Schema:
-        snapshot_schema = self.table.schema()
-        if snapshot := self.snapshot():
-            if snapshot.schema_id is not None:
-                snapshot_schema = self.table.schemas()[snapshot.schema_id]
+        current_schema = self.table.schema()
+        if self.snapshot_id is not None:
+            snapshot = self.table.snapshot_by_id(self.snapshot_id)
Contributor:

I think it would be an invalid state if snapshot is None but a snapshot_id is set; should we throw?

Maybe we could consider a schema_for(snapshot_id) API similar to https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L368 .

I think there's a difference between the Java implementation and the Python implementation in the case where there is a schema ID on the snapshot but, for whatever reason, the schema with that ID cannot be found. The Java schemaFor API throws in that case, but here we fall back to the latest schema. I think we should throw rather than assume the latest, because a missing schema implies there is some bad metadata and it's safer to fail than to coerce to the latest schema. The latest schema should only be used when there is no schema ID on the snapshot, and in the original case where no snapshot_id is set. What do you think?
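For illustration only, a minimal sketch of what such a schema_for helper could look like in Python. The function name, its standalone shape, and the error messages are assumptions modelled on the fail-fast semantics of Java's SnapshotUtil.schemaFor; this is not an existing PyIceberg API:

    from pyiceberg.schema import Schema
    from pyiceberg.table import Table


    def schema_for(table: Table, snapshot_id: int) -> Schema:
        """Resolve the schema that was current when the given snapshot was written.

        Hypothetical helper: a missing snapshot or a missing schema id is treated
        as bad metadata and raises; the current table schema is only used when the
        snapshot does not record a schema-id at all.
        """
        snapshot = table.snapshot_by_id(snapshot_id)
        if snapshot is None:
            raise ValueError(f"Snapshot not found: {snapshot_id}")
        if snapshot.schema_id is None:
            return table.schema()
        schema = table.schemas().get(snapshot.schema_id)
        if schema is None:
            raise ValueError(f"Metadata does not contain schema with id: {snapshot.schema_id}")
        return schema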

Contributor Author:

Great catch @amogh-jahagirdar! I'm not super strong on this one. Typically I would not fail in these situations, but I agree that raising a warning might be appropriate here.

I know there are thoughts of pruning old schemas, which might lead to this situation, but I wouldn't expect this to happen regularly.

Contributor Author:

I've updated the code with a warning; let me know what you think!

Contributor:

I think the warning makes sense for the missing schema ID case, but what about the case where the snapshot_id is set but the snapshot cannot be found (if line 948 returns None)? I think the only option there would be to throw, because that means there was some established snapshot_id but we can't find it anymore.

Contributor Author:

Oof, that's a good one. I think we should check if the snapshot-id is valid earlier in the process. I've added a check now, but I'll follow up with another PR to make this more strict.
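As a rough sketch, under the assumption that the stricter follow-up would move the validation to where the scan is created; the helper name and call site below are hypothetical and not part of this diff:

    from typing import Optional

    from pyiceberg.table import Table


    def validate_snapshot_id(table: Table, snapshot_id: Optional[int]) -> None:
        """Fail fast if an explicitly requested snapshot_id is absent from the table metadata."""
        if snapshot_id is not None and table.snapshot_by_id(snapshot_id) is None:
            raise ValueError(f"Snapshot not found: {snapshot_id}")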

+            if snapshot is not None:
+                if snapshot.schema_id is not None:
+                    snapshot_schema = self.table.schemas().get(snapshot.schema_id)
+                    if snapshot_schema is not None:
+                        current_schema = snapshot_schema
+                    else:
+                        warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
+            else:
+                raise ValueError(f"Snapshot not found: {self.snapshot_id}")

if "*" in self.selected_fields:
return snapshot_schema
return current_schema

-        return snapshot_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

     @abstractmethod
     def plan_files(self) -> Iterable[ScanTask]:
89 changes: 88 additions & 1 deletion tests/table/test_init.py
@@ -22,14 +22,15 @@
 import pytest
 from sortedcontainers import SortedList

+from pyiceberg.catalog.noop import NoopCatalog
 from pyiceberg.exceptions import CommitFailedException
 from pyiceberg.expressions import (
     AlwaysTrue,
     And,
     EqualTo,
     In,
 )
-from pyiceberg.io import PY_IO_IMPL
+from pyiceberg.io import PY_IO_IMPL, load_file_io
 from pyiceberg.manifest import (
     DataFile,
     DataFileContent,
@@ -848,3 +849,89 @@ def test_assert_default_sort_order_id(table_v2: Table) -> None:
match="Requirement failed: default sort order id has changed: expected 1, found 3",
):
AssertDefaultSortOrderId(default_sort_order_id=1).validate(base_metadata)


+def test_correct_schema() -> None:
+    table_metadata = TableMetadataV2(
+        **{
+            "format-version": 2,
+            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+            "location": "s3://bucket/test/location",
+            "last-sequence-number": 34,
+            "last-updated-ms": 1602638573590,
+            "last-column-id": 3,
+            "current-schema-id": 1,
+            "schemas": [
+                {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]},
+                {
+                    "type": "struct",
+                    "schema-id": 1,
+                    "identifier-field-ids": [1, 2],
+                    "fields": [
+                        {"id": 1, "name": "x", "required": True, "type": "long"},
+                        {"id": 2, "name": "y", "required": True, "type": "long"},
+                        {"id": 3, "name": "z", "required": True, "type": "long"},
+                    ],
+                },
+            ],
+            "default-spec-id": 0,
+            "partition-specs": [
+                {"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}
+            ],
+            "last-partition-id": 1000,
+            "default-sort-order-id": 0,
+            "sort-orders": [],
+            "current-snapshot-id": 123,
+            "snapshots": [
+                {
+                    "snapshot-id": 234,
+                    "timestamp-ms": 1515100955770,
+                    "sequence-number": 0,
+                    "summary": {"operation": "append"},
+                    "manifest-list": "s3://a/b/1.avro",
+                    "schema-id": 10,
+                },
+                {
+                    "snapshot-id": 123,
+                    "timestamp-ms": 1515100955770,
+                    "sequence-number": 0,
+                    "summary": {"operation": "append"},
+                    "manifest-list": "s3://a/b/1.avro",
+                    "schema-id": 0,
+                },
+            ],
+        }
+    )
+
+    t = Table(
+        identifier=("default", "t1"),
+        metadata=table_metadata,
+        metadata_location="s3://../..",
+        io=load_file_io(),
+        catalog=NoopCatalog("NoopCatalog"),
+    )
+
+    # Should use the current schema, instead of the one from the snapshot
+    assert t.scan().projection() == Schema(
+        NestedField(field_id=1, name='x', field_type=LongType(), required=True),
+        NestedField(field_id=2, name='y', field_type=LongType(), required=True),
+        NestedField(field_id=3, name='z', field_type=LongType(), required=True),
+        schema_id=1,
+        identifier_field_ids=[1, 2],
+    )
+
+    # When we explicitly filter on the commit, we want to have the schema that's linked to the snapshot
+    assert t.scan(snapshot_id=123).projection() == Schema(
+        NestedField(field_id=1, name='x', field_type=LongType(), required=True),
+        schema_id=0,
+        identifier_field_ids=[],
+    )
+
+    with pytest.warns(UserWarning, match="Metadata does not contain schema with id: 10"):
+        t.scan(snapshot_id=234).projection()
+
+    # Invalid snapshot
+    with pytest.raises(ValueError) as exc_info:
+        _ = t.scan(snapshot_id=-1).projection()
+
+    assert "Snapshot not found: -1" in str(exc_info.value)