diff --git a/ingestify/domain/models/resources/batch_loader.py b/ingestify/domain/models/resources/batch_loader.py index 6971df5..fa8f20f 100644 --- a/ingestify/domain/models/resources/batch_loader.py +++ b/ingestify/domain/models/resources/batch_loader.py @@ -18,6 +18,11 @@ def load(file_resources, current_files, dataset_resources): current_files may contain None entries (for create tasks) or a File (for update tasks) — the loader_fn handles both. + +Error propagation: if the loader_fn stores an Exception instead of a +DraftFile for a specific item, __call__ will re-raise it when that item's +task runs. This lets successful items in a batch proceed while failed items +cause their individual tasks to fail and be retried on the next run. """ from typing import Callable, List @@ -35,7 +40,10 @@ def __call__(self, file_resource, current_file, dataset_resource=None, **kwargs) "BatchLoader result not precomputed. A BatchTask must " "populate the cache before inner tasks execute." ) - return self._results.pop(key) + result = self._results.pop(key) + if isinstance(result, BaseException): + raise result + return result def _store_results(self, file_resources: List, results: List): """Store batch results so they can be retrieved via __call__.""" diff --git a/ingestify/tests/test_batch_loader.py b/ingestify/tests/test_batch_loader.py index c7aef4d..ef9b706 100644 --- a/ingestify/tests/test_batch_loader.py +++ b/ingestify/tests/test_batch_loader.py @@ -21,3 +21,19 @@ def test_batch_loader_raises_if_not_precomputed(): with pytest.raises(RuntimeError, match="not precomputed"): batch_loader(object(), current_file=None) + + +def test_batch_loader_propagates_stored_exception(): + """When an Exception is stored as a result, __call__ re-raises it.""" + batch_loader = BatchLoader(lambda frs, cfs, drs: [], batch_size=5) + fr_ok, fr_err = object(), object() + + original_error = ValueError("Google Ads daily quota exhausted") + batch_loader._store_results([fr_ok, fr_err], ["good_result", original_error]) + + # Good result works normally + assert batch_loader(fr_ok, current_file=None) == "good_result" + + # Error result re-raises the original exception + with pytest.raises(ValueError, match="daily quota exhausted"): + batch_loader(fr_err, current_file=None)