From 84014936d05f213cc6e2972cd680adf7fe2d92de Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Mon, 27 Feb 2023 09:45:42 +0100
Subject: [PATCH] Python: Fix timezone concat issue

Resolves #6945
---
 python/pyiceberg/io/pyarrow.py  | 31 +++++++++++++++++++------------
 python/tests/io/test_pyarrow.py |  2 +-
 2 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index a064ae3bdfdf..46e384bc171c 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -391,7 +391,7 @@ def visit_timestamp(self, _: TimestampType) -> pa.DataType:
         return pa.timestamp(unit="us")
 
     def visit_timestampz(self, _: TimestamptzType) -> pa.DataType:
-        return pa.timestamp(unit="us", tz="+00:00")
+        return pa.timestamp(unit="us", tz="UTC")
 
     def visit_string(self, _: StringType) -> pa.DataType:
         return pa.string()
@@ -477,7 +477,7 @@ def _file_to_table(
     projected_schema: Schema,
     projected_field_ids: Set[int],
     case_sensitive: bool,
-) -> pa.Table:
+) -> Optional[pa.Table]:
     _, path = PyArrowFileIO.parse_location(task.file.file_path)
 
     # Get the schema
@@ -512,10 +512,11 @@ def _file_to_table(
         columns=[col.name for col in file_project_schema.columns],
     )
 
-    if pyarrow_filter is not None:
-        arrow_table = arrow_table.filter(pyarrow_filter)
-
-    return to_requested_schema(projected_schema, file_project_schema, arrow_table)
+    # If there is no data, we don't have to go through the schema
+    if len(arrow_table) > 0:
+        return to_requested_schema(projected_schema, file_project_schema, arrow_table)
+    else:
+        return None
 
 
 def project_table(
@@ -547,16 +548,22 @@ def project_table(
     }.union(extract_field_ids(bound_row_filter))
 
     with ThreadPool() as pool:
-        tables = pool.starmap(
-            func=_file_to_table,
-            iterable=[(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive) for task in tasks],
-            chunksize=None,  # we could use this to control how to materialize the generator of tasks (we should also make the expression above lazy)
-        )
+        tables = [
+            table
+            for table in pool.starmap(
+                func=_file_to_table,
+                iterable=[(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive) for task in tasks],
+                chunksize=None,  # we could use this to control how to materialize the generator of tasks (we should also make the expression above lazy)
+            )
+            if table is not None
+        ]
 
     if len(tables) > 1:
         return pa.concat_tables(tables)
-    else:
+    elif len(tables) == 1:
         return tables[0]
+    else:
+        return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema))
 
 
 def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table:
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index f894963d9421..c07890053dc6 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -377,7 +377,7 @@ def test_timestamp_type_to_pyarrow() -> None:
 
 def test_timestamptz_type_to_pyarrow() -> None:
     iceberg_type = TimestamptzType()
-    assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="+00:00")
+    assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="UTC")
 
 
 def test_string_type_to_pyarrow() -> None:
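
Note on the tz change: PyArrow compares the tz annotation of a timestamp
type as a plain string, so schemas that differ only in the spelling of the
zone ("UTC" vs "+00:00") are different schemas, and pa.concat_tables refuses
to combine them. A minimal repro sketch, assuming stock PyArrow behavior
(concat_tables requires identical schemas by default):

    import pyarrow as pa

    # The tz string is part of the type, so these do not compare equal even
    # though both describe UTC-adjusted timestamps.
    print(pa.timestamp("us", tz="UTC") == pa.timestamp("us", tz="+00:00"))  # False

    t1 = pa.table({"ts": pa.array([0], type=pa.timestamp("us", tz="UTC"))})
    t2 = pa.table({"ts": pa.array([0], type=pa.timestamp("us", tz="+00:00"))})

    try:
        pa.concat_tables([t1, t2])
    except pa.ArrowInvalid as err:
        print(err)  # schema mismatch: the concat is rejected

Standardizing the visitor on tz="UTC" keeps every table it produces
concat-compatible.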
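
Note on the project_table changes: _file_to_table now returns None instead
of an empty table, the caller drops the Nones before concatenating, and an
all-empty scan falls back to a zero-row table that still carries the
projected schema. A condensed sketch of that control flow, for illustration
only (combine is a hypothetical name, not pyiceberg API):

    from typing import List, Optional

    import pyarrow as pa

    def combine(tables: List[Optional[pa.Table]], schema: pa.Schema) -> pa.Table:
        # Per-file reads return None when no rows survive the filter.
        present = [t for t in tables if t is not None]
        if len(present) > 1:
            return pa.concat_tables(present)
        elif len(present) == 1:
            return present[0]
        else:
            # Nothing matched anywhere: hand back a well-typed, zero-row
            # table built from the projected schema.
            return pa.Table.from_batches([], schema=schema)

    schema = pa.schema([pa.field("ts", pa.timestamp("us", tz="UTC"))])
    assert combine([None, None], schema).num_rows == 0

Skipping empty per-file tables means, as the patch comment puts it, there is
no need to go through the schema conversion for files that contribute no rows.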