Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ingestify/domain/models/resources/batch_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ def load(file_resources, current_files, dataset_resources):

current_files may contain None entries (for create tasks) or a File (for
update tasks) — the loader_fn handles both.

Error propagation: if the loader_fn stores an Exception instead of a
DraftFile for a specific item, __call__ will re-raise it when that item's
task runs. This lets successful items in a batch proceed while failed items
cause their individual tasks to fail and be retried on the next run.
"""
from typing import Callable, List

Expand All @@ -35,7 +40,10 @@ def __call__(self, file_resource, current_file, dataset_resource=None, **kwargs)
"BatchLoader result not precomputed. A BatchTask must "
"populate the cache before inner tasks execute."
)
return self._results.pop(key)
result = self._results.pop(key)
if isinstance(result, BaseException):
raise result
return result

def _store_results(self, file_resources: List, results: List):
"""Store batch results so they can be retrieved via __call__."""
Expand Down
16 changes: 16 additions & 0 deletions ingestify/tests/test_batch_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,19 @@ def test_batch_loader_raises_if_not_precomputed():

with pytest.raises(RuntimeError, match="not precomputed"):
batch_loader(object(), current_file=None)


def test_batch_loader_propagates_stored_exception():
"""When an Exception is stored as a result, __call__ re-raises it."""
batch_loader = BatchLoader(lambda frs, cfs, drs: [], batch_size=5)
fr_ok, fr_err = object(), object()

original_error = ValueError("Google Ads daily quota exhausted")
batch_loader._store_results([fr_ok, fr_err], ["good_result", original_error])

# Good result works normally
assert batch_loader(fr_ok, current_file=None) == "good_result"

# Error result re-raises the original exception
with pytest.raises(ValueError, match="daily quota exhausted"):
batch_loader(fr_err, current_file=None)
Loading