diff --git a/faststack/app.py b/faststack/app.py index fd6ad4f..0031139 100644 --- a/faststack/app.py +++ b/faststack/app.py @@ -48,7 +48,7 @@ # ⬇️ these are the ones that went missing from faststack.config import config from faststack.logging_setup import setup_logging -from faststack.models import ImageFile, DecodedImage +from faststack.models import ImageFile, DecodedImage, EntryMetadata from faststack.io.indexer import ( find_images, find_images_with_variants, @@ -101,8 +101,8 @@ UIStateRestoration, ) -# AWB thresholds on the -1..+1 normalised slider range. -# NOOP: skip applying correction entirely (≈ 0.64 Lab units — below perceptible). +# AWB thresholds on the -1..+1 normalized slider range. +# NOOP: skip corrections that are effectively imperceptible in the current gain model. # LABEL: below this the direction word becomes "neutral" in the status message. _AWB_NOOP_EPS = 0.005 _AWB_LABEL_EPS = 0.002 @@ -1841,6 +1841,11 @@ def save_edited_image(self): # if the user continued editing after save was submitted. getattr(self.image_editor, "_edits_rev", None), ) + save_metadata_path = ( + self.image_files[self.current_index].path + if 0 <= self.current_index < len(self.image_files) + else save_target_path or self.image_editor.current_filepath + ) if save_image_key and save_image_key in self._saving_keys: self.update_status_message( @@ -1871,7 +1876,15 @@ def save_edited_image(self): "editor_was_open": editor_was_open, "save_image_key": save_image_key, "session_token": session_token, + "save_directory_key": self._key(self.image_dir), + "save_metadata_path": ( + str(save_metadata_path) if save_metadata_path else None + ), "started_from_restore_override": started_from_restore_override, + # Keep metadata writes bound to the sidecar that owned the save + # request. The user can navigate to another folder before the + # background save completes, and self.sidecar may change. + "save_sidecar": self.sidecar, } # Submit save work to background thread — operates only on the snapshot @@ -1967,8 +1980,8 @@ def _on_save_finished(self, save_result: dict): return result = save_result.get("result") - if isinstance(result, tuple) and len(result) >= 2: - saved_path, _ = result # backup_path unused + if isinstance(result, tuple) and len(result) == 2: + saved_path, backup_path = result # --- Post-Save Cleanup --- @@ -2028,7 +2041,42 @@ def _on_save_finished(self, save_result: dict): # 1. Update sidecar metadata FIRST so all following refreshes see it if saved_path: - self.sidecar.update_metadata(saved_path, {"edited": True}) + save_sidecar = save_result.get("save_sidecar") or self.sidecar + metadata_path = ( + Path(save_result["save_metadata_path"]) + if save_result.get("save_metadata_path") + else saved_path + ) + metadata_before = self._mark_image_edited_in_sidecar( + save_sidecar, metadata_path + ) + save_directory_key = save_result.get("save_directory_key") + current_directory_key = self._key(self.image_dir) + should_record_undo = backup_path and ( + save_directory_key is None + or save_directory_key == current_directory_key + ) + if should_record_undo: + self.undo_history.append( + ( + "save_edit", + self._build_edit_undo_data( + saved_path, + backup_path, + metadata_path=metadata_path, + metadata_before=metadata_before, + sidecar=save_sidecar, + ), + time.time(), + ) + ) + elif backup_path: + log.info( + "Skipping save_edit undo for %s after directory change: %s -> %s", + saved_path, + save_directory_key, + current_directory_key, + ) # 2. Update variants and re-select index self.refresh_image_list() @@ -2054,6 +2102,9 @@ def _on_save_finished(self, save_result: dict): else: # Success reported but result shape unexpected log.warning("Save finished with unexpected result shape: %r", result) + self.update_status_message( + "Save finished, but the result payload was malformed.", timeout=5000 + ) # --- Actions --- @@ -5427,46 +5478,49 @@ def undo_delete(self): if self._thumbnail_model and self._is_grid_view_active: self._thumbnail_model.refresh() - elif action_type == "auto_white_balance": - saved_path, backup_path = action_data + elif action_type in { + "save_edit", + "auto_white_balance", + "auto_levels", + "crop", + }: try: - if self._restore_backup_safe(saved_path, backup_path): - self._post_undo_refresh_and_select( - Path(saved_path), update_hist=True - ) - self.update_status_message("Undid auto white balance") - except Exception as e: + ( + saved_path, + backup_path, + metadata_path, + metadata_before, + metadata_sidecar, + ) = self._parse_edit_undo_data(action_data) + except ValueError as e: self.update_status_message(f"Undo failed: {e}") - if Path(backup_path).exists(): - self.undo_history.append( - ("auto_white_balance", action_data, timestamp) - ) + return - elif action_type == "auto_levels": - saved_path, backup_path = action_data try: if self._restore_backup_safe(saved_path, backup_path): - self._post_undo_refresh_and_select( - Path(saved_path), update_hist=True + restore_sidecar = metadata_sidecar or self.sidecar + restore_metadata_path = ( + Path(metadata_path) if metadata_path else Path(saved_path) + ) + self._restore_metadata_snapshot( + restore_sidecar, restore_metadata_path, metadata_before ) - self.update_status_message("Undid auto levels") - except Exception as e: - self.update_status_message(f"Undo failed: {e}") - if Path(backup_path).exists(): - self.undo_history.append(("auto_levels", action_data, timestamp)) - - elif action_type == "crop": - saved_path, backup_path = action_data - try: - if self._restore_backup_safe(saved_path, backup_path): self._post_undo_refresh_and_select( - Path(saved_path), update_hist=False + Path(saved_path), + update_hist=action_type != "crop", ) - self.update_status_message("Undid crop") - except Exception as e: + if action_type == "save_edit": + self.update_status_message("Undid saved edit") + elif action_type == "auto_white_balance": + self.update_status_message("Undid auto white balance") + elif action_type == "auto_levels": + self.update_status_message("Undid auto levels") + else: + self.update_status_message("Undid crop") + except (FileNotFoundError, OSError, shutil.Error) as e: self.update_status_message(f"Undo failed: {e}") if Path(backup_path).exists(): - self.undo_history.append(("crop", action_data, timestamp)) + self.undo_history.append((action_type, action_data, timestamp)) def shutdown_qt(self): """Shutdown Qt objects only - MUST run on main/Qt thread.""" @@ -5861,6 +5915,100 @@ def clear_message(): self.ui_state.statusMessage = message QTimer.singleShot(timeout, clear_message) + def _capture_metadata_snapshot( + self, sidecar: SidecarManager, image_path: Path + ) -> Optional[dict]: + """Capture the current sidecar metadata for undo/restore.""" + meta = sidecar.get_metadata(image_path, create=False) + if meta is None: + return None + return { + field_name: getattr(meta, field_name) + for field_name in EntryMetadata.__dataclass_fields__ + } + + def _mark_image_edited_in_sidecar( + self, sidecar: SidecarManager, image_path: Path + ) -> Optional[dict]: + """Mark an image as edited and return the pre-save metadata snapshot.""" + old_meta = self._capture_metadata_snapshot(sidecar, image_path) + new_meta = dict(old_meta or {}) + new_meta["edited"] = True + new_meta["edited_date"] = datetime.now().strftime("%Y-%m-%d") + sidecar.update_metadata(image_path, new_meta) + return old_meta + + @staticmethod + def _build_edit_undo_data( + saved_path: Path, + backup_path: Path, + *, + metadata_path: Optional[Path] = None, + metadata_before: Optional[dict] = None, + sidecar: Optional[SidecarManager] = None, + ) -> dict: + """Build a backward-compatible undo payload for saved edit operations.""" + return { + "saved_path": str(saved_path), + "backup_path": str(backup_path), + "metadata_path": str(metadata_path) if metadata_path else None, + "metadata_before": metadata_before, + "sidecar": sidecar, + } + + @staticmethod + def _parse_edit_undo_data( + action_data: Any, + ) -> tuple[str, str, Optional[str], Optional[dict], Any]: + """Read both legacy tuple undo payloads and new dict payloads.""" + if isinstance(action_data, dict): + saved_path = action_data.get("saved_path") + backup_path = action_data.get("backup_path") + if saved_path and backup_path: + return ( + str(saved_path), + str(backup_path), + action_data.get("metadata_path"), + action_data.get("metadata_before"), + action_data.get("sidecar"), + ) + elif isinstance(action_data, (tuple, list)) and len(action_data) >= 2: + return str(action_data[0]), str(action_data[1]), None, None, None + + raise ValueError(f"Unexpected edit undo payload: {action_data!r}") + + def _restore_metadata_snapshot( + self, sidecar: SidecarManager, image_path: Path, snapshot: Optional[dict] + ) -> None: + """Restore only the metadata fields owned by edit-save actions.""" + stable_key = sidecar.metadata_key_for_path(image_path) + current_meta = sidecar.data.entries.get(stable_key) + restored_edited = bool(snapshot.get("edited", False)) if snapshot else False + restored_edited_date = snapshot.get("edited_date") if snapshot else None + changed = False + + if current_meta is None: + if not restored_edited and restored_edited_date is None: + return + current_meta = sidecar.get_metadata(image_path, create=True) + changed = True + + if current_meta.edited != restored_edited: + current_meta.edited = restored_edited + changed = True + if current_meta.edited_date != restored_edited_date: + current_meta.edited_date = restored_edited_date + changed = True + + if snapshot is None: + default_meta = EntryMetadata() + if current_meta.__dict__ == default_meta.__dict__: + del sidecar.data.entries[stable_key] + changed = True + + if changed: + sidecar.save() + def _is_image_saving(self, file_path_str: str) -> bool: if not file_path_str or not hasattr(self, "_saving_keys"): return False @@ -7363,6 +7511,10 @@ def execute_crop(self): if save_result: saved_path, backup_path = save_result + metadata_path = self.image_files[self.current_index].path + metadata_before = self._mark_image_edited_in_sidecar( + self.sidecar, metadata_path + ) # IF we were restoring from a variant, clear the override now that it's "the truth" if is_restoring: @@ -7370,12 +7522,19 @@ def execute_crop(self): timestamp = time.time() self.undo_history.append( - ("crop", (str(saved_path), str(backup_path)), timestamp) + ( + "crop", + self._build_edit_undo_data( + saved_path, + backup_path, + metadata_path=metadata_path, + metadata_before=metadata_before, + sidecar=self.sidecar, + ), + timestamp, + ) ) - # Mark as edited in sidecar - self.sidecar.update_metadata(saved_path, {"edited": True}) - # Exit crop mode self.ui_state.isCropping = False self.ui_state.currentCropBox = (0, 0, 1000, 1000) @@ -7613,14 +7772,25 @@ def quick_auto_levels(self): if save_result: saved_path, backup_path = save_result + metadata_path = self.image_files[self.current_index].path + metadata_before = self._mark_image_edited_in_sidecar( + self.sidecar, metadata_path + ) timestamp = time.time() self.undo_history.append( - ("auto_levels", (saved_path, backup_path), timestamp) + ( + "auto_levels", + self._build_edit_undo_data( + saved_path, + backup_path, + metadata_path=metadata_path, + metadata_before=metadata_before, + sidecar=self.sidecar, + ), + timestamp, + ) ) - # 1. Update sidecar metadata FIRST so all following refreshes see it - self.sidecar.update_metadata(saved_path, {"edited": True}) - # 2. Update list and model to pick up changes self.refresh_image_list() @@ -7713,12 +7883,23 @@ def _apply_auto_levels_at_index(self, index: int) -> bool: if save_result: saved_path, backup_path = save_result timestamp = time.time() - - # Mark as edited in sidecar - self.sidecar.update_metadata(saved_path, {"edited": True}) + metadata_path = image_file.path + metadata_before = self._mark_image_edited_in_sidecar( + self.sidecar, metadata_path + ) self.undo_history.append( - ("auto_levels", (saved_path, backup_path), timestamp) + ( + "auto_levels", + self._build_edit_undo_data( + saved_path, + backup_path, + metadata_path=metadata_path, + metadata_before=metadata_before, + sidecar=self.sidecar, + ), + timestamp, + ) ) self.image_editor.clear() self.image_cache.pop_path(saved_path) @@ -7820,16 +8001,25 @@ def quick_auto_white_balance(self): t_start = time.perf_counter() image_file = self.image_files[self.current_index] - filepath = str(image_file.path) + if self.view_override_path: + active_path = Path(self.view_override_path) + else: + active_path = self.get_active_edit_path(self.current_index) + filepath = str(active_path) # Ensure image is loaded in editor (skip if already loaded) - if ( - not self.image_editor.current_filepath - or str(self.image_editor.current_filepath) != filepath - ): + editor_path = self.image_editor.current_filepath + paths_match = False + if editor_path: + try: + paths_match = Path(editor_path).resolve() == active_path.resolve() + except (OSError, ValueError): + paths_match = str(editor_path) == filepath + + if not paths_match: cached_preview = self.get_decoded_image(self.current_index) if not self.image_editor.load_image( - filepath, cached_preview=cached_preview + filepath, cached_preview=cached_preview, preview_only=True ): self.update_status_message("Failed to load image") return @@ -7847,7 +8037,14 @@ def quick_auto_white_balance(self): # Save the edited image (this creates a backup automatically) try: - save_result = self.image_editor.save_image() + save_target_path = self._get_save_target_path_for_current_view() + save_result = self.image_editor.save_image_uint8_white_balance( + save_target_path=save_target_path + ) + if save_result is None: + save_result = self.image_editor.save_image( + save_target_path=save_target_path + ) except RuntimeError as e: log.warning("quick_auto_white_balance: Save failed: %s", e) self.update_status_message(f"Failed to save image: {e}") @@ -7863,8 +8060,10 @@ def quick_auto_white_balance(self): if save_result: saved_path, backup_path = save_result timestamp = time.time() - # 1. Update sidecar metadata FIRST so all following refreshes see it - self.sidecar.update_metadata(saved_path, {"edited": True}) + metadata_path = self.image_files[self.current_index].path + metadata_before = self._mark_image_edited_in_sidecar( + self.sidecar, metadata_path + ) # 2. Update list and model to pick up changes self.refresh_image_list() @@ -7873,7 +8072,17 @@ def quick_auto_white_balance(self): self._reindex_after_save(saved_path) self.undo_history.append( - ("auto_white_balance", (saved_path, backup_path), timestamp) + ( + "auto_white_balance", + self._build_edit_undo_data( + saved_path, + backup_path, + metadata_path=metadata_path, + metadata_before=metadata_before, + sidecar=self.sidecar, + ), + timestamp, + ) ) # Force the image editor to clear its current state so it reloads fresh @@ -7972,8 +8181,7 @@ def auto_white_balance_legacy(self) -> Optional[str]: self.ui_state.white_balance_by = by_value self.ui_state.white_balance_mg = mg_value - self.ui_refresh_generation += 1 - self.ui_state.currentImageSourceChanged.emit() + self._kick_preview_worker() by_dir = _awb_direction(by_value, "warming", "cooling") mg_dir = _awb_direction(mg_value, "magenta", "greener") @@ -7988,8 +8196,8 @@ def auto_white_balance_legacy(self) -> Optional[str]: def auto_white_balance_lab(self) -> Optional[str]: """ - Calculates and applies auto white balance using the Lab color space, - filtering out clipped and saturated pixels for a more robust result. + Calculates and applies auto white balance using a robust preview-sized + neutral-pixel estimate aligned to the editor's slider model. Returns the detail message string if a correction was applied, or None. """ @@ -7997,114 +8205,30 @@ def auto_white_balance_lab(self) -> Optional[str]: log.warning("No image loaded in editor for auto white balance") return None - try: - import cv2 # numpy is already imported at module level (line 79) - except ImportError: - log.error( - "OpenCV not found. Please install with: pip install opencv-python" - ) - self.update_status_message("Error: OpenCV not installed") - return None - t_awb_start = time.perf_counter() - - # Subsample from float_image for speed. float_image is the authoritative - # display-referred sRGB float32 buffer (editor.py:504-505 does - # np.array(rgb) / 255.0 from Pillow sRGB), same colour space as the - # old PIL-based path, so the AWB result is identical (within subsampling noise). - img_arr = self.image_editor.float_image - if img_arr is not None: - h, w = img_arr.shape[:2] - TARGET_PIXELS = 2_000_000 - stride = max(1, int(np.sqrt(h * w / TARGET_PIXELS))) - sub = np.ascontiguousarray( - img_arr[::stride, ::stride] - ) # contiguous for cv2 - arr = (np.clip(sub, 0.0, 1.0) * 255).astype(np.uint8) - log.debug( - "AWB: subsampled %dx%d -> %dx%d (stride %d)", - w, - h, - arr.shape[1], - arr.shape[0], - stride, - ) - else: - # Fallback: use original_image (full PIL Image) - img = self.image_editor.original_image - if img.mode != "RGB": - img = img.convert("RGB") - arr = np.array(img, dtype=np.uint8) - - t_awb_subsample = time.perf_counter() - - # --- Tunable Constants for Auto White Balance (from config) --- - _LOWER_BOUND_RGB = config.getint("awb", "rgb_lower_bound", 5) - _UPPER_BOUND_RGB = config.getint("awb", "rgb_upper_bound", 250) - _LUMA_LOWER_BOUND = config.getint("awb", "luma_lower_bound", 30) - _LUMA_UPPER_BOUND = config.getint("awb", "luma_upper_bound", 220) + strength = config.getfloat("awb", "strength", 0.7) warm_bias = config.getint("awb", "warm_bias", 6) tint_bias = config.getint("awb", "tint_bias", 0) - _TARGET_A_LAB = 128.0 + tint_bias - _TARGET_B_LAB = 128.0 + warm_bias - _SCALING_FACTOR_LAB_TO_SLIDER = 128.0 - _CORRECTION_STRENGTH = config.getfloat("awb", "strength", 0.7) - - # --- 1. Reject clipped channels and use a luma midtone mask --- - mask = ( - (arr[:, :, 0] > _LOWER_BOUND_RGB) - & (arr[:, :, 0] < _UPPER_BOUND_RGB) - & (arr[:, :, 1] > _LOWER_BOUND_RGB) - & (arr[:, :, 1] < _UPPER_BOUND_RGB) - & (arr[:, :, 2] > _LOWER_BOUND_RGB) - & (arr[:, :, 2] < _UPPER_BOUND_RGB) + rgb_lower_bound = config.getint("awb", "rgb_lower_bound", 5) + rgb_upper_bound = config.getint("awb", "rgb_upper_bound", 250) + luma_lower_bound = config.getint("awb", "luma_lower_bound", 30) + luma_upper_bound = config.getint("awb", "luma_upper_bound", 220) + + estimate = self.image_editor.estimate_auto_white_balance( + strength=strength, + warm_bias=warm_bias, + tint_bias=tint_bias, + rgb_lower_bound=rgb_lower_bound, + rgb_upper_bound=rgb_upper_bound, + luma_lower_bound=luma_lower_bound, + luma_upper_bound=luma_upper_bound, ) - - luma = 0.2126 * arr[:, :, 0] + 0.7152 * arr[:, :, 1] + 0.0722 * arr[:, :, 2] - mask &= (luma > _LUMA_LOWER_BOUND) & (luma < _LUMA_UPPER_BOUND) - - if not np.any(mask): - log.warning( - "Auto white balance: No pixels found after clipping and luma filter. Aborting." - ) - self.update_status_message("AWB failed: no valid pixels found") + if not estimate: + self.update_status_message("AWB failed: no valid neutral pixels found") return None - t_awb_mask = time.perf_counter() - - # --- 2. Work in Lab color space --- - lab_image = cv2.cvtColor(arr, cv2.COLOR_RGB2LAB) - - a_channel = lab_image[:, :, 1] - b_channel = lab_image[:, :, 2] - - masked_a = a_channel[mask] - masked_b = b_channel[mask] - - a_mean = masked_a.mean() - b_mean = masked_b.mean() - - a_shift = _TARGET_A_LAB - a_mean - b_shift = _TARGET_B_LAB - b_mean - - log.info( - "Auto WB (Lab) - means: a*=%.1f, b*=%.1f; targets: a*=%.1f, b*=%.1f; shifts: a*=%.1f, b*=%.1f", - a_mean, - b_mean, - _TARGET_A_LAB, - _TARGET_B_LAB, - a_shift, - b_shift, - ) - - # --- 3. Convert Lab shift to our slider values with strength factor --- - by_value = (b_shift / _SCALING_FACTOR_LAB_TO_SLIDER) * _CORRECTION_STRENGTH - mg_value = (a_shift / _SCALING_FACTOR_LAB_TO_SLIDER) * _CORRECTION_STRENGTH - - by_value = float(np.clip(by_value, -1.0, 1.0)) - mg_value = float(np.clip(mg_value, -1.0, 1.0)) - - log.info("Auto white balance values: B/Y=%.3f, M/G=%.3f", by_value, mg_value) + by_value = estimate["by_value"] + mg_value = estimate["mg_value"] # No-change detection — see _AWB_NOOP_EPS definition for rationale if abs(by_value) < _AWB_NOOP_EPS and abs(mg_value) < _AWB_NOOP_EPS: @@ -8117,25 +8241,25 @@ def auto_white_balance_lab(self) -> Optional[str]: self.ui_state.white_balance_by = by_value self.ui_state.white_balance_mg = mg_value - self.ui_refresh_generation += 1 - self.ui_state.currentImageSourceChanged.emit() + self._kick_preview_worker() by_dir = _awb_direction(by_value, "warming", "cooling") mg_dir = _awb_direction(mg_value, "magenta", "greener") msg = ( f"AWB: B/Y {by_value:+.2f} ({by_dir}), M/G {mg_value:+.2f} ({mg_dir})" - f" \u2014 a*={a_mean:.0f}\u2192{_TARGET_A_LAB:.0f}," - f" b*={b_mean:.0f}\u2192{_TARGET_B_LAB:.0f}" + f" \u2014 neutral RGB {estimate['r_mean']:.3f}/" + f"{estimate['g_mean']:.3f}/{estimate['b_mean']:.3f}" ) t_awb_end = time.perf_counter() + selected_pixels = int(estimate.get("selected_pixels", 0)) + stride = int(estimate.get("stride", 0)) + neutrality_limit = float(estimate.get("neutrality_limit", 0.0)) log.debug( - "[AUTO_COLOR] subsample=%dms mask=%dms lab+calc=%dms total=%dms (%dx%d)", - int((t_awb_subsample - t_awb_start) * 1000), - int((t_awb_mask - t_awb_subsample) * 1000), - int((t_awb_end - t_awb_mask) * 1000), + "[AUTO_COLOR] total=%dms (selected=%d stride=%d neutral<=%.3f)", int((t_awb_end - t_awb_start) * 1000), - arr.shape[1], - arr.shape[0], + selected_pixels, + stride, + neutrality_limit, ) self.update_status_message(msg) return msg diff --git a/faststack/imaging/editor.py b/faststack/imaging/editor.py index 2d94d0f..2092eb4 100644 --- a/faststack/imaging/editor.py +++ b/faststack/imaging/editor.py @@ -362,6 +362,7 @@ def __init__(self): # Cached 768-entry LUT list for save_image_uint8_levels (R+G+B tables), # keyed on (round(blacks, 3), round(whites, 3)). self._cached_u8_lut: Optional[Tuple[Tuple[float, float], List[int]]] = None + self._cached_u8_wb_lut: Optional[Tuple[Tuple[float, float], List[int]]] = None # Mask subsystem — generic mask assets keyed by tool id self._mask_assets: Dict[str, MaskData] = {} @@ -384,6 +385,7 @@ def clear(self): self._cached_highlight_analysis = None self._cached_detail_bands = None self._cached_u8_lut = None + self._cached_u8_wb_lut = None self._mask_assets.clear() self._mask_raster_cache.clear() # Optionally also reset edits if that matches your mental model: @@ -1438,24 +1440,12 @@ def auto_levels( for c in range(3): chan = rgb[:, :, c] + hist = np.bincount(chan.reshape(-1), minlength=256) # Treat near-white/near-black as clipped (JPEG artifacts often land on 254/1) - clipped_low_pct.append( - 100.0 * float(np.count_nonzero(chan <= 1)) / float(total) - ) - clipped_high_pct.append( - 100.0 * float(np.count_nonzero(chan >= 254)) / float(total) - ) - - # Use discrete selection methods to avoid interpolation surprises on uint8. - # Fallback for older numpy (<1.22) that doesn't support method=. - try: - p_lows.append(float(np.percentile(chan, low_p, method="lower"))) - p_highs.append(float(np.percentile(chan, high_p, method="higher"))) - except TypeError: - p_lows.append(float(np.percentile(chan, low_p, interpolation="lower"))) - p_highs.append( - float(np.percentile(chan, high_p, interpolation="higher")) - ) + clipped_low_pct.append(100.0 * float(hist[0] + hist[1]) / float(total)) + clipped_high_pct.append(100.0 * float(hist[254] + hist[255]) / float(total)) + p_lows.append(self._u8_percentile_from_hist(hist, low_p, method="lower")) + p_highs.append(self._u8_percentile_from_hist(hist, high_p, method="higher")) # Conservative anchors to avoid new channel clipping p_low = min(p_lows) @@ -1503,6 +1493,152 @@ def auto_levels( ) return blacks, whites, float(p_low), float(p_high) + @staticmethod + def _u8_percentile_from_hist( + hist: np.ndarray, percentile: float, method: str = "lower" + ) -> float: + """Return a discrete uint8 percentile directly from histogram counts.""" + total = int(hist.sum()) + if total <= 0: + return 0.0 + + q = max(0.0, min(100.0, float(percentile))) / 100.0 + rank = (total - 1) * q + if method == "higher": + target_index = math.ceil(rank) + else: + target_index = math.floor(rank) + + cdf = np.cumsum(hist) + value = int(np.searchsorted(cdf, target_index + 1, side="left")) + return float(max(0, min(255, value))) + + def estimate_auto_white_balance( + self, + *, + strength: float = 0.7, + warm_bias: int = 6, + tint_bias: int = 0, + luma_lower_bound: int = 30, + luma_upper_bound: int = 220, + rgb_lower_bound: int = 5, + rgb_upper_bound: int = 250, + target_pixels: int = 600_000, + ) -> Optional[Dict[str, float]]: + """Estimate white-balance sliders from a robust preview-sized sample.""" + _debug = log.isEnabledFor(logging.DEBUG) + if _debug: + t0 = time.perf_counter() + + img_arr = ( + self.float_preview if self.float_preview is not None else self.float_image + ) + if img_arr is None: + if self.original_image is None: + return None + img_arr = ( + np.asarray(self.original_image.convert("RGB"), dtype=np.float32) / 255.0 + ) + + h, w = img_arr.shape[:2] + total_pixels = max(1, h * w) + stride = max(1, int(math.sqrt(total_pixels / max(1, target_pixels)))) + srgb = np.ascontiguousarray(np.clip(img_arr[::stride, ::stride], 0.0, 1.0)) + + rgb_low = max(0.0, min(255.0, float(rgb_lower_bound))) / 255.0 + rgb_high = max(0.0, min(255.0, float(rgb_upper_bound))) / 255.0 + luma_low = max(0.0, min(255.0, float(luma_lower_bound))) / 255.0 + luma_high = max(0.0, min(255.0, float(luma_upper_bound))) / 255.0 + + mask = np.all(srgb > rgb_low, axis=2) & np.all(srgb < rgb_high, axis=2) + luma = 0.2126 * srgb[:, :, 0] + 0.7152 * srgb[:, :, 1] + 0.0722 * srgb[:, :, 2] + mask &= (luma > luma_low) & (luma < luma_high) + + if not np.any(mask): + return None + + spread = np.max(srgb, axis=2) - np.min(srgb, axis=2) + chroma_ratio = spread / np.maximum(luma, 1.0 / 255.0) + valid_ratio = chroma_ratio[mask] + + neutrality_limit = 0.18 + if valid_ratio.size >= 128: + try: + neutrality_limit = float( + np.percentile(valid_ratio, 35.0, method="linear") + ) + except TypeError: + neutrality_limit = float( + np.percentile(valid_ratio, 35.0, interpolation="linear") + ) + neutrality_limit = float(np.clip(neutrality_limit, 0.03, 0.18)) + + neutral_mask = mask & (chroma_ratio <= neutrality_limit) + if np.count_nonzero(neutral_mask) >= 128: + mask = neutral_mask + + midtone_weight = 1.0 - np.abs(luma - 0.5) / 0.5 + midtone_weight = np.clip(midtone_weight, 0.1, 1.0) + weights = midtone_weight / np.maximum(chroma_ratio + 0.02, 0.02) + weights = np.clip(weights, 0.0, 20.0) + + selected = mask & np.isfinite(weights) + selected_count = int(np.count_nonzero(selected)) + if selected_count == 0: + return None + + sample_srgb = srgb[selected] + sample_weights = weights[selected].astype(np.float32, copy=False) + sample_linear = _srgb_to_linear(sample_srgb).astype(np.float32, copy=False) + if not np.isfinite(sample_linear).all(): + return None + + r_mean = float(np.average(sample_linear[:, 0], weights=sample_weights)) + g_mean = float(np.average(sample_linear[:, 1], weights=sample_weights)) + b_mean = float(np.average(sample_linear[:, 2], weights=sample_weights)) + + eps = 1e-6 + ratio_rb = b_mean / max(r_mean, eps) + by_raw = 2.0 * (ratio_rb - 1.0) / max(ratio_rb + 1.0, eps) + + rb_target = 2.0 * r_mean * b_mean / max(r_mean + b_mean, eps) + g_gain_target = rb_target / max(g_mean, eps) + mg_raw = 2.0 * (1.0 - g_gain_target) + + by_value = (by_raw + (float(warm_bias) / 128.0)) * float(strength) + mg_value = (mg_raw + (float(tint_bias) / 128.0)) * float(strength) + + by_value = float(np.clip(by_value, -1.0, 1.0)) + mg_value = float(np.clip(mg_value, -1.0, 1.0)) + + if _debug: + t_end = time.perf_counter() + log.debug( + "[AUTO_WB_EST] total=%dms sample=%dx%d stride=%d selected=%d neutral<=%.3f means=(%.4f, %.4f, %.4f) wb=(%.4f, %.4f)", + int((t_end - t0) * 1000), + srgb.shape[1], + srgb.shape[0], + stride, + selected_count, + neutrality_limit, + r_mean, + g_mean, + b_mean, + by_value, + mg_value, + ) + + return { + "by_value": by_value, + "mg_value": mg_value, + "r_mean": r_mean, + "g_mean": g_mean, + "b_mean": b_mean, + "selected_pixels": float(selected_count), + "stride": float(stride), + "neutrality_limit": neutrality_limit, + } + def _get_upstream_edits_hash(self, edits: Dict[str, Any]) -> int: """Returns a hash of edit parameters that affect the input to highlight recovery.""" # Parameters that affect the image BEFORE highlight recovery: @@ -2268,6 +2404,55 @@ def save_image( ) return self.save_from_snapshot(snapshot) + def _save_u8_pil_image( + self, img_u8: Image.Image, original_path: Path, log_prefix: str + ) -> Optional[Tuple[Path, Path]]: + """Save a prepared uint8 RGB image with backup, EXIF, and best-effort atomic replace.""" + try: + original_stat = original_path.stat() + except OSError: + original_stat = None + + backup_path = create_backup_file(original_path) + if backup_path is None: + return None + + exif_bytes = self._get_sanitized_exif_bytes() + save_kwargs = {"quality": 95} + if exif_bytes: + save_kwargs["exif"] = exif_bytes + + tmp_path = original_path.with_name( + f"{original_path.stem}.__faststack_tmp__{uuid.uuid4().hex}{original_path.suffix}" + ) + try: + try: + img_u8.save(tmp_path, **save_kwargs) + except Exception: + img_u8.save(tmp_path, quality=95) + try: + os.replace(tmp_path, original_path) + except OSError as e: + log.warning( + "Atomic replace failed (%s); falling back to direct save", e + ) + try: + img_u8.save(original_path, **save_kwargs) + except Exception: + img_u8.save(original_path, quality=95) + finally: + try: + if tmp_path.exists(): + tmp_path.unlink() + except OSError: + pass + + if original_stat is not None: + self._restore_file_times(original_path, original_stat) + + log.debug("%s saved via uint8 fast path: %s", log_prefix, original_path.name) + return original_path, backup_path + def save_image_uint8_levels( self, save_target_path: Optional[Path] = None, @@ -2355,70 +2540,122 @@ def save_image_uint8_levels( if _debug: t_lut = time.perf_counter() + save_result = self._save_u8_pil_image( + img_u8, original_path, log_prefix="[SAVE_IMAGE_U8_LEVELS]" + ) + if save_result is None: + return None + + if _debug: + t_write = time.perf_counter() + w, h = img_u8.size + log.debug( + "[SAVE_IMAGE_U8] lut+apply=%dms save+write=%dms total=%dms (%dx%d, %s)", + int((t_lut - t0) * 1000), + int((t_write - t_lut) * 1000), + int((t_write - t0) * 1000), + w, + h, + original_path.name, + ) + return save_result + + def save_image_uint8_white_balance( + self, + save_target_path: Optional[Path] = None, + ) -> Optional[Tuple[Path, Path]]: + """Fast-path save using per-channel LUTs for white-balance-only edits.""" + if self.original_image is None or self.current_filepath is None: + return None + + original_path = save_target_path if save_target_path else self.current_filepath + if original_path.suffix.lower() in (".tif", ".tiff"): + return None + + with self._lock: + edits = self.current_edits.copy() + + for key, default in self._initial_edits().items(): + if key in ("white_balance_by", "white_balance_mg"): + continue + val = edits.get(key, default) + if isinstance(default, float): + try: + if abs(float(val) - float(default)) > 0.001: + return None + except (TypeError, ValueError): + return None + elif val != default: + return None + try: - original_stat = original_path.stat() - except OSError: - original_stat = None + by = float(edits.get("white_balance_by", 0.0)) + mg = float(edits.get("white_balance_mg", 0.0)) + except (TypeError, ValueError): + return None - # Backup - backup_path = create_backup_file(original_path) - if backup_path is None: + if abs(by) <= 0.001 and abs(mg) <= 0.001: return None + _debug = log.isEnabledFor(logging.DEBUG) if _debug: - t_backup = time.perf_counter() + t0 = time.perf_counter() - # EXIF - exif_bytes = self._get_sanitized_exif_bytes() - save_kwargs = {"quality": 95} - if exif_bytes: - save_kwargs["exif"] = exif_bytes + cache_key = (round(by, 3), round(mg, 3)) + with self._lock: + cached = self._cached_u8_wb_lut + if cached is not None and cached[0] == cache_key: + lut_rgb = cached[1] + else: + lut_rgb = None - # Atomic write: temp file + os.replace() to prevent partial-write visibility - tmp_path = original_path.with_name( - f"{original_path.stem}.__faststack_tmp__{uuid.uuid4().hex}{original_path.suffix}" - ) - try: - try: - img_u8.save(tmp_path, **save_kwargs) - except Exception: - # Fallback without EXIF, keep quality - img_u8.save(tmp_path, quality=95) - try: - os.replace(tmp_path, original_path) - except OSError as e: - # Windows: destination may be held open by another process - log.warning( - "Atomic replace failed (%s); falling back to direct save", e - ) - try: - img_u8.save(original_path, **save_kwargs) - except Exception: - img_u8.save(original_path, quality=95) - finally: - try: - if tmp_path.exists(): - tmp_path.unlink() - except OSError: - pass + if lut_rgb is None: + by_scaled = by * 0.5 + mg_scaled = mg * 0.5 + r_gain = max(0.0, 1.0 + by_scaled) + g_gain = max(0.0, 1.0 - mg_scaled) + b_gain = max(0.0, 1.0 - by_scaled) - if original_stat is not None: - self._restore_file_times(original_path, original_stat) + lut = np.arange(256, dtype=np.float32) / 255.0 + lut_linear = _srgb_to_linear(lut) + lut_r = np.clip(_linear_to_srgb(lut_linear * r_gain), 0.0, 1.0) + lut_g = np.clip(_linear_to_srgb(lut_linear * g_gain), 0.0, 1.0) + lut_b = np.clip(_linear_to_srgb(lut_linear * b_gain), 0.0, 1.0) + lut_rgb = ( + np.rint(lut_r * 255.0).astype(np.uint8).tolist() + + np.rint(lut_g * 255.0).astype(np.uint8).tolist() + + np.rint(lut_b * 255.0).astype(np.uint8).tolist() + ) + with self._lock: + self._cached_u8_wb_lut = (cache_key, lut_rgb) + + rgb_img = self.original_image + if rgb_img.mode != "RGB": + rgb_img = rgb_img.convert("RGB") + img_u8 = rgb_img.point(lut_rgb) + + if _debug: + t_lut = time.perf_counter() + + save_result = self._save_u8_pil_image( + img_u8, original_path, log_prefix="[SAVE_IMAGE_U8_WB]" + ) + if save_result is None: + return None if _debug: t_write = time.perf_counter() w, h = img_u8.size log.debug( - "[SAVE_IMAGE_U8] lut+apply=%dms backup=%dms write=%dms total=%dms (%dx%d, %s)", + "[SAVE_IMAGE_U8_WB] lut+apply=%dms write=%dms total=%dms (%dx%d, %s)", int((t_lut - t0) * 1000), - int((t_backup - t_lut) * 1000), - int((t_write - t_backup) * 1000), + int((t_write - t_lut) * 1000), int((t_write - t0) * 1000), w, h, original_path.name, ) - return original_path, backup_path + return save_result def _restore_file_times(self, path: Path, original_stat: os.stat_result) -> None: """Best-effort restoration of access/modify timestamps after saving.""" diff --git a/faststack/qml/Components.qml b/faststack/qml/Components.qml index 643c101..2e353ff 100644 --- a/faststack/qml/Components.qml +++ b/faststack/qml/Components.qml @@ -52,6 +52,12 @@ Item { if (uiState && uiState.isCropping) { loupeView.freezeCropImageSource() } else { + if (mainMouseArea) { + mainMouseArea.clearPendingRotation(0) + mainMouseArea.endCropInteraction() + mainMouseArea.isRotating = false + mainMouseArea.cropRotation = 0 + } loupeView.releaseCropImageSource() } } @@ -62,15 +68,18 @@ Item { if (mainMouseArea.isRotating) { // Revert rotation mainMouseArea.cropRotation = mainMouseArea.cropStartRotation + mainMouseArea.clearPendingRotation(mainMouseArea.cropRotation) if (controller) controller.set_straighten_angle(mainMouseArea.cropRotation, -1) mainMouseArea.endCropInteraction() mainMouseArea.isRotating = false event.accepted = true } else if (controller) { + mainMouseArea.clearPendingRotation(0) mainMouseArea.endCropInteraction() controller.cancel_crop_mode() mainMouseArea.cropRotation = 0 // Reset local rotation + mainMouseArea.isRotating = false event.accepted = true } } @@ -643,6 +652,12 @@ Item { } } + function clearPendingRotation(targetRotation) { + rotationThrottleTimer.stop() + pendingRotation = (targetRotation === undefined) ? 0 : targetRotation + pendingAspect = -1 + } + function setCropBoxStart(left, top, right, bottom) { cropBoxStartLeft = left cropBoxStartTop = top @@ -656,11 +671,13 @@ Item { } function beginNewCrop(mouseX, mouseY, mx, my) { + var clampedMx = Math.max(0, Math.min(1000, mx)) + var clampedMy = Math.max(0, Math.min(1000, my)) cropDragMode = "new" cropStartX = mouseX cropStartY = mouseY - setCropBoxStart(mx, my, mx, my) - uiState.currentCropBox = [Math.round(mx), Math.round(my), Math.round(mx), Math.round(my)] + setCropBoxStart(clampedMx, clampedMy, clampedMx, clampedMy) + uiState.currentCropBox = [Math.round(clampedMx), Math.round(clampedMy), Math.round(clampedMx), Math.round(clampedMy)] } function beginCropInteraction() { diff --git a/faststack/tests/test_auto_white_balance.py b/faststack/tests/test_auto_white_balance.py new file mode 100644 index 0000000..d61f419 --- /dev/null +++ b/faststack/tests/test_auto_white_balance.py @@ -0,0 +1,93 @@ +import numpy as np +from PIL import Image + +from faststack.imaging.editor import ImageEditor + + +def _channel_spread(arr: np.ndarray) -> tuple[float, np.ndarray]: + means = arr.reshape(-1, 3).mean(axis=0) + return float(np.max(means) - np.min(means)), means + + +def test_estimate_auto_white_balance_reduces_mixed_cast(): + editor = ImageEditor() + + base = np.full((180, 180, 3), 0.45, dtype=np.float32) + cast = base.copy() + cast[:, :, 0] *= 0.72 # too little red + cast[:, :, 1] *= 1.12 # too much green + cast[:, :, 2] *= 1.30 # too much blue + + # Distractor patch: strongly coloured but not representative of the neutral field. + cast[:90, :90] = np.array([0.12, 0.82, 0.12], dtype=np.float32) + + editor.float_preview = np.clip(cast, 0.0, 1.0) + + estimate = editor.estimate_auto_white_balance( + strength=1.0, + warm_bias=0, + tint_bias=0, + target_pixels=120_000, + ) + + assert estimate is not None + assert estimate["by_value"] > 0.0 + assert estimate["mg_value"] > 0.0 + + editor.current_edits = editor._initial_edits() + editor.current_edits["white_balance_by"] = estimate["by_value"] + editor.current_edits["white_balance_mg"] = estimate["mg_value"] + + corrected = editor._apply_edits(editor.float_preview.copy()) + + before_spread, before_means = _channel_spread(cast[90:, 90:, :]) + after_spread, after_means = _channel_spread(corrected[90:, 90:, :]) + + assert after_spread < before_spread * 0.45 + assert abs(after_means[0] - after_means[2]) < abs(before_means[0] - before_means[2]) + assert abs(after_means[1] - after_means[0]) < abs(before_means[1] - before_means[0]) + + +def test_estimate_auto_white_balance_leaves_neutral_image_near_zero(): + editor = ImageEditor() + editor.float_preview = np.full((96, 96, 3), 0.5, dtype=np.float32) + + estimate = editor.estimate_auto_white_balance( + strength=1.0, + warm_bias=0, + tint_bias=0, + target_pixels=20_000, + ) + + assert estimate is not None + assert abs(estimate["by_value"]) < 0.02 + assert abs(estimate["mg_value"]) < 0.02 + + +def test_save_image_uint8_white_balance_fast_path(tmp_path): + editor = ImageEditor() + + arr = np.zeros((40, 40, 3), dtype=np.uint8) + arr[:, :, 0] = 90 + arr[:, :, 1] = 120 + arr[:, :, 2] = 180 + + img = Image.fromarray(arr, "RGB") + image_path = tmp_path / "awb-fast.jpg" + img.save(image_path, quality=95) + + editor.original_image = img + editor.current_filepath = image_path + editor.current_edits = editor._initial_edits() + editor.set_edit_param("white_balance_by", 0.4) + + result = editor.save_image_uint8_white_balance() + + assert result is not None + saved_path, backup_path = result + assert saved_path.exists() + assert backup_path.exists() + + saved = np.asarray(Image.open(saved_path).convert("RGB"), dtype=np.float32) + assert saved[:, :, 0].mean() > arr[:, :, 0].mean() + assert saved[:, :, 2].mean() < arr[:, :, 2].mean() diff --git a/faststack/tests/test_editor_lifecycle_and_safety.py b/faststack/tests/test_editor_lifecycle_and_safety.py index 23022ad..38a3ea9 100644 --- a/faststack/tests/test_editor_lifecycle_and_safety.py +++ b/faststack/tests/test_editor_lifecycle_and_safety.py @@ -104,6 +104,19 @@ def test_histogram_worker_submission_safety(self): # This is acceptable behavior: drop the failed frame, wait for next update. self.assertIsNone(self.controller._hist_pending) + def test_switch_to_directory_rebinds_sidecar_instance(self): + """Directory navigation must replace self.sidecar with a new manager.""" + original_sidecar = self.controller.sidecar + next_sidecar = MagicMock() + self.mock_sidecar.side_effect = [next_sidecar] + + with patch.object(self.controller, "load") as mock_load: + self.controller._switch_to_directory(Path("other-folder")) + + self.assertIsNot(self.controller.sidecar, original_sidecar) + self.assertIs(self.controller.sidecar, next_sidecar) + mock_load.assert_called_once_with(skip_thumbnail_refresh=True) + if __name__ == "__main__": unittest.main() diff --git a/faststack/tests/test_editor_reopening.py b/faststack/tests/test_editor_reopening.py index 628020b..1c209e3 100644 --- a/faststack/tests/test_editor_reopening.py +++ b/faststack/tests/test_editor_reopening.py @@ -7,6 +7,7 @@ sys.path.append(str(Path(__file__).parents[2])) from faststack.app import AppController +from faststack.models import EntryMetadata class TestEditorReopening(unittest.TestCase): @@ -340,6 +341,283 @@ def test_save_finished_clears_editor_when_edits_rev_unchanged(self): # Tokens equal → still_on_same_image is True → editor_was_open → clear called self.controller.image_editor.clear.assert_called_once() + def test_save_finished_skips_undo_after_directory_navigation(self): + """A save finishing after folder navigation must not repopulate the + new directory's undo stack, but it must still update the origin sidecar.""" + main_path = Path("/folder-a/test.jpg") + saved_path = Path("/folder-a/test-developed.jpg") + backup_path = Path("/folder-a/test-developed-backup.jpg") + origin_sidecar = MagicMock() + replacement_sidecar = MagicMock() + origin_sidecar.get_metadata.return_value = EntryMetadata(favorite=True) + self.controller.image_files[0].path = main_path + + # Simulate the controller having navigated to another folder before the + # save callback fires. + self.controller.image_dir = Path("/folder-b") + self.controller.sidecar = replacement_sidecar + + save_result = { + "success": True, + "result": (saved_path, backup_path), + "target": self.controller._key(saved_path), + "save_image_key": self.controller._key(Path("test.jpg")), + "editor_was_open": False, + "save_directory_key": self.controller._key(Path("/folder-a")), + "save_metadata_path": str(main_path), + "started_from_restore_override": False, + "save_sidecar": origin_sidecar, + } + + with patch.object(self.controller, "refresh_image_list"): + with patch.object(self.controller, "_reindex_after_save"): + self.controller._on_save_finished(save_result) + + origin_sidecar.update_metadata.assert_called_once() + update_path, update_data = origin_sidecar.update_metadata.call_args.args + self.assertEqual(update_path, main_path) + self.assertTrue(update_data["edited"]) + self.assertTrue(update_data["edited_date"]) + self.assertTrue(update_data["favorite"]) + replacement_sidecar.update_metadata.assert_not_called() + self.assertEqual(self.controller.undo_history, []) + + def test_save_finished_records_undo_in_active_directory(self): + main_path = Path("/folder-a/test.jpg") + saved_path = Path("/folder-a/test-developed.jpg") + backup_path = Path("/folder-a/test-developed-backup.jpg") + origin_sidecar = MagicMock() + origin_sidecar.get_metadata.return_value = EntryMetadata(favorite=True) + self.controller.image_dir = Path("/folder-a") + self.controller.sidecar = origin_sidecar + self.controller.image_files[0].path = main_path + + save_result = { + "success": True, + "result": (saved_path, backup_path), + "target": self.controller._key(saved_path), + "save_image_key": self.controller._key(Path("test.jpg")), + "editor_was_open": False, + "save_directory_key": self.controller._key(Path("/folder-a")), + "save_metadata_path": str(main_path), + "started_from_restore_override": False, + "save_sidecar": origin_sidecar, + } + + with patch.object(self.controller, "refresh_image_list"): + with patch.object(self.controller, "_reindex_after_save"): + self.controller._on_save_finished(save_result) + + action_type, action_data, _ = self.controller.undo_history[-1] + self.assertEqual(action_type, "save_edit") + self.assertEqual(action_data["saved_path"], str(saved_path)) + self.assertEqual(action_data["backup_path"], str(backup_path)) + self.assertEqual(action_data["metadata_path"], str(main_path)) + self.assertEqual( + action_data["metadata_before"], + { + "stack_id": None, + "stacked": False, + "stacked_date": None, + "uploaded": False, + "uploaded_date": None, + "edited": False, + "edited_date": None, + "restacked": False, + "restacked_date": None, + "favorite": True, + "todo": False, + "todo_date": None, + }, + ) + self.assertIs(action_data["sidecar"], origin_sidecar) + origin_sidecar.update_metadata.assert_called_once() + update_path, _ = origin_sidecar.update_metadata.call_args.args + self.assertEqual(update_path, main_path) + + def test_save_finished_malformed_result_clears_saving_status_message(self): + target = self.controller._key(Path("test.jpg")) + self.controller._saves_in_flight = {target} + self.controller._saving_keys = {target} + self.controller.ui_state.isSaving = True + + with patch.object(self.controller, "update_status_message") as mock_status: + self.controller._on_save_finished( + { + "success": True, + "result": "bad-payload", + "target": target, + "save_image_key": target, + } + ) + + self.assertFalse(self.controller.ui_state.isSaving) + mock_status.assert_called_once_with( + "Save finished, but the result payload was malformed.", timeout=5000 + ) + + def test_save_finished_oversized_tuple_uses_malformed_result_cleanup(self): + target = self.controller._key(Path("test.jpg")) + self.controller._saves_in_flight = {target} + self.controller._saving_keys = {target} + self.controller.ui_state.isSaving = True + + with patch.object(self.controller, "update_status_message") as mock_status: + self.controller._on_save_finished( + { + "success": True, + "result": ("saved.jpg", "backup.jpg", "extra"), + "target": target, + "save_image_key": target, + } + ) + + self.assertFalse(self.controller.ui_state.isSaving) + mock_status.assert_called_once_with( + "Save finished, but the result payload was malformed.", timeout=5000 + ) + + def test_undo_save_edit_restores_only_owned_metadata_fields(self): + main_path = Path("/folder-a/test.jpg") + saved_path = Path("/folder-a/test-developed.jpg") + backup_path = Path("/folder-a/test-developed-backup.jpg") + metadata_sidecar = MagicMock() + metadata_sidecar.data.entries = { + "test": EntryMetadata( + favorite=True, + edited=True, + edited_date="2026-04-12T12:34:56", + ) + } + metadata_sidecar.metadata_key_for_path.return_value = "test" + + self.controller.undo_history = [ + ( + "save_edit", + { + "saved_path": str(saved_path), + "backup_path": str(backup_path), + "metadata_path": str(main_path), + "metadata_before": {"edited": False, "edited_date": None}, + "sidecar": metadata_sidecar, + }, + 123.0, + ) + ] + + with patch.object(self.controller, "_restore_backup_safe", return_value=True): + with patch.object(self.controller, "_post_undo_refresh_and_select"): + self.controller.undo_delete() + + metadata_sidecar.metadata_key_for_path.assert_called_once_with(main_path) + restored_meta = metadata_sidecar.data.entries["test"] + self.assertTrue(restored_meta.favorite) + self.assertFalse(restored_meta.edited) + self.assertIsNone(restored_meta.edited_date) + metadata_sidecar.save.assert_called_once() + + def test_undo_save_edit_deletes_default_only_edit_metadata_entry(self): + main_path = Path("/folder-a/test.jpg") + saved_path = Path("/folder-a/test-developed.jpg") + backup_path = Path("/folder-a/test-developed-backup.jpg") + metadata_sidecar = MagicMock() + metadata_sidecar.data.entries = { + "test": EntryMetadata( + edited=True, + edited_date="2026-04-12T12:34:56", + ) + } + metadata_sidecar.metadata_key_for_path.return_value = "test" + + self.controller.undo_history = [ + ( + "save_edit", + { + "saved_path": str(saved_path), + "backup_path": str(backup_path), + "metadata_path": str(main_path), + "metadata_before": None, + "sidecar": metadata_sidecar, + }, + 123.0, + ) + ] + + with patch.object(self.controller, "_restore_backup_safe", return_value=True): + with patch.object(self.controller, "_post_undo_refresh_and_select"): + self.controller.undo_delete() + + metadata_sidecar.metadata_key_for_path.assert_called_once_with(main_path) + self.assertNotIn("test", metadata_sidecar.data.entries) + metadata_sidecar.save.assert_called_once() + + def test_undo_save_edit_propagates_unexpected_restore_errors(self): + saved_path = Path("/folder-a/test-developed.jpg") + backup_path = Path("/folder-a/test-developed-backup.jpg") + + self.controller.undo_history = [ + ( + "save_edit", + { + "saved_path": str(saved_path), + "backup_path": str(backup_path), + "sidecar": MagicMock(), + }, + 123.0, + ) + ] + + with patch.object(self.controller, "_restore_backup_safe", return_value=True): + with patch.object( + self.controller, + "_restore_metadata_snapshot", + side_effect=RuntimeError("unexpected bug"), + ): + with self.assertRaises(RuntimeError): + self.controller.undo_delete() + + def test_auto_white_balance_lab_tolerates_missing_estimate_diagnostics(self): + self.controller.image_editor.original_image = MagicMock() + self.controller.image_editor.estimate_auto_white_balance.return_value = { + "by_value": 0.25, + "mg_value": -0.1, + "r_mean": 0.40, + "g_mean": 0.42, + "b_mean": 0.38, + } + + with patch("faststack.app.config.getfloat", return_value=0.7): + with patch("faststack.app.config.getint", return_value=0): + with patch.object(self.controller, "_kick_preview_worker"): + message = self.controller.auto_white_balance_lab() + + self.assertIn("AWB:", message) + self.controller.image_editor.set_edit_param.assert_any_call( + "white_balance_by", 0.25 + ) + self.controller.image_editor.set_edit_param.assert_any_call( + "white_balance_mg", -0.1 + ) + + def test_quick_awb_loads_viewed_developed_variant(self): + main_path = Path("/folder-a/test.jpg") + developed_path = Path("/folder-a/test-developed.jpg") + self.controller.image_files[0].path = main_path + self.controller.view_override_path = str(developed_path) + self.controller.view_override_kind = "developed" + self.controller.image_editor.current_filepath = main_path + self.controller.image_editor.load_image.return_value = True + + with patch.object( + self.controller, "get_decoded_image", return_value=MagicMock() + ): + with patch.object(self.controller, "auto_white_balance", return_value=None): + self.controller.quick_auto_white_balance() + + self.controller.image_editor.load_image.assert_called_once_with( + str(developed_path), cached_preview=unittest.mock.ANY, preview_only=True + ) + if __name__ == "__main__": unittest.main()