Construct a writer tree #40
Conversation
For V1 and V2 there are some differences that are hard to enforce without this:

- `1: snapshot_id` is required for V1, optional for V2.
- `105: block_size_in_bytes` needs to be written for V1, but omitted for V2 (this leverages the `write-default`).
- `3: sequence_number` and `4: file_sequence_number` can be omitted for V1.

Everything that we read, we map to V2. However, when writing we also want to be compliant with the V1 spec, and this is where the writer tree comes in, since we construct a separate tree for V1 or V2.
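To make that concrete, here is a minimal sketch of how such a writer tree could be resolved. The schemas and the default value below are illustrative, not the real manifest schemas; only resolve_writer and its keyword arguments come from this PR:

from pyiceberg.avro.resolver import resolve_writer
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# Hypothetical in-memory (record) schema: only snapshot_id is kept in memory.
record_schema = Schema(
    NestedField(field_id=1, name="snapshot_id", type=LongType(), required=True),
)

# Hypothetical V1-style file schema: block_size_in_bytes is not in the records,
# but still has to be written, so it is filled in from its write-default.
file_schema = Schema(
    NestedField(field_id=1, name="snapshot_id", type=LongType(), required=True),
    NestedField(field_id=105, name="block_size_in_bytes", type=LongType(), required=True, write_default=67108864),
)

# The resulting writer tree reads snapshot_id from the record and always
# emits the default value for block_size_in_bytes.
writer = resolve_writer(record_schema=record_schema, file_schema=file_schema)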
HonahX
left a comment
LGTM! Thanks for the improvements!
pyiceberg/avro/resolver.py
Outdated
if isinstance(write_field, NestedField) and write_field.write_default is not None:
    # The field is not in the record, but there is a write default value
    default_writer = DefaultWriter(
        writer=visit(write_field.field_type, CONSTRUCT_WRITER_VISITOR), value=write_field.write_default
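As a rough sketch of what such a DefaultWriter pairing does (the module path and IntegerWriter are assumptions; only the DefaultWriter keywords come from the diff above), it ignores the record entirely and always encodes the configured value with the wrapped writer:

from pyiceberg.avro.writer import DefaultWriter, IntegerWriter

# Always writes 67108864, regardless of what is (or is not) in the record.
block_size_writer = DefaultWriter(writer=IntegerWriter(), value=67108864)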
I'm a little concerned about adding support for the write default. The problem is that we need to resolve the write default with child fields.
For example, if you have a field `s` with type `struct<a int, b int>` and default `Struct(0, 0)`, there are schema changes that can make the default invalid. For example, `ADD COLUMN s.c INT NOT NULL DEFAULT 0` would probably add a new `NestedField(name='c', field_type=IntType(), initial_default=0, write_default=0)`, but it doesn't necessarily update the write or initial default for `s`. Those defaults need to be resolved somehow.
I think if we want to move forward with this, we should just make sure that the write default is a primitive somewhere.
Thanks, we can do that quite easily by saying that it only accepts an `L` that we already have in typedef:

# Represents the literal value
L = TypeVar("L", str, bool, int, float, bytes, UUID, Decimal, covariant=True)

# and in the nested-field:
write_default: Optional[L] = Field(alias="write-default", default=None, repr=False)

I've also added a test:
import pydantic_core
import pytest

from pyiceberg.types import NestedField, StringType

with pytest.raises(pydantic_core.ValidationError) as exc_info:
    _ = NestedField(
        1,
        "field",
        StringType(),
        required=True,
        write_default=(1, "a", True),
    )
assert "validation errors for NestedField" in str(exc_info.value)
I'm reluctant to fix this with types since I think that in the long term we will support nested data. Seems like we should just have an assertion for now.
@rdblue Just to clarify, the type annotation here is not just a hint; it will be enforced by Pydantic. If you pass in something other than what the type allows, it will raise a Pydantic ValidationError. An assertion would be similar (but then it would be done in Python land instead of Rust 🦀).
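A minimal, standalone Pydantic illustration of that point (this is not pyiceberg code; the model and field names are made up):

from typing import Optional, Union

from pydantic import BaseModel, ValidationError

class Example(BaseModel):
    write_default: Optional[Union[str, int]] = None

try:
    Example(write_default=(1, "a", True))  # a tuple is not an allowed literal type
except ValidationError as err:
    print(err)  # rejected at construction time, no assert needed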
Ah, got it. We'll have to change it later to be more permissive, but that sounds fine to me. Thanks!
pyiceberg/avro/resolver.py
Outdated
Args:
    struct_schema (Schema | IcebergType): The schema of the Avro file.
    write_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema.
I think the names and descriptions could be more clear. The description of struct_schema is good, but I think it should be file_schema.
@Fokko, I think the names are still confusing here. When I see data_schema I would expect it to be the schema of the data that is being written. And also write_schema makes me think of Avro, where a "write schema" is typically the file schema.
Since this is a new concept (writing records into another schema by ignoring some columns), I think it would be good to have really clear names. I'd say file_schema and data_schema, but if you came to the opposite conclusion about the meaning of data_schema, then maybe we should have record_schema and file_schema? Or possibly inmemory_schema and file_schema?
Also, this says "read schema" in the doc.
Missed this one. Thanks and this is very subjective :D
> I think the names are still confusing here. When I see data_schema I would expect it to be the schema of the data that is being written.

For me, I would assume that the data_schema is in memory. record_schema and file_schema sound the most natural to me.
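For reference, a sketch of the signature with that naming (the argument names follow the discussion; the exact annotations are an assumption based on the docstring quoted further down):

from typing import Union

from pyiceberg.avro.writer import Writer
from pyiceberg.schema import Schema
from pyiceberg.types import IcebergType

def resolve_writer(
    record_schema: Union[Schema, IcebergType],  # schema of the records held in memory
    file_schema: Union[Schema, IcebergType],    # schema the Avro file is written with
) -> Writer:
    ...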
rdblue
left a comment
Looks like the approach should work. There's a little issue with None and optional fields with no corresponding value in the data schema.
pyiceberg/avro/resolver.py
Outdated
| """ | ||
| if write_schema == data_schema: | ||
| return construct_writer(write_schema) | ||
| return visit_with_partner(write_schema, data_schema, WriteSchemaResolver(), SchemaPartnerAccessor()) # type: ignore |
Did this reverse the order that write_schema and data_schema were passed in? Above it says the data_schema is the schema of the Avro file, but here it looks like it matches what I think makes sense, which is that it is the schema of the data in memory...
We should make sure we have consistency in the schema order.
Yes, this is because the arguments to the function feel most natural from left to right: you have the data in some kind of schema, and you want to project that onto some write schema.
pyiceberg/avro/resolver.py
Outdated
    # The field is not in the record, but there is a write default value
    results.append((None, DefaultWriter(writer=writer, value=write_field.write_default)))  # type: ignore
elif write_field.required:
    raise ValueError(f"Field is required, and there is no write default: {write_field}")
I think that we were wrong to remove the last else branch here. If we get here, the file schema has the field because it came from write_schema.fields, but there is no corresponding field in the data schema, there is no write default, and the field is not required. That means the field must be optional and not have a default, so we need to call OptionWriter.write(null) to write the option's tag byte and select the null branch of the union.
I think that case needs to produce (None, writer).
We should probably also make sure we have a test case for this.
Yes, you're right! This would apply to file_ordinal and sort_columns. However, we don't write those. Updated the code and added a test case:
from pyiceberg.avro.resolver import resolve_writer
from pyiceberg.avro.writer import OptionWriter, StringWriter, StructWriter
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType


def test_writer_missing_optional_in_read_schema() -> None:
    actual = resolve_writer(
        record_schema=Schema(),
        file_schema=Schema(
            NestedField(field_id=1, name="str", type=StringType(), required=False),
        ),
    )
    expected = StructWriter(field_writers=((None, OptionWriter(option=OptionWriter(option=StringWriter()))),))
    assert actual == expected
Forgot to push, just pushed the latest changes.
pyiceberg/avro/resolver.py
Outdated
elif file_field.required:
    raise ValueError(f"Field is required, and there is no write default: {file_field}")
else:
    results.append((None, OptionWriter(option=writer)))
Won't writer already be an OptionWriter? It was already processed by field(), which should create one if the field is not required.
Yes, you're right!
Args:
    record_schema (Schema | IcebergType): The schema of the record in memory.
    file_schema (Schema | IcebergType): The schema of the file that will be written
Looks good!
tests/avro/test_resolver.py
Outdated
        ),
    )

    expected = StructWriter(field_writers=((None, OptionWriter(option=OptionWriter(option=StringWriter()))),))
Yeah, the double option writer is suspicious!
I think overall this is ready. There's one bug where a missing record field produces a double OptionWriter.

Thanks for getting this working @Fokko! It's a clever way to support multiple format versions and I wouldn't be surprised if we ended up using it for the other language implementations as well! (Sorry I was slow to understand it and thanks for convincing me.)
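For what it's worth, once that double wrapping is removed, the expected writer in the test above would presumably collapse to a single OptionWriter, along the lines of this sketch:

from pyiceberg.avro.writer import OptionWriter, StringWriter, StructWriter

# The missing optional record field maps to (None, writer): no position to read
# from, and the option writer emits the null branch.
expected = StructWriter(field_writers=((None, OptionWriter(option=StringWriter())),))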
Definitely! Thanks for reviewing, and excited to get this in 🚀