Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 74 additions & 3 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import operator
import os
import re
import time
import uuid
import warnings
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -1575,11 +1576,16 @@ def _task_to_record_batches(
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
downcast_ns_timestamp_to_us: bool | None = None,
) -> Iterator[pa.RecordBatch]:
task_start = time.perf_counter()

open_start = time.perf_counter()
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
with io.new_input(task.file.file_path).open() as fin:
fragment = arrow_format.make_fragment(fin)
physical_schema = fragment.physical_schema
open_end = time.perf_counter()

schema_start = time.perf_counter()
# For V1 and V2, we only support Timestamp 'us' in Iceberg Schema, therefore it is reasonable to always cast 'ns' timestamp to 'us' on read.
# For V3 this has to be set explicitly, to prevent nanosecond timestamps from being downcast by default
downcast_ns_timestamp_to_us = (
Expand All @@ -1593,14 +1599,31 @@ def _task_to_record_batches(
projected_missing_fields = _get_column_projection_values(
task.file, projected_schema, table_schema, partition_spec, file_schema.field_ids
)
schema_end = time.perf_counter()

filter_start = time.perf_counter()
pyarrow_filter = None
if bound_row_filter is not AlwaysTrue():
translate_start = time.perf_counter()
translated_row_filter = translate_column_names(
bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_field_values=projected_missing_fields
)
translate_end = time.perf_counter()

bind_start = time.perf_counter()
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
bind_end = time.perf_counter()

to_pyarrow_start = time.perf_counter()
pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema)
to_pyarrow_end = time.perf_counter()

logger.info(
"[SCAN TIMING] filter_prep breakdown: (translate: %.4fs, bind: %.4fs, to_pyarrow: %.4fs)",
translate_end - translate_start,
bind_end - bind_start,
to_pyarrow_end - to_pyarrow_start,
)

file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

Expand All @@ -1612,9 +1635,24 @@ def _task_to_record_batches(
filter=pyarrow_filter if not positional_deletes else None,
columns=[col.name for col in file_project_schema.columns],
)
filter_end = time.perf_counter()

batch_read_start = time.perf_counter()
batches = list(fragment_scanner.to_batches())
batch_read_end = time.perf_counter()

logger.info(
"[SCAN TIMING] _task_to_record_batches %s: (open: %.4fs, schema: %.4fs, filter_prep: %.4fs, batch_read: %.4fs, batches: %d, total: %.4fs)",
task.file.file_path,
open_end - open_start,
schema_end - schema_start,
filter_end - filter_start,
batch_read_end - batch_read_start,
len(batches),
time.perf_counter() - task_start,
)

next_index = 0
batches = fragment_scanner.to_batches()
for batch in batches:
next_index = next_index + len(batch)
current_index = next_index - len(batch)
Expand Down Expand Up @@ -1650,9 +1688,14 @@ def _task_to_record_batches(


def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]:
func_start = time.perf_counter()

deletes_per_file: dict[str, list[ChunkedArray]] = {}
unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
collect_end = time.perf_counter()

if len(unique_deletes) > 0:
read_start = time.perf_counter()
executor = ExecutorFactory.get_or_create()
deletes_per_files: Iterator[dict[str, ChunkedArray]] = executor.map(
lambda args: _read_deletes(*args),
Expand All @@ -1664,6 +1707,20 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
deletes_per_file[file].append(arr)
else:
deletes_per_file[file] = [arr]
read_end = time.perf_counter()

logger.info(
"[SCAN TIMING] _read_all_delete_files: %.4fs (collect: %.4fs, read: %.4fs, unique_delete_files: %d)",
time.perf_counter() - func_start,
collect_end - func_start,
read_end - read_start,
len(unique_deletes),
)
else:
logger.info(
"[SCAN TIMING] _read_all_delete_files: %.4fs (no deletes)",
time.perf_counter() - func_start,
)

return deletes_per_file

Expand Down Expand Up @@ -1773,7 +1830,21 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type
"""
deletes_per_file = _read_all_delete_files(self._io, tasks)
materialize_start = time.perf_counter()
tasks_list = list(tasks) # Force materialization for delete file collection
materialize_end = time.perf_counter()

delete_read_start = time.perf_counter()
deletes_per_file = _read_all_delete_files(self._io, tasks_list)
delete_read_end = time.perf_counter()

logger.info(
"[SCAN TIMING] to_record_batches setup: (task_materialize: %.4fs, delete_read: %.4fs, tasks: %d, delete_files: %d)",
materialize_end - materialize_start,
delete_read_end - delete_read_start,
len(tasks_list),
len(deletes_per_file),
)

total_row_count = 0
executor = ExecutorFactory.get_or_create()
Expand All @@ -1785,7 +1856,7 @@ def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))

limit_reached = False
for batches in executor.map(batches_for_task, tasks):
for batches in executor.map(batches_for_task, tasks_list):
for batch in batches:
current_batch_size = len(batch)
if self._limit is not None and total_row_count + current_batch_size >= self._limit:
Expand Down
Loading