diff --git a/mapillary_tools/history.py b/mapillary_tools/history.py
index 8b7251ec..a0cf1311 100644
--- a/mapillary_tools/history.py
+++ b/mapillary_tools/history.py
@@ -162,6 +162,11 @@ def clear_expired(self) -> list[str]:
 
         return expired_keys
 
+    def keys(self):
+        with self._lock:
+            with dbm.open(self._file, flag="c") as db:
+                return db.keys()
+
     def _is_expired(self, payload: JSONDict) -> bool:
         expires_at = payload.get("expires_at")
         if isinstance(expires_at, (int, float)):
diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py
index 8b2e2ad3..c81c91c4 100644
--- a/mapillary_tools/uploader.py
+++ b/mapillary_tools/uploader.py
@@ -2,6 +2,7 @@
 
 import concurrent.futures
 import dataclasses
+import hashlib
 import io
 import json
 import logging
@@ -56,6 +57,9 @@ class UploadOptions:
     user_items: config.UserItem
     chunk_size: int = int(constants.UPLOAD_CHUNK_SIZE_MB * 1024 * 1024)
     num_upload_workers: int = constants.MAX_IMAGE_UPLOAD_WORKERS
+    # When set, the upload cache is read from and written to this path
+    # This option is exposed for testing purposes. In production, the path is derived from the envvar and user_items
+    upload_cache_path: Path | None = None
     dry_run: bool = False
     nofinish: bool = False
     noresume: bool = False
@@ -471,7 +475,7 @@ def _zip_sequence_fp(
             # Arcname should be unique, the name does not matter
             arcname = f"{idx}.jpg"
             zipinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0))
-            zipf.writestr(zipinfo, SingleImageUploader.dump_image_bytes(metadata))
+            zipf.writestr(zipinfo, CachedImageUploader.dump_image_bytes(metadata))
         assert len(sequence) == len(set(zipf.namelist()))
         zipf.comment = json.dumps(
             {"sequence_md5sum": sequence_md5sum},
@@ -537,6 +541,13 @@ class ImageSequenceUploader:
     def __init__(self, upload_options: UploadOptions, emitter: EventEmitter):
         self.upload_options = upload_options
        self.emitter = emitter
+        # Create a single shared CachedImageUploader instance that is used across all uploads
+        cache = _maybe_create_persistent_cache_instance(self.upload_options)
+        if cache:
+            cache.clear_expired()
+        self.cached_image_uploader = CachedImageUploader(
+            self.upload_options, cache=cache
+        )
 
     def upload_images(
         self, image_metadatas: T.Sequence[types.ImageMetadata]
@@ -688,10 +699,6 @@ def _upload_images_from_queue(
         with api_v4.create_user_session(
             self.upload_options.user_items["user_upload_token"]
         ) as user_session:
-            single_image_uploader = SingleImageUploader(
-                self.upload_options, user_session=user_session
-            )
-
             while True:
                 # Assert that all images are already pushed into the queue
                 try:
@@ -710,8 +717,8 @@ def _upload_images_from_queue(
                 }
 
                 # image_progress will be updated during uploading
-                file_handle = single_image_uploader.upload(
-                    image_metadata, image_progress
+                file_handle = self.cached_image_uploader.upload(
+                    user_session, image_metadata, image_progress
                 )
 
                 # Update chunk_size (it was constant if set)
@@ -731,24 +738,27 @@ def _upload_images_from_queue(
         return indexed_file_handles
 
 
-class SingleImageUploader:
+class CachedImageUploader:
     def __init__(
         self,
         upload_options: UploadOptions,
-        user_session: requests.Session | None = None,
+        cache: history.PersistentCache | None = None,
     ):
         self.upload_options = upload_options
-        self.user_session = user_session
-        self.cache = self._maybe_create_persistent_cache_instance(
-            self.upload_options.user_items, upload_options
-        )
+        self.cache = cache
+        if self.cache:
+            self.cache.clear_expired()
+
+    # Thread-safe
     def upload(
-        self, image_metadata: types.ImageMetadata, image_progress: dict[str, T.Any]
+        self,
+        user_session: requests.Session,
+        image_metadata: types.ImageMetadata,
+        image_progress: dict[str, T.Any],
     ) -> str:
         image_bytes = self.dump_image_bytes(image_metadata)
 
-        uploader = Uploader(self.upload_options, user_session=self.user_session)
+        uploader = Uploader(self.upload_options, user_session=user_session)
 
         session_key = uploader._gen_session_key(io.BytesIO(image_bytes), image_progress)
@@ -786,51 +796,7 @@ def dump_image_bytes(cls, metadata: types.ImageMetadata) -> bytes:
                 f"Failed to dump EXIF bytes: {ex}", metadata.filename
             ) from ex
 
-    @classmethod
-    def _maybe_create_persistent_cache_instance(
-        cls, user_items: config.UserItem, upload_options: UploadOptions
-    ) -> history.PersistentCache | None:
-        if not constants.UPLOAD_CACHE_DIR:
-            LOG.debug(
-                "Upload cache directory is set empty, skipping caching upload file handles"
-            )
-            return None
-
-        if upload_options.dry_run:
-            LOG.debug("Dry-run mode enabled, skipping caching upload file handles")
-            return None
-
-        # Different python/CLI versions use different cache (dbm) formats.
-        # Separate them to avoid conflicts
-        py_version_parts = [str(part) for part in sys.version_info[:3]]
-        version = f"py_{'_'.join(py_version_parts)}_{VERSION}"
-
-        cache_path_dir = (
-            Path(constants.UPLOAD_CACHE_DIR)
-            .joinpath(version)
-            .joinpath(api_v4.MAPILLARY_CLIENT_TOKEN.replace("|", "_"))
-            .joinpath(
-                user_items.get("MAPSettingsUserKey", user_items["user_upload_token"])
-            )
-        )
-        cache_path_dir.mkdir(parents=True, exist_ok=True)
-        cache_path = cache_path_dir.joinpath("cached_file_handles")
-
-        # Sanitize sensitive segments for logging
-        sanitized_cache_path = (
-            Path(constants.UPLOAD_CACHE_DIR)
-            .joinpath(version)
-            .joinpath("***")
-            .joinpath("***")
-            .joinpath("cached_file_handles")
-        )
-        LOG.debug(f"File handle cache path: {sanitized_cache_path}")
-
-        cache = history.PersistentCache(str(cache_path.resolve()))
-        cache.clear_expired()
-
-        return cache
-
+    # Thread-safe
     def _get_cached_file_handle(self, key: str) -> str | None:
         if self.cache is None:
             return None
 
         return self.cache.get(key)
 
+    # Thread-safe
     def _set_file_handle_cache(self, key: str, value: str) -> None:
         if self.cache is None:
             return
@@ -1168,3 +1135,57 @@ def _prefixed_uuid4():
 
 def _is_uuid(key: str) -> bool:
     return key.startswith("uuid_") or key.startswith("mly_tools_uuid_")
+
+
+def _build_upload_cache_path(upload_options: UploadOptions) -> Path:
+    # Different python/CLI versions use different cache (dbm) formats.
+    # Separate them to avoid conflicts
+    py_version_parts = [str(part) for part in sys.version_info[:3]]
+    version = f"py_{'_'.join(py_version_parts)}_{VERSION}"
+    # File handles are not sharable between different users
+    user_id = str(
+        upload_options.user_items.get(
+            "MAPSettingsUserKey", upload_options.user_items["user_upload_token"]
+        )
+    )
+    # Hash the value to avoid logging sensitive data
+    user_fingerprint = utils.md5sum_fp(
+        io.BytesIO((api_v4.MAPILLARY_CLIENT_TOKEN + user_id).encode("utf-8")),
+        md5=hashlib.sha256(),
+    ).hexdigest()[:24]
+
+    cache_path = (
+        Path(constants.UPLOAD_CACHE_DIR)
+        .joinpath(version)
+        .joinpath(user_fingerprint)
+        .joinpath("cached_file_handles")
+    )
+
+    return cache_path
+
+
+def _maybe_create_persistent_cache_instance(
+    upload_options: UploadOptions,
+) -> history.PersistentCache | None:
+    """Create a persistent cache instance if caching is enabled."""
+
+    if upload_options.dry_run:
+        LOG.debug("Dry-run mode enabled, skipping caching upload file handles")
+        return None
+
+    if upload_options.upload_cache_path is None:
+        if not constants.UPLOAD_CACHE_DIR:
+            LOG.debug(
+                "Upload cache directory is set empty, skipping caching upload file handles"
+            )
+            return None
+
+        cache_path = _build_upload_cache_path(upload_options)
+    else:
+        cache_path = upload_options.upload_cache_path
+
+    LOG.debug(f"File handle cache path: {cache_path}")
+
+    cache_path.parent.mkdir(parents=True, exist_ok=True)
+
+    return history.PersistentCache(str(cache_path.resolve()))
diff --git a/tests/unit/test_uploader.py b/tests/unit/test_uploader.py
index c4a9bbcb..fa2f3a72 100644
--- a/tests/unit/test_uploader.py
+++ b/tests/unit/test_uploader.py
@@ -1,8 +1,9 @@
+import dataclasses
 import typing as T
 from pathlib import Path
+from unittest.mock import patch
 
 import py.path
-
 import pytest
 
 from mapillary_tools import api_v4, uploader
@@ -27,7 +28,9 @@ def setup_unittest_data(tmpdir: py.path.local):
 def test_upload_images(setup_unittest_data: py.path.local, setup_upload: py.path.local):
     mly_uploader = uploader.Uploader(
         uploader.UploadOptions(
-            {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"}, dry_run=True
+            {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"},
+            dry_run=True,
+            upload_cache_path=Path(setup_unittest_data.join("upload_cache")),
         )
     )
     test_exif = setup_unittest_data.join("test_exif.jpg")
@@ -106,6 +109,7 @@ def test_upload_images_multiple_sequences(
             # will call the API for real
             # "MAPOrganizationKey": "3011753992432185",
         },
+        upload_cache_path=Path(setup_unittest_data.join("upload_cache")),
         dry_run=True,
     ),
 )
@@ -177,6 +181,7 @@ def test_upload_zip(
             # will call the API for real
             # "MAPOrganizationKey": 3011753992432185,
         },
+        upload_cache_path=Path(setup_unittest_data.join("upload_cache")),
        dry_run=True,
     ),
     emitter=emitter,
@@ -252,3 +257,498 @@ def _upload_end(payload):
     test_upload_zip(setup_unittest_data, setup_upload, emitter=emitter)
 
     assert len(stats) == 2, stats
+
+
+class TestImageSequenceUploader:
+    """Test suite for ImageSequenceUploader with a focus on multithreading scenarios and caching."""
+
+    def test_image_sequence_uploader_basic(self, setup_unittest_data: py.path.local):
+        """Test basic functionality of ImageSequenceUploader."""
+        upload_options = uploader.UploadOptions(
+            {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"},
+            upload_cache_path=Path(setup_unittest_data.join("upload_cache")),
+            dry_run=True,
+        )
+        emitter = uploader.EventEmitter()
+        sequence_uploader = uploader.ImageSequenceUploader(upload_options, emitter)
+
+        # Create mock image metadata for a single sequence
+        test_exif = setup_unittest_data.join("test_exif.jpg")
+        image_metadatas = [
+            description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927694,
+                    "MAPLongitude": 16.1840944,
+                    "MAPCaptureTime": "2021_02_13_13_24_41_140",
+                    "filename": str(test_exif),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "sequence_1",
+                }
+            ),
+            description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927695,
+                    "MAPLongitude": 16.1840945,
+                    "MAPCaptureTime": "2021_02_13_13_24_42_140",
+                    "filename": str(test_exif),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "sequence_1",
+                }
+            ),
+        ]
+
+        # Test upload
+        results = list(sequence_uploader.upload_images(image_metadatas))
+
+        assert len(results) == 1
+        sequence_uuid, upload_result = results[0]
+        assert sequence_uuid == "sequence_1"
+        assert upload_result.error is None
+        assert upload_result.result is not None
+
+    def test_image_sequence_uploader_multithreading_with_cache_enabled(
+        self, setup_unittest_data: py.path.local
+    ):
+        """Test that ImageSequenceUploader's internal multithreading works correctly when cache is enabled."""
+        # Create upload options that enable the cache
+        upload_options_with_cache = uploader.UploadOptions(
+            {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"},
+            upload_cache_path=Path(setup_unittest_data.join("upload_cache")),
+            num_upload_workers=4,  # This will be used internally for parallel image uploads
+            dry_run=False,  # Cache requires dry_run=False initially
+        )
+        emitter = uploader.EventEmitter()
+        sequence_uploader = uploader.ImageSequenceUploader(
+            upload_options_with_cache, emitter
+        )
+
+        # Override to dry_run=True for actual testing
+        sequence_uploader.upload_options = dataclasses.replace(
+            upload_options_with_cache, dry_run=True
+        )
+        sequence_uploader.cached_image_uploader.upload_options = dataclasses.replace(
+            upload_options_with_cache, dry_run=True
+        )
+
+        # Verify cache is available and shared
+        assert sequence_uploader.cached_image_uploader.cache is not None, (
+            "CachedImageUploader should share the same cache instance"
+        )
+
+        test_exif = setup_unittest_data.join("test_exif.jpg")
+
+        num_images = 100  # Reasonable number for testing with direct cache verification
+        image_metadatas = []
+
+        for i in range(num_images):
+            image_metadatas.append(
+                description.DescriptionJSONSerializer.from_desc(
+                    {
+                        "MAPLatitude": 58.5927694 + i * 0.0001,
+                        "MAPLongitude": 16.1840944 + i * 0.0001,
+                        "MAPCaptureTime": f"2021_02_13_13_{(24 + i) % 60:02d}_{(41 + i) % 60:02d}_140",
+                        "filename": str(test_exif),
+                        "filetype": "image",
+                        "MAPSequenceUUID": "multi_thread_sequence",
+                    }
+                )
+            )
+
+        # Test upload - this will internally use multithreading via _upload_images_parallel
+        results = list(sequence_uploader.upload_images(image_metadatas))
+
+        assert len(results) == 1, f"Expected 1 sequence result, got {len(results)}"
+        sequence_uuid, upload_result = results[0]
+        assert sequence_uuid == "multi_thread_sequence", (
+            f"Got wrong sequence UUID: {sequence_uuid}"
+        )
+        assert upload_result.error is None, (
+            f"Upload failed with error: {upload_result.error}"
+        )
+        assert upload_result.result is not None, "Upload should return a cluster ID"
+
+    def test_image_sequence_uploader_multithreading_with_cache_disabled(
+        self, setup_unittest_data: py.path.local
+    ):
+        """Test that ImageSequenceUploader's internal multithreading works correctly when cache is disabled."""
+        # Test with cache disabled via constants patch
+        with patch("mapillary_tools.constants.UPLOAD_CACHE_DIR", None):
+            upload_options = uploader.UploadOptions(
{"user_upload_token": "YOUR_USER_ACCESS_TOKEN"}, + upload_cache_path=Path(setup_unittest_data.join("upload_cache")), + num_upload_workers=4, # This will be used internally for parallel image uploads + dry_run=True, + ) + emitter = uploader.EventEmitter() + sequence_uploader = uploader.ImageSequenceUploader(upload_options, emitter) + + # Verify cache is disabled for both instances + assert sequence_uploader.cached_image_uploader.cache is None, ( + "Should have cache disabled" + ) + + test_exif = setup_unittest_data.join("test_exif.jpg") + + num_images = 100 + image_metadatas = [] + + for i in range(num_images): + image_metadatas.append( + description.DescriptionJSONSerializer.from_desc( + { + "MAPLatitude": 59.5927694 + i * 0.0001, + "MAPLongitude": 17.1840944 + i * 0.0001, + "MAPCaptureTime": f"2021_02_13_14_{(24 + i) % 60:02d}_{(41 + i) % 60:02d}_140", + "filename": str(test_exif), + "filetype": "image", + "MAPSequenceUUID": "no_cache_multi_thread_sequence", + } + ) + ) + + # Test upload - this will internally use multithreading via _upload_images_parallel + results = list(sequence_uploader.upload_images(image_metadatas)) + + assert len(results) == 1, f"Expected 1 sequence result, got {len(results)}" + sequence_uuid, upload_result = results[0] + assert sequence_uuid == "no_cache_multi_thread_sequence", ( + f"Got wrong sequence UUID: {sequence_uuid}" + ) + assert upload_result.error is None, ( + f"Upload failed with error: {upload_result.error}" + ) + assert upload_result.result is not None, "Upload should return a cluster ID" + + # Test that cache operations are safely ignored when cache is disabled + # These operations should not throw errors even when cache is None + test_key = "test_no_cache_key" + test_value = "test_no_cache_value" + + # Should safely return None without error + retrieved_value = ( + sequence_uploader.cached_image_uploader._get_cached_file_handle( + test_key + ) + ) + assert retrieved_value is None, ( + "Cache get should return None when cache is disabled" + ) + + # Should safely do nothing without error + sequence_uploader.cached_image_uploader._set_file_handle_cache( + test_key, test_value + ) + + # Verify the value is still None after attempted set + retrieved_value_after_set = ( + sequence_uploader.cached_image_uploader._get_cached_file_handle( + test_key + ) + ) + assert retrieved_value_after_set is None, ( + "Cache should remain disabled after set attempt" + ) + + def test_image_sequence_uploader_cache_hits_second_run( + self, setup_unittest_data: py.path.local + ): + """Test that cache hits work correctly for ImageSequenceUploader with overlapping uploads.""" + # Create upload options that enable cache but use dry_run for testing + # We need to create the cache instance separately to avoid the dry_run check + + # Create cache-enabled options to initialize the cache + cache_enabled_options = uploader.UploadOptions( + {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"}, + upload_cache_path=Path(setup_unittest_data.join("upload_cache")), + dry_run=False, # Cache requires dry_run=False initially + ) + + # Create the sequence uploader - your changes now automatically expose cache + emitter = uploader.EventEmitter() + sequence_uploader = uploader.ImageSequenceUploader( + cache_enabled_options, emitter + ) + + # Override to dry_run=True for actual testing (cache remains intact) + sequence_uploader.upload_options = dataclasses.replace( + cache_enabled_options, dry_run=True + ) + sequence_uploader.cached_image_uploader.upload_options = dataclasses.replace( + 
+            cache_enabled_options, dry_run=True
+        )
+
+        # 1. Make sure cache is enabled
+        assert sequence_uploader.cached_image_uploader.cache is not None, (
+            "Cache should be enabled"
+        )
+
+        test_exif = setup_unittest_data.join("test_exif.jpg")
+        test_exif1 = setup_unittest_data.join("test_exif_1.jpg")
+        test_exif.copy(test_exif1)
+        test_exif2 = setup_unittest_data.join("test_exif_2.jpg")
+        test_exif.copy(test_exif2)
+        test_exif3 = setup_unittest_data.join("test_exif_3.jpg")
+        test_exif.copy(test_exif3)
+        test_exif4 = setup_unittest_data.join("test_exif_4.jpg")
+        test_exif.copy(test_exif4)
+        test_exif5 = setup_unittest_data.join("test_exif_5.jpg")
+        test_exif.copy(test_exif5)
+
+        # Create simpler test data to focus on cache behavior
+        # Create image metadata for images a, b, c, d, e
+        images = {
+            "a": description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927694,
+                    "MAPLongitude": 16.1840944,
+                    "MAPCaptureTime": "2021_02_13_13_24_41_140",
+                    "filename": str(test_exif1),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "cache_test_sequence_1",
+                }
+            ),
+            "b": description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927695,
+                    "MAPLongitude": 16.1840945,
+                    "MAPCaptureTime": "2021_02_13_13_24_42_141",
+                    "filename": str(test_exif2),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "cache_test_sequence_1",
+                }
+            ),
+            "c": description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927696,
+                    "MAPLongitude": 16.1840946,
+                    "MAPCaptureTime": "2021_02_13_13_24_43_142",
+                    "filename": str(test_exif3),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "cache_test_sequence_1",
+                }
+            ),
+            "d": description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927697,
+                    "MAPLongitude": 16.1840947,
+                    "MAPCaptureTime": "2021_02_13_13_24_44_143",
+                    "filename": str(test_exif4),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "cache_test_sequence_2",
+                }
+            ),
+            "e": description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927698,
+                    "MAPLongitude": 16.1840948,
+                    "MAPCaptureTime": "2021_02_13_13_24_45_144",
+                    "filename": str(test_exif5),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "cache_test_sequence_2",
+                }
+            ),
+        }
+
+        assert list(sequence_uploader.cached_image_uploader.cache.keys()) == []
+        results_1 = list(
+            sequence_uploader.upload_images([images["a"], images["b"], images["c"]])
+        )
+
+        # Assert that the first upload has no errors
+        assert len(results_1) == 1
+        sequence_uuid_1, upload_result_1 = results_1[0]
+        assert upload_result_1.error is None, (
+            f"First upload failed with error: {upload_result_1.error}"
+        )
+        assert upload_result_1.result is not None
+
+        # Capture cache keys after the first upload
+        first_upload_cache_keys = set(
+            sequence_uploader.cached_image_uploader.cache.keys()
+        )
+        assert len(first_upload_cache_keys) == 3
+
+        results_2 = list(
+            sequence_uploader.upload_images(
+                [
+                    images["c"],  # Should hit cache
+                    images["d"],  # New image, should upload
+                    images["e"],  # New image, should upload
+                ]
+            )
+        )
+
+        # Assert that the second upload has no errors
+        assert (
+            len(results_2) == 2
+        )  # Two sequences: cache_test_sequence_1 and cache_test_sequence_2
+        for sequence_uuid, upload_result in results_2:
+            assert upload_result.error is None, (
+                f"Second upload failed with error: {upload_result.error}"
+            )
+            assert upload_result.result is not None
+
+        # Capture cache keys after the second upload
+        second_upload_cache_keys = set(
+            sequence_uploader.cached_image_uploader.cache.keys()
+        )
+        assert len(second_upload_cache_keys) == 5
+
+        # Assert that all keys from the first upload are still present in the second upload
+        assert first_upload_cache_keys.issubset(second_upload_cache_keys), (
+            f"Cache keys from first upload {first_upload_cache_keys} should be "
+            f"contained in second upload cache keys {second_upload_cache_keys}"
+        )
+
+    def test_image_sequence_uploader_multiple_sequences(
+        self, setup_unittest_data: py.path.local
+    ):
+        """Test ImageSequenceUploader with multiple sequences."""
+        upload_options = uploader.UploadOptions(
+            {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"},
+            upload_cache_path=Path(setup_unittest_data.join("upload_cache")),
+            dry_run=True,
+        )
+        emitter = uploader.EventEmitter()
+        sequence_uploader = uploader.ImageSequenceUploader(upload_options, emitter)
+
+        test_exif = setup_unittest_data.join("test_exif.jpg")
+        test_exif1 = setup_unittest_data.join("test_exif_1.jpg")
+        test_exif.copy(test_exif1)
+        test_exif2 = setup_unittest_data.join("test_exif_2.jpg")
+        test_exif.copy(test_exif2)
+        test_exif3 = setup_unittest_data.join("test_exif_3.jpg")
+        test_exif.copy(test_exif3)
+        fixed_exif = setup_unittest_data.join("fixed_exif.jpg")
+
+        # Create metadata for multiple sequences
+        image_metadatas = [
+            # Sequence 1 - 2 images
+            description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927694,
+                    "MAPLongitude": 16.1840944,
+                    "MAPCaptureTime": "2021_02_13_13_24_41_140",
+                    "filename": str(test_exif1),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "multi_sequence_1",
+                }
+            ),
+            description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927695,
+                    "MAPLongitude": 16.1840945,
+                    "MAPCaptureTime": "2021_02_13_13_24_42_140",
+                    "filename": str(test_exif2),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "multi_sequence_1",
+                }
+            ),
+            # Sequence 2 - 1 image
+            description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 59.5927694,
+                    "MAPLongitude": 17.1840944,
+                    "MAPCaptureTime": "2021_02_13_13_25_41_140",
+                    "filename": str(test_exif3),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "multi_sequence_2",
+                }
+            ),
+        ]
+
+        # Test upload
+        results = list(sequence_uploader.upload_images(image_metadatas))
+
+        assert len(results) == 2, f"Expected 2 sequences, got {len(results)}"
+
+        # Verify both sequences uploaded successfully
+        sequence_results = {seq_uuid: result for seq_uuid, result in results}
+
+        assert "multi_sequence_1" in sequence_results, "Sequence 1 should be present"
+        assert "multi_sequence_2" in sequence_results, "Sequence 2 should be present"
+
+        result_1 = sequence_results["multi_sequence_1"]
+        result_2 = sequence_results["multi_sequence_2"]
+
+        assert result_1.error is None, (
+            f"Sequence 1 should not have error: {result_1.error}"
+        )
+        assert result_1.result is not None, "Sequence 1 should have result"
+
+        assert result_2.error is None, (
+            f"Sequence 2 should not have error: {result_2.error}"
+        )
+        assert result_2.result is not None, "Sequence 2 should have result"
+
+    def test_image_sequence_uploader_event_emission(
+        self, setup_unittest_data: py.path.local
+    ):
+        """Test that ImageSequenceUploader properly emits events during upload."""
+        upload_options = uploader.UploadOptions(
+            {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"},
+            upload_cache_path=Path(setup_unittest_data.join("upload_cache")),
+            dry_run=True,
+        )
+        emitter = uploader.EventEmitter()
+        sequence_uploader = uploader.ImageSequenceUploader(upload_options, emitter)
+
+        # Track emitted events
+        emitted_events = []
+
+        @emitter.on("upload_start")
+        def on_upload_start(payload):
+            emitted_events.append(("upload_start", payload.copy()))
+
+        @emitter.on("upload_end")
+        def on_upload_end(payload):
+            emitted_events.append(("upload_end", payload.copy()))
+
+        @emitter.on("upload_finished")
+        def on_upload_finished(payload):
+            emitted_events.append(("upload_finished", payload.copy()))
+
+        test_exif = setup_unittest_data.join("test_exif.jpg")
+        image_metadatas = [
+            description.DescriptionJSONSerializer.from_desc(
+                {
+                    "MAPLatitude": 58.5927694,
+                    "MAPLongitude": 16.1840944,
+                    "MAPCaptureTime": "2021_02_13_13_24_41_140",
+                    "filename": str(test_exif),
+                    "filetype": "image",
+                    "MAPSequenceUUID": "event_test_sequence",
+                }
+            ),
+        ]
+
+        # Test upload
+        results = list(sequence_uploader.upload_images(image_metadatas))
+
+        assert len(results) == 1
+        sequence_uuid, upload_result = results[0]
+        assert upload_result.error is None
+
+        # Verify events were emitted
+        assert len(emitted_events) >= 3, (
+            f"Expected at least 3 events, got {len(emitted_events)}"
+        )
+
+        event_types = [event[0] for event in emitted_events]
+        assert "upload_start" in event_types, "upload_start event should be emitted"
+        assert "upload_end" in event_types, "upload_end event should be emitted"
+        assert "upload_finished" in event_types, (
+            "upload_finished event should be emitted"
+        )
+
+        # Verify event payload structure
+        start_event = next(
+            event for event in emitted_events if event[0] == "upload_start"
+        )
+        start_payload = start_event[1]
+
+        assert "sequence_uuid" in start_payload, (
+            "upload_start should contain sequence_uuid"
+        )
+        assert "entity_size" in start_payload, "upload_start should contain entity_size"
+        assert start_payload["sequence_uuid"] == "event_test_sequence"
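
Reviewer note: below is a minimal usage sketch of how the new upload_cache_path option and the shared CachedImageUploader fit together, assuming only the names introduced in this diff; the description serializer import path and the example file/cache paths are illustrative, and a real user token plus network access are needed outside dry-run (dry_run=True skips the persistent cache in _maybe_create_persistent_cache_instance).

from pathlib import Path

from mapillary_tools import uploader
from mapillary_tools.serializer import description  # import path assumed; use whatever the tests above use

# Point the file-handle cache at an explicit location. When upload_cache_path is
# unset, the path is derived from constants.UPLOAD_CACHE_DIR plus a hashed
# client-token/user fingerprint (see _build_upload_cache_path).
options = uploader.UploadOptions(
    {"user_upload_token": "YOUR_USER_ACCESS_TOKEN"},
    upload_cache_path=Path("/tmp/mly_upload_cache/cached_file_handles"),
    num_upload_workers=4,
    dry_run=False,  # dry_run=True disables the persistent cache
)

emitter = uploader.EventEmitter()
# A single CachedImageUploader (and PersistentCache) is created here and shared
# by all upload worker threads; each worker passes in its own requests.Session.
sequence_uploader = uploader.ImageSequenceUploader(options, emitter)

metadata = description.DescriptionJSONSerializer.from_desc(
    {
        "MAPLatitude": 58.5927694,
        "MAPLongitude": 16.1840944,
        "MAPCaptureTime": "2021_02_13_13_24_41_140",
        "filename": "/path/to/image.jpg",
        "filetype": "image",
        "MAPSequenceUUID": "sequence_1",
    }
)

# On a second run, already-uploaded images resolve to cached file handles
# instead of being re-uploaded.
for sequence_uuid, result in sequence_uploader.upload_images([metadata]):
    print(sequence_uuid, result.result, result.error)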