From bfec3dafd2c03e793d890944e05d03be5b4d06c2 Mon Sep 17 00:00:00 2001
From: Andor Markus
Date: Thu, 27 Feb 2025 16:03:18 +0100
Subject: [PATCH 1/6] feat: Add write_parquet method to write files without committing

This method allows users to write a PyArrow table to the table's storage
format as Parquet files without committing them to the table. The method
returns a list of file paths that were written, enabling workflows that
require access to the data files before committing metadata changes.

Also adds an include_field_ids parameter to the underlying write_file and
_dataframe_to_data_files functions to provide more control over the
Parquet writing process.
---
 pyiceberg/io/pyarrow.py     |  8 +++++--
 pyiceberg/table/__init__.py | 44 +++++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f7e3c7c082..6d5447ee68 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2353,7 +2353,8 @@ def data_file_statistics_from_parquet_metadata(
     )
 
 
-def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask], include_field_ids: bool = True
+               ) -> Iterator[DataFile]:
     from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
 
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
@@ -2380,7 +2381,7 @@ def write_parquet(task: WriteTask) -> DataFile:
                 file_schema=task.schema,
                 batch=batch,
                 downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
-                include_field_ids=True,
+                include_field_ids=include_field_ids,
             )
             for batch in task.record_batches
         ]
@@ -2549,6 +2550,7 @@ def _dataframe_to_data_files(
     io: FileIO,
     write_uuid: Optional[uuid.UUID] = None,
     counter: Optional[itertools.count[int]] = None,
+    include_field_ids: bool = True
 ) -> Iterable[DataFile]:
     """Convert a PyArrow table into a DataFile.
 
@@ -2578,6 +2580,7 @@ def _dataframe_to_data_files(
                     for batches in bin_pack_arrow_table(df, target_file_size)
                 ]
             ),
+            include_field_ids=include_field_ids
         )
     else:
         partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
@@ -2597,6 +2600,7 @@ def _dataframe_to_data_files(
                     for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
                 ]
             ),
+            include_field_ids=include_field_ids
         )
 
 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 679f74d107..7c3271c50a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1226,6 +1226,50 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         with self.transaction() as tx:
             tx.append(df=df, snapshot_properties=snapshot_properties)
 
+    def write_parquet(self, df: pa.Table) -> List[str]:
+        """
+        Shorthand API for writing a PyArrow table as Parquet files for the table.
+        Writes data files but does not commit them to the table.
+
+        Args:
+            df: The Arrow table that will be written as Parquet files
+
+        Returns:
+            List of file paths to the written Parquet files
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        if unsupported_partitions := [
+            field for field in self.metadata.spec().fields if not field.transform.supports_pyarrow_transform
+        ]:
+            raise ValueError(
+                f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
+            )
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.metadata.schema(), provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        if df.shape[0] > 0:
+            data_files = list(
+                _dataframe_to_data_files(
+                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df,
+                    io=self.io
+                )
+            )
+
+        return [data_file.file_path for data_file in data_files]
+
+
     def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         """Shorthand for dynamic overwriting the table with a PyArrow table.
 

From 6fc83bebeb256b2719684af52103ecdddf32e5d1 Mon Sep 17 00:00:00 2001
From: Andor Markus
Date: Thu, 27 Feb 2025 16:24:36 +0100
Subject: [PATCH 2/6] feat: Add write_parquet method to write files without committing

This method allows users to write a PyArrow table to the table's storage
format as Parquet files without committing them to the table. The method
returns a list of file paths that were written, enabling workflows that
require access to the data files before committing metadata changes.

Also adds an include_field_ids parameter to the underlying write_file and
_dataframe_to_data_files functions to provide more control over the
Parquet writing process.
---
 pyiceberg/table/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 7c3271c50a..95b5b68b6a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1262,7 +1262,7 @@ def write_parquet(self, df: pa.Table) -> List[str]:
         if df.shape[0] > 0:
             data_files = list(
                 _dataframe_to_data_files(
-                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df,
+                    table_metadata=self.metadata, write_uuid=append_files.commit_uuid, df=df,
                     io=self.io
                 )
             )

From 2683984d144171d597a8596b639fa0ba265aa4cf Mon Sep 17 00:00:00 2001
From: Andor Markus
Date: Thu, 27 Feb 2025 20:20:21 +0100
Subject: [PATCH 3/6] feat: Add write_parquet method to write files without committing

This method allows users to write a PyArrow table to the table's storage
format as Parquet files without committing them to the table. The method
returns a list of file paths that were written, enabling workflows that
require access to the data files before committing metadata changes.

Also adds an include_field_ids parameter to the underlying write_file and
_dataframe_to_data_files functions to provide more control over the
Parquet writing process.
---
 pyiceberg/table/__init__.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 95b5b68b6a..fb6fda0a3d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1262,8 +1262,7 @@ def write_parquet(self, df: pa.Table) -> List[str]:
         if df.shape[0] > 0:
             data_files = list(
                 _dataframe_to_data_files(
-                    table_metadata=self.metadata, write_uuid=append_files.commit_uuid, df=df,
-                    io=self.io
+                    table_metadata=self.metadata, write_uuid=uuid.uuid4(), df=df, io=self.io
                 )
             )

From 62b62dc180138e37545231498800b268fe2935f8 Mon Sep 17 00:00:00 2001
From: Andor Markus
Date: Thu, 27 Feb 2025 20:35:30 +0100
Subject: [PATCH 4/6] feat: Add write_parquet method to write files without committing

This method allows users to write a PyArrow table to the table's storage
format as Parquet files without committing them to the table. The method
returns a list of file paths that were written, enabling workflows that
require access to the data files before committing metadata changes.

Also adds an include_field_ids parameter to the underlying write_file and
_dataframe_to_data_files functions to provide more control over the
Parquet writing process.
---
 pyiceberg/table/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index fb6fda0a3d..e6068b02bf 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1262,7 +1262,7 @@ def write_parquet(self, df: pa.Table) -> List[str]:
         if df.shape[0] > 0:
             data_files = list(
                 _dataframe_to_data_files(
-                    table_metadata=self.metadata, write_uuid=uuid.uuid4(), df=df, io=self.io
+                    table_metadata=self.metadata, write_uuid=uuid.uuid4(), df=df, io=self.io, include_field_ids=False
                 )
             )

From 78a4e7597e899b159623782de62d2f5f60b2585c Mon Sep 17 00:00:00 2001
From: Andor Markus
Date: Thu, 27 Feb 2025 21:44:06 +0100
Subject: [PATCH 5/6] feat: Add write_parquet method to write files without committing

This method allows users to write a PyArrow table to the table's storage
format as Parquet files without committing them to the table. The method
returns a list of file paths that were written, enabling workflows that
require access to the data files before committing metadata changes.

Also adds an include_field_ids parameter to the underlying write_file and
_dataframe_to_data_files functions to provide more control over the
Parquet writing process.
---
 pyiceberg/io/pyarrow.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 6d5447ee68..1fa4848484 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2205,24 +2205,25 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A
                 f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
             )
 
-        lower_value = partition_record_value(
+        source_field = schema.find_field(partition_field.source_id)
+        transform = partition_field.transform.transform(source_field.field_type)
+
+        lower_value = transform(partition_record_value(
             partition_field=partition_field,
             value=self.column_aggregates[partition_field.source_id].current_min,
             schema=schema,
-        )
-        upper_value = partition_record_value(
+        ))
+        upper_value = transform(partition_record_value(
             partition_field=partition_field,
             value=self.column_aggregates[partition_field.source_id].current_max,
             schema=schema,
-        )
+        ))
         if lower_value != upper_value:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
             )
 
-        source_field = schema.find_field(partition_field.source_id)
-        transform = partition_field.transform.transform(source_field.field_type)
-        return transform(lower_value)
+        return lower_value
 
     def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
         return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})

From d3be7897cfd32c1031ae02ae1724150bc8968e63 Mon Sep 17 00:00:00 2001
From: Andor Markus
Date: Fri, 28 Feb 2025 09:28:27 +0100
Subject: [PATCH 6/6] feat: Adding documentation

---
 mkdocs/docs/api.md | 58 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 7978fdc9b4..3408d5be62 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -261,6 +261,8 @@ schema = Schema(
 tbl = catalog.create_table("default.cities", schema=schema)
 ```
 
+### Append and Overwrite
+
 Now write the data to the table:
 
 
@@ -333,6 +335,49 @@ df = pa.Table.from_pylist(
 table.append(df)
 ```
 
+### Write Parquet Files
+
+PyIceberg provides a low-level API to write Parquet files in an Iceberg-compatible format without committing them to the table metadata.
+This is useful when you need more control over the commit process:
+
+```python
+file_paths = tbl.write_parquet(df)
+```
+
+The `write_parquet()` method takes a PyArrow table and writes it to Parquet files following the table's schema and partitioning, returning the paths of the written files:
+
+```python
+import pyarrow as pa
+
+df = pa.Table.from_pylist(
+    [
+        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
+        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
+        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
+        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
+    ],
+)
+
+# Write files but don't commit them
+file_paths = tbl.write_parquet(df)
+print(file_paths)
+# ['s3a://warehouse/default/cities/data/00000-0-8e056d57-7ffa-4c22-9f99-52a0e5ea4b19.parquet']
+
+# Files written but not committed - won't appear in queries until committed
+```
+
+To make these files visible when querying the table, you need to commit them using the [`add_files`](#add-files) API:
+
+```python
+# Commit the files to the table metadata
+tbl.add_files(file_paths=file_paths)
+
+# Now the data is visible when querying the table
+```
+
+### Delete
+
+
 You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`.
 
 ```python
@@ -1055,6 +1100,19 @@ tbl.add_files(file_paths=file_paths)
 # A new snapshot is committed to the table with manifests pointing to the existing parquet files
 ```
 
+The `write_parquet()` method provides an easy way to write files in an Iceberg-compatible format that can then be committed using `add_files`:
+
+```python
+# Write data to parquet files without committing
+file_paths = tbl.write_parquet(df)
+
+# Commit the files to make them visible in queries
+tbl.add_files(file_paths=file_paths)
+```
+
+This is useful for decoupling the commit step when ingesting data into an Iceberg table with high concurrency, for example from serverless functions. By separating the write and commit phases, a queue or orchestration system only needs to serialize the commit itself, which is typically much faster than writing the data.
+
+
 !!! note "Name Mapping"
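As a closing illustration of the workflow the documentation patch describes, here is a minimal sketch (not part of the series) of the decoupled write/commit pattern built on the `write_parquet()` and `add_files()` APIs introduced above. The catalog name, the table identifier, and the in-memory list standing in for a queue are assumptions made for the example; a real deployment would plug in its own catalog configuration and its own queue or orchestration layer.

```python
# Illustrative sketch only: decoupled write/commit ingest using the APIs from this patch series.
# Assumes a configured catalog named "default" and an existing "default.cities" table.
from typing import List

import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.cities")


def write_phase(df: pa.Table) -> List[str]:
    # Many concurrent writers (e.g. serverless functions) can run this phase:
    # it only writes Parquet data files and never touches the table metadata.
    return tbl.write_parquet(df)


def commit_phase(queued_batches: List[List[str]]) -> None:
    # A single committer drains the queued file paths and performs one commit,
    # so table-level coordination only covers the fast metadata update.
    all_paths = [path for batch in queued_batches for path in batch]
    if all_paths:
        tbl.add_files(file_paths=all_paths)


df = pa.Table.from_pylist([{"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}])
queued = [write_phase(df)]  # in practice these batches would come from a queue
commit_phase(queued)
```

Note that patch 4 writes these files without Parquet field IDs (`include_field_ids=False`), which lines up with the `add_files` requirement, mentioned in the Name Mapping note, that imported files carry no field IDs.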