From a4ee99ddb99f8de9617b5f11b65c24d2452f4874 Mon Sep 17 00:00:00 2001 From: Fredrik Reveny Date: Thu, 26 Feb 2026 06:30:08 +0100 Subject: [PATCH 1/3] refactor: extract models.py, ui.py, docx_utils.py from oversized index.py - Extract shared model utilities (_get_cached_model_path, resolve_flashrank_model_name, configure_offline_mode) into models.py, eliminating duplication between index.py and search.py - Extract IndexingUI, FileProcessingContext, FileProcessingTimeoutError, GracefulAbort into ui.py (~560 lines) - Extract DOCX processing (_parse_heading_level, _get_doc_temp_dir, _convert_doc_to_docx, split_docx_into_heading_documents) into docx_utils.py (~250 lines) - Modernize type annotations in index.py to Python 3.11+ syntax (list[], dict[], X | None) - Add ruff linter configuration to pyproject.toml index.py drops from 2,655 to 1,720 lines (-35%). Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 11 + src/chunksilo/docx_utils.py | 275 ++++++++++ src/chunksilo/index.py | 1015 ++--------------------------------- src/chunksilo/models.py | 75 +++ src/chunksilo/search.py | 56 +- src/chunksilo/ui.py | 639 ++++++++++++++++++++++ test/test_indexing_ui.py | 16 +- 7 files changed, 1055 insertions(+), 1032 deletions(-) create mode 100644 src/chunksilo/docx_utils.py create mode 100644 src/chunksilo/models.py create mode 100644 src/chunksilo/ui.py diff --git a/pyproject.toml b/pyproject.toml index dd8dc7f..66f67f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,3 +42,14 @@ dependencies = {file = ["requirements.txt"]} [tool.setuptools.packages.find] where = ["src"] + +[tool.ruff] +target-version = "py311" +line-length = 120 + +[tool.ruff.lint] +select = ["E", "F", "I", "UP"] +ignore = ["E501"] + +[tool.ruff.lint.isort] +known-first-party = ["chunksilo"] diff --git a/src/chunksilo/docx_utils.py b/src/chunksilo/docx_utils.py new file mode 100644 index 0000000..7e1892c --- /dev/null +++ b/src/chunksilo/docx_utils.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python3 +# 
SPDX-License-Identifier: Apache-2.0
+"""DOCX/DOC document processing utilities.
+
+Handles parsing DOCX files into heading-based documents, heading level
+extraction, and .doc-to-.docx conversion via LibreOffice.
+"""
+from __future__ import annotations
+
+import logging
+import re
+from datetime import datetime
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+from docx import Document
+from llama_index.core import Document as LlamaIndexDocument
+
+from . import cfgload
+
+if TYPE_CHECKING:
+    from .ui import FileProcessingContext
+
+logger = logging.getLogger(__name__)
+
+# Level number in a heading style name: the digits that follow the word
+# "heading" in any letter case, e.g. "Heading 2", "heading 3" or
+# "Heading 2 Char". A bare "Heading" carries no number.
+_HEADING_LEVEL_RE = re.compile(r"heading\s*(\d+)", re.IGNORECASE)
+
+
+def _is_heading_style(style_name: str) -> bool:
+    """Return True when a paragraph style name denotes a heading.
+
+    Accepts any name containing "Heading" (e.g. "Heading 1") as well as
+    names starting with lowercase "heading".
+    """
+    return "Heading" in style_name or style_name.startswith("heading")
+
+
+def _parse_heading_level(style_name: str | None) -> int:
+    """Best-effort extraction of a numeric heading level from a DOCX style name.
+
+    The level is the first number following the word "heading" (any case),
+    so "Heading 2", "heading 2" and "Heading 2 Char" all yield 2.
+    Returns 1 when the name is empty or carries no level number.
+    """
+    match = _HEADING_LEVEL_RE.search(style_name or "")
+    return int(match.group(1)) if match else 1
+
+
+def _get_doc_temp_dir() -> Path:
+    """Get the temporary directory for .doc conversion, creating it if needed."""
+    storage_dir = Path(cfgload.get("storage.storage_dir", "./storage"))
+    temp_dir = storage_dir / "doc_temp"
+    temp_dir.mkdir(parents=True, exist_ok=True)
+    return temp_dir
+
+
+def _convert_doc_to_docx(doc_path: Path, timeout: float = 60) -> Path | None:
+    """Convert a .doc file to .docx using LibreOffice.
+
+    The output is written to the shared doc_temp directory and named after
+    the input's stem, so inputs sharing a stem map to the same output path.
+
+    Args:
+        doc_path: Path to .doc file
+        timeout: Timeout in seconds for conversion process
+
+    Returns:
+        Path to temporary .docx file, or None if conversion fails.
+        Caller is responsible for cleaning up the temp file.
+    """
+    import shutil
+    import subprocess
+
+    # Find LibreOffice executable; shutil.which() accepts absolute paths and
+    # bare command names alike and returns the resolved executable.
+    soffice_paths = [
+        "/Applications/LibreOffice.app/Contents/MacOS/soffice",  # macOS
+        "/usr/bin/soffice",  # Linux
+        "/usr/bin/libreoffice",  # Linux alternative
+        "soffice",  # Windows (in PATH)
+    ]
+
+    soffice = None
+    for candidate in soffice_paths:
+        soffice = shutil.which(candidate)
+        if soffice:
+            break
+
+    if not soffice:
+        logger.warning(f"LibreOffice not found. Cannot convert {doc_path}")
+        return None
+
+    # Use storage directory for temp files (more reliable space than /tmp)
+    temp_dir = _get_doc_temp_dir()
+
+    try:
+        result = subprocess.run(
+            [soffice, "--headless", "--convert-to", "docx",
+             "--outdir", str(temp_dir), str(doc_path)],
+            capture_output=True,
+            timeout=timeout,
+        )
+        if result.returncode != 0:
+            # stderr is captured as bytes; decode so the log is readable
+            stderr = result.stderr.decode(errors="replace").strip()
+            logger.warning(f"LibreOffice conversion failed for {doc_path}: {stderr}")
+            return None
+
+        # LibreOffice names the output after the input file's stem
+        docx_path = temp_dir / (doc_path.stem + ".docx")
+        if docx_path.exists():
+            return docx_path
+
+        logger.warning(f"Converted file not found: {docx_path}")
+    except subprocess.TimeoutExpired:
+        logger.warning(f"LibreOffice conversion timed out for {doc_path}")
+    except Exception as e:
+        logger.warning(f"Error converting {doc_path}: {e}")
+
+    return None
+
+
+def split_docx_into_heading_documents(
+    docx_path: Path,
+    ctx: FileProcessingContext | None = None
+) -> list[LlamaIndexDocument]:
+    """Split DOCX into documents by heading with progress updates.
+
+    Text before the first heading is not attached to any section.
+
+    Args:
+        docx_path: Path to DOCX file
+        ctx: Optional processing context for progress updates and timeout
+
+    Returns:
+        One document per heading with non-empty body text, or a single
+        whole-file document when no heading produced one. Empty when the
+        file cannot be opened.
+    """
+    # Imported lazily: index.py imports this module while it is still loading,
+    # so a module-level import of .index would be circular and fail.
+    from .index import EXCLUDED_EMBED_METADATA_KEYS, EXCLUDED_LLM_METADATA_KEYS, get_heading_store
+
+    docs: list[LlamaIndexDocument] = []
+
+    if ctx:
+        ctx.set_phase("Opening DOCX")
+
+    try:
+        doc = Document(docx_path)
+    except Exception as e:
+        logger.warning(f"Failed to open DOCX {docx_path}: {e}")
+        return docs
+
+    # Extract file dates from filesystem (st_ctime is the creation time on
+    # Windows but the last metadata change on Unix)
+    stat = docx_path.stat()
+    creation_date = datetime.fromtimestamp(stat.st_ctime).strftime("%Y-%m-%d")
+    last_modified_date = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d")
+
+    # Try to extract dates from DOCX core properties (more accurate than filesystem)
+    try:
+        core_props = doc.core_properties
+        if core_props.created:
+            creation_date = core_props.created.strftime("%Y-%m-%d")
+        if core_props.modified:
+            last_modified_date = core_props.modified.strftime("%Y-%m-%d")
+    except Exception as e:
+        # Fall back to filesystem dates
+        logger.debug(f"Could not read core properties of {docx_path}: {e}")
+
+    # Single pass: record each heading (for hierarchy metadata) together with
+    # its ancestor path and the body paragraphs that follow it.
+    if ctx:
+        ctx.set_phase("Extracting headings")
+
+    all_headings: list[dict] = []
+    # sections[i] is (heading_path, body_lines) for all_headings[i]
+    sections: list[tuple[list[str], list[str]]] = []
+    # Currently open headings, outermost first, with strictly increasing levels
+    open_headings: list[dict] = []
+    char_position = 0
+    for para_number, para in enumerate(doc.paragraphs, start=1):
+        # Periodically refresh the phase (set_phase also checks for timeout).
+        # Keyed on paragraphs seen: the heading count does not change between
+        # headings, so keying on it would fire on every paragraph while it
+        # sits at a multiple of 100 (including 0).
+        if ctx and para_number % 100 == 0:
+            ctx.set_phase(f"Extracting headings ({len(all_headings)} found)")
+
+        style_name = getattr(para.style, "name", "") or ""
+        para_text = para.text
+        heading_text = para_text.strip()
+
+        if heading_text and _is_heading_style(style_name):
+            heading = {
+                "text": heading_text,
+                "position": char_position,
+                "level": _parse_heading_level(style_name),
+            }
+            all_headings.append(heading)
+            # A new heading closes every open heading at the same or a deeper
+            # level; what remains is its chain of parents.
+            while open_headings and open_headings[-1]["level"] >= heading["level"]:
+                open_headings.pop()
+            open_headings.append(heading)
+            sections.append(([h["text"] for h in open_headings], []))
+        elif sections:
+            sections[-1][1].append(para_text)
+
+        char_position += len(para_text) + 1  # +1 for newline
+
+    # Store headings separately to avoid metadata size issues during chunking
+    if ctx:
+        ctx.set_phase("Storing heading metadata")
+    get_heading_store().set_headings(str(docx_path), all_headings)
+
+    # Emit one document per heading that has body text
+    if ctx:
+        ctx.set_phase("Splitting into sections")
+
+    for heading, (heading_path, body_lines) in zip(all_headings, sections):
+        text = "\n".join(body_lines).strip()
+        if not text:
+            continue
+
+        metadata = {
+            "file_path": str(docx_path),
+            "file_name": docx_path.name,
+            "source": str(docx_path),
+            "heading": heading["text"],
+            "heading_level": heading["level"],
+            "creation_date": creation_date,
+            "last_modified_date": last_modified_date,
+            "heading_path": heading_path,  # Pre-computed hierarchical path
+        }
+        docs.append(LlamaIndexDocument(
+            text=text,
+            metadata=metadata,
+            excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS,
+            excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS,
+        ))
+
+    if not docs:
+        # No heading section produced text: fall back to the whole document
+        try:
+            full_text = "\n".join(p.text for p in doc.paragraphs).strip()
+        except Exception as e:
+            logger.debug(f"Could not read paragraph text of {docx_path}: {e}")
+            full_text = ""
+
+        if full_text:
+            metadata = {
+                "file_path": str(docx_path),
+                "file_name": docx_path.name,
+                "source": str(docx_path),
+                "heading": None,
+                "heading_level": None,
+                "creation_date": creation_date,
+                "last_modified_date": last_modified_date,
+            }
+            docs.append(LlamaIndexDocument(
+                text=full_text,
+                metadata=metadata,
+                excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS,
+                excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS,
+            ))
+
+    logger.info(
+        f"Split DOCX {docx_path} into {len(docs)} heading-based document(s)"
+    )
+    return docs
diff --git a/src/chunksilo/index.py b/src/chunksilo/index.py
index 4d33ad5..7f6a429 100644
--- a/src/chunksilo/index.py
+++ b/src/chunksilo/index.py
@@ -11,19 +11,15 @@
 import logging
 import os
 import queue
-import signal
 import sqlite3
-import sys
 import threading
 import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
-from datetime import datetime
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import List, Dict, Optional, Iterator, Set, Any, Tuple
-
-from docx import Document
+from collections.abc import Iterator
+from typing import Any
 
 from llama_index.core import (
     VectorStoreIndex,
@@ -40,6 +36,9 @@
 # Load configuration from config.yaml
 from . 
import cfgload from .cfgload import load_config +from .models import _get_cached_model_path, resolve_flashrank_model_name, configure_offline_mode +from .ui import IndexingUI, FileProcessingTimeoutError, FileProcessingContext, GracefulAbort +from .docx_utils import split_docx_into_heading_documents, _convert_doc_to_docx _config = load_config() # Configuration from config.yaml @@ -111,8 +110,8 @@ class DirectoryConfig: """Configuration for a single source directory.""" path: Path enabled: bool = True - include: List[str] = field(default_factory=lambda: DEFAULT_INCLUDE_PATTERNS.copy()) - exclude: List[str] = field(default_factory=lambda: DEFAULT_EXCLUDE_PATTERNS.copy()) + include: list[str] = field(default_factory=lambda: DEFAULT_INCLUDE_PATTERNS.copy()) + exclude: list[str] = field(default_factory=lambda: DEFAULT_EXCLUDE_PATTERNS.copy()) recursive: bool = True case_sensitive: bool = False @@ -120,7 +119,7 @@ class DirectoryConfig: @dataclass class IndexConfig: """Complete indexing configuration.""" - directories: List[DirectoryConfig] + directories: list[DirectoryConfig] chunk_size: int = 512 chunk_overlap: int = 50 @@ -159,7 +158,7 @@ def _parse_index_config(config_data: dict) -> IndexConfig: default_case_sensitive = defaults.get("case_sensitive", False) # Parse directories - directories: List[DirectoryConfig] = [] + directories: list[DirectoryConfig] = [] raw_dirs = config_data.get("directories", []) if not raw_dirs: @@ -214,7 +213,7 @@ class HeadingStore: def __init__(self, store_path: Path): self.store_path = store_path - self._data: Dict[str, List[dict]] = {} + self._data: dict[str, list[dict]] = {} self._dirty = False # Track if data needs saving self._lock = threading.Lock() # Thread safety for parallel file loading self._load() @@ -235,13 +234,13 @@ def _save(self): with open(self.store_path, "w", encoding="utf-8") as f: json.dump(self._data, f) - def set_headings(self, file_path: str, headings: List[dict]): + def set_headings(self, file_path: str, headings: 
list[dict]): """Store headings for a file (deferred write - call flush() to persist).""" with self._lock: self._data[file_path] = headings self._dirty = True - def get_headings(self, file_path: str) -> List[dict]: + def get_headings(self, file_path: str) -> list[dict]: """Get headings for a file.""" with self._lock: return self._data.get(file_path, []) @@ -270,7 +269,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): # Module-level heading store instance (lazy initialized) -_heading_store: Optional["HeadingStore"] = None +_heading_store: "HeadingStore | None" = None def get_heading_store() -> HeadingStore: @@ -328,7 +327,7 @@ def _init_db(self): conn.execute("ALTER TABLE files ADD COLUMN source_dir TEXT DEFAULT ''") logger.info("Migrated files table: added source_dir column") - def get_all_files(self) -> Dict[str, dict]: + def get_all_files(self) -> dict[str, dict]: """Retrieve all tracked files and their metadata.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.execute( @@ -344,7 +343,7 @@ def get_all_files(self) -> Dict[str, dict]: for row in cursor } - def update_file_state(self, file_info: FileInfo, doc_ids: List[str]): + def update_file_state(self, file_info: FileInfo, doc_ids: list[str]): """Update or insert the state for a file.""" with sqlite3.connect(self.db_path) as conn: conn.execute( @@ -366,7 +365,7 @@ def update_file_state(self, file_info: FileInfo, doc_ids: List[str]): ), ) - def update_file_states_batch(self, file_updates: List[tuple[FileInfo, List[str]]]): + def update_file_states_batch(self, file_updates: list[tuple[FileInfo, list[str]]]): """Update multiple files in a single transaction (batch operation). 
Args: @@ -408,7 +407,7 @@ class DataSource(ABC): """Abstract base class for data sources.""" @abstractmethod - def iter_files(self, tracked_files: Dict[str, dict] | None = None) -> Iterator[FileInfo]: + def iter_files(self, tracked_files: dict[str, dict] | None = None) -> Iterator[FileInfo]: """Yield FileInfo for each file in the source. Args: @@ -422,7 +421,7 @@ def load_file( self, file_info: FileInfo, ctx: "FileProcessingContext | None" = None - ) -> List[LlamaIndexDocument]: + ) -> list[LlamaIndexDocument]: """Load and return documents for a given file. Args: @@ -432,7 +431,7 @@ def load_file( pass -def _compute_line_offsets(text: str) -> List[int]: +def _compute_line_offsets(text: str) -> list[int]: """Compute character offset positions for each line start. Returns a list where line_offsets[i] is the character position where line i+1 starts. @@ -445,7 +444,7 @@ def _compute_line_offsets(text: str) -> List[int]: return offsets -def _extract_markdown_headings(text: str) -> List[dict]: +def _extract_markdown_headings(text: str) -> list[dict]: """Extract heading hierarchy from Markdown text using ATX-style syntax. Parses # Heading syntax and returns list of dicts with text, position, level. @@ -492,7 +491,7 @@ def _extract_pdf_headings_from_outline( pdf_path: Path, timeout_event: threading.Event | None = None, max_seconds: float = 60.0, -) -> List[dict]: +) -> list[dict]: """Extract headings from PDF outline/bookmarks (TOC). Returns list of dicts with text, position (estimated), level. 
@@ -536,7 +535,7 @@ def flatten_outline(items, level=1): headings = [] start_time = time.time() # Cache page text lengths to avoid redundant extract_text() calls - page_text_lengths: Dict[int, int] = {} + page_text_lengths: dict[int, int] = {} for title, page_num, level in flat: # Check for timeout @@ -732,7 +731,7 @@ def _producer(): raise item yield item - def iter_files(self, tracked_files: Dict[str, dict] | None = None) -> Iterator[FileInfo]: + def iter_files(self, tracked_files: dict[str, dict] | None = None) -> Iterator[FileInfo]: """Yield FileInfo for each matching file in the source. Args: @@ -776,7 +775,7 @@ def iter_files(self, tracked_files: Dict[str, dict] | None = None) -> Iterator[F def _create_file_info( self, file_path: Path, - tracked_files: Dict[str, dict] | None = None, + tracked_files: dict[str, dict] | None = None, ) -> FileInfo: """Create FileInfo with timeout protection against stalled mounts. @@ -800,7 +799,7 @@ def _create_file_info( def _create_file_info_inner( self, file_path: Path, - tracked_files: Dict[str, dict] | None = None, + tracked_files: dict[str, dict] | None = None, ) -> FileInfo: """Create FileInfo with source directory context. 
@@ -840,7 +839,7 @@ def load_file( self, file_info: FileInfo, ctx: "FileProcessingContext | None" = None - ) -> List[LlamaIndexDocument]: + ) -> list[LlamaIndexDocument]: file_path = Path(file_info.path) exists_timeout = cfgload.get("indexing.timeout.scan_item_seconds", 30) exists_result = _run_with_timeout( @@ -969,8 +968,8 @@ class MultiDirectoryDataSource(DataSource): def __init__(self, config: IndexConfig): self.config = config - self.sources: List[LocalFileSystemSource] = [] - self.unavailable_dirs: List[DirectoryConfig] = [] + self.sources: list[LocalFileSystemSource] = [] + self.unavailable_dirs: list[DirectoryConfig] = [] for dir_config in config.directories: if not dir_config.enabled: @@ -986,9 +985,9 @@ def __init__(self, config: IndexConfig): self.unavailable_dirs.append(dir_config) logger.warning(f"Directory unavailable, skipping: {dir_config.path}") - def iter_files(self, tracked_files: Dict[str, dict] | None = None) -> Iterator[FileInfo]: + def iter_files(self, tracked_files: dict[str, dict] | None = None) -> Iterator[FileInfo]: """Iterate over files from all available sources.""" - seen_paths: Set[str] = set() + seen_paths: set[str] = set() for source in self.sources: for file_info in source.iter_files(tracked_files=tracked_files): @@ -1001,7 +1000,7 @@ def load_file( self, file_info: FileInfo, ctx: "FileProcessingContext | None" = None - ) -> List[LlamaIndexDocument]: + ) -> list[LlamaIndexDocument]: """Load file using the appropriate source based on source_dir.""" # Find the source that owns this file for source in self.sources: @@ -1014,7 +1013,7 @@ def load_file( raise ValueError(f"No source available for file: {file_info.path}") - def get_summary(self) -> Dict[str, Any]: + def get_summary(self) -> dict[str, Any]: """Return summary of configured directories.""" return { "available": [str(s.base_dir) for s in self.sources], @@ -1023,587 +1022,6 @@ def get_summary(self) -> Dict[str, Any]: } -class _DevNull: - """Minimal file-like sink that discards 
all writes.""" - - def write(self, _data: str) -> int: - return 0 - - def flush(self) -> None: - pass - - def isatty(self) -> bool: - return False - - -class IndexingUI: - """Unified terminal output for the indexing pipeline. - - Owns all stdout writes during build_index(). Provides two display modes: - - Step mode: "message... done" with animated spinner - - Progress mode: progress bar with optional sub-line for current file - - All methods are thread-safe via a single lock. - """ - - _SPINNER_CHARS = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" - - def __init__(self, stream=None, verbose=False): - self._stream = stream or sys.stdout - self._verbose = verbose - self._lock = threading.Lock() - - # ANSI color codes (disabled when stream is not a TTY) - _s = self._stream - self._tty = hasattr(_s, "isatty") and _s.isatty() - self.RESET = "\033[0m" if self._tty else "" - self.BOLD = "\033[1m" if self._tty else "" - self.DIM = "\033[2m" if self._tty else "" - self.GREEN = "\033[32m" if self._tty else "" - self.YELLOW = "\033[33m" if self._tty else "" - self.CYAN = "\033[36m" if self._tty else "" - self.RED = "\033[31m" if self._tty else "" - self.BOLD_GREEN = "\033[1;32m" if self._tty else "" - self.BOLD_CYAN = "\033[1;36m" if self._tty else "" - - # Step/spinner state - self._step_message: str | None = None - self._step_original_message: str | None = None - self._step_stop = threading.Event() - self._step_thread: threading.Thread | None = None - - # Progress bar state - self._progress_active = False - self._progress_paused = False - self._progress_total = 0 - self._progress_current = 0 - self._progress_desc = "" - self._progress_unit = "file" - self._progress_width = 30 - self._progress_file = "" - self._progress_phase = "" - self._progress_heartbeat = "" - self._progress_has_subline = False - self._progress_last_pct = -1 # last printed percentage (non-TTY) - self._progress_substep = "" - self._progress_substep_stop = threading.Event() - self._progress_substep_thread: threading.Thread | None = None - 
self._progress_substep_idx = 0 - - # Output suppression state (populated by _suppress_output) - self._orig_stdout = None - self._orig_stderr = None - self._orig_handler_levels: list[tuple[logging.Handler, int]] = [] - - # -- context manager -- - - def __enter__(self): - self._suppress_output() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if self._step_message is not None: - self.step_done("interrupted") - self._step_original_message = None - if self._progress_substep: - self.progress_substep_done() - if self._progress_active or self._progress_paused: - self._progress_active = True - self._progress_paused = False - self.progress_done() - self._restore_output() - return False - - # -- step mode -- - - def step_start(self, message: str) -> None: - """Begin an animated step: prints 'message... ⠋' with spinner.""" - with self._lock: - self._step_message = message - self._step_original_message = message - self._step_stop.clear() - if not self._tty: - self._write(f"{message}... ") - return - self._write(f"\r{self.BOLD}{message}{self.RESET}... ") - self._step_thread = threading.Thread(target=self._step_spin, daemon=True) - self._step_thread.start() - - def step_update(self, message: str) -> None: - """Update the step message while the spinner continues.""" - with self._lock: - if self._step_message is not None: - self._step_message = message - - def step_done(self, suffix: str = "done") -> None: - """Complete the current step: replaces spinner with suffix. - - Uses the original step_start message for the final line so that - dynamic updates (e.g. file counts) don't appear in the completed line. 
- """ - self._step_stop.set() - if self._step_thread: - self._step_thread.join() - self._step_thread = None - with self._lock: - msg = self._step_original_message or self._step_message or "" - self._step_message = None - self._step_original_message = None - if not self._tty: - self._write(f"{suffix}\n") - return - colored_suffix = self._color_suffix(suffix) - self._write(f"\r{self.BOLD}{msg}{self.RESET}... {colored_suffix}\033[K\n") - - def _step_spin(self) -> None: - idx = 0 - while not self._step_stop.is_set(): - with self._lock: - if self._step_message is not None: - self._write( - f"\r{self.BOLD}{self._step_message}{self.RESET}... " - f"{self.CYAN}{self._SPINNER_CHARS[idx]}{self.RESET}\033[K" - ) - idx = (idx + 1) % len(self._SPINNER_CHARS) - self._step_stop.wait(0.1) - - # -- progress mode -- - - def progress_start(self, total: int, desc: str = "Processing files", unit: str = "file") -> None: - """Enter progress bar mode.""" - with self._lock: - self._progress_active = True - self._progress_paused = False - self._progress_total = max(total, 0) - self._progress_current = 0 - self._progress_desc = desc - self._progress_unit = unit - self._progress_file = "" - self._progress_phase = "" - self._progress_heartbeat = "" - self._progress_has_subline = False - self._progress_last_pct = -1 - if self._progress_total > 0: - self._render_progress() - - def progress_update(self, step: int = 1) -> None: - """Advance the progress bar.""" - with self._lock: - if not self._progress_active or self._progress_total <= 0: - return - self._progress_current = min(self._progress_total, self._progress_current + step) - self._render_progress() - - def progress_set_file(self, file_path: str, phase: str = "") -> None: - """Set current file shown on sub-line under the bar.""" - with self._lock: - if not self._progress_active: - return - self._progress_file = file_path - self._progress_phase = phase - self._render_progress() - - def progress_set_heartbeat(self, char: str) -> None: - 
"""Update heartbeat animation character.""" - with self._lock: - if not self._progress_active: - return - self._progress_heartbeat = char - self._render_progress() - - def progress_pause(self) -> None: - """Temporarily hide the progress bar for step output.""" - with self._lock: - if not self._progress_active: - return - self._clear_progress_area() - self._progress_active = False - self._progress_paused = True - - def progress_resume(self) -> None: - """Re-show the progress bar after a pause.""" - with self._lock: - if not self._progress_paused: - return - self._progress_active = True - self._progress_paused = False - self._progress_has_subline = False - self._render_progress() - - def progress_substep_start(self, message: str) -> None: - """Show a substep message on the progress bar sub-line with spinner.""" - with self._lock: - self._progress_substep = message - self._progress_file = "" - self._progress_phase = "" - self._progress_heartbeat = "" - self._progress_substep_stop.clear() - self._progress_substep_idx = 0 - if not self._tty: - # Print current progress context + substep on one line - pct_int = 0 - if self._progress_total > 0: - pct_int = int(self._progress_current / self._progress_total * 100) - self._stream.write( - f"{self._progress_desc} " - f"[{self._progress_current}/{self._progress_total}] " - f"{pct_int}% \u2014 {message}... 
" - ) - self._stream.flush() - return - self._render_progress() - self._progress_substep_thread = threading.Thread( - target=self._substep_spin, daemon=True - ) - self._progress_substep_thread.start() - - def progress_substep_done(self) -> None: - """Clear the substep message from the progress bar sub-line.""" - self._progress_substep_stop.set() - if self._progress_substep_thread: - self._progress_substep_thread.join() - self._progress_substep_thread = None - with self._lock: - self._progress_substep = "" - self._progress_heartbeat = "" - if not self._tty: - self._stream.write("done\n") - self._stream.flush() - return - self._render_progress() - - def _substep_spin(self) -> None: - """Animate spinner on the progress sub-line for substep.""" - while not self._progress_substep_stop.is_set(): - with self._lock: - if self._progress_substep: - self._progress_heartbeat = self._SPINNER_CHARS[self._progress_substep_idx] - self._render_progress() - self._progress_substep_idx = (self._progress_substep_idx + 1) % len(self._SPINNER_CHARS) - self._progress_substep_stop.wait(0.1) - - def progress_done(self) -> None: - """Exit progress bar mode.""" - with self._lock: - if not self._progress_active: - return - if not self._tty: - # Print final 100% line if not already printed - if self._progress_last_pct < 100 and self._progress_total > 0: - self._write( - f"{self._progress_desc} " - f"[{self._progress_total}/{self._progress_total}] 100%\n" - ) - else: - # Render final state - self._render_progress() - # Move past the progress area - if self._progress_has_subline: - self._write("\n\n") - else: - self._write("\n") - self._progress_active = False - self._progress_paused = False - self._progress_has_subline = False - - def _render_progress(self) -> None: - """Render progress bar + optional file sub-line. 
Must hold lock.""" - if self._progress_total <= 0: - return - progress = self._progress_current / self._progress_total - - if not self._tty: - # Non-TTY: print a simple line every 10% to avoid log spam - pct_int = int(progress * 100) - threshold = (pct_int // 10) * 10 - if threshold <= self._progress_last_pct: - return - self._progress_last_pct = threshold - self._stream.write( - f"{self._progress_desc} " - f"[{self._progress_current}/{self._progress_total}] " - f"{pct_int}%\n" - ) - self._stream.flush() - return - - filled = int(self._progress_width * progress) - bar_filled = f"{self.GREEN}{'█' * filled}{self.RESET}" - bar_empty = f"{self.DIM}{'░' * (self._progress_width - filled)}{self.RESET}" - bar = f"{bar_filled}{bar_empty}" - - # Move cursor to start of progress area - if self._progress_has_subline: - self._stream.write("\033[1A\r") - - # Line 1: progress bar - pct = f"{progress * 100:5.1f}%" - if progress >= 1.0: - pct = f"{self.BOLD_GREEN}{pct}{self.RESET}" - line1 = ( - f"{self.BOLD}{self._progress_desc}{self.RESET} [{bar}] " - f"{pct} {self.DIM}({self._progress_current}/{self._progress_total}){self.RESET}" - ) - self._stream.write(f"\r\033[K{line1}") - - # Line 2: substep message (priority) or current file - if self._progress_substep: - subline = f" {self.DIM}{self._progress_substep}{self.RESET}" - if self._progress_heartbeat: - subline += f" {self.CYAN}{self._progress_heartbeat}{self.RESET}" - self._stream.write(f"\n\033[K{subline}") - self._progress_has_subline = True - elif self._progress_file: - file_display = Path(self._progress_file).name - if len(file_display) > 50: - file_display = "..." 
+ file_display[-47:] - subline = f" {self.DIM}{file_display}{self.RESET}" - if self._progress_phase: - subline += f" {self.DIM}({self._progress_phase}" - if self._progress_heartbeat: - subline += f" {self.CYAN}{self._progress_heartbeat}{self.RESET}{self.DIM}" - subline += f"){self.RESET}" - elif self._progress_heartbeat: - subline += f" {self.CYAN}{self._progress_heartbeat}{self.RESET}" - self._stream.write(f"\n\033[K{subline}") - self._progress_has_subline = True - elif self._progress_has_subline: - # Clear stale sub-line - self._stream.write(f"\n\033[K") - - self._stream.flush() - - def _clear_progress_area(self) -> None: - """Clear progress bar lines from terminal. Must hold lock.""" - if not self._tty: - return - if self._progress_has_subline: - self._stream.write("\033[1A\r\033[K\n\033[K\033[1A\r") - else: - self._stream.write("\r\033[K") - self._stream.flush() - self._progress_has_subline = False - - # -- general output -- - - def print(self, message: str) -> None: - """Print a plain text line.""" - with self._lock: - self._write(f"{message}\n") - - def success(self, message: str) -> None: - """Print a success message in bold green.""" - with self._lock: - self._write(f"{self.BOLD_GREEN}{message}{self.RESET}\n") - - def error(self, message: str) -> None: - """Print an error message in red.""" - with self._lock: - self._write(f"{self.RED}{message}{self.RESET}\n") - - # -- internal helpers -- - - def _color_suffix(self, suffix: str) -> str: - """Return a color-coded suffix string for step_done output.""" - s = suffix.lower() - if s in ("done", "no changes"): - return f"{self.GREEN}{suffix}{self.RESET}" - if s in ("skipped",): - return f"{self.YELLOW}{suffix}{self.RESET}" - if s in ("interrupted",): - return f"{self.RED}{suffix}{self.RESET}" - return suffix - - def _write(self, text: str) -> None: - """Write to stream and flush. 
Caller must hold lock.""" - self._stream.write(text) - self._stream.flush() - - def _suppress_output(self) -> None: - """Redirect stdout/stderr and silence root logger stream handlers. - - IndexingUI captures self._stream at __init__ time, so it keeps - writing to the real terminal. All 3rd-party code that calls - print() or writes to sys.stdout/stderr hits _DevNull instead. - - Skipped when verbose=True to allow full debugging output. - """ - if self._verbose: - return - - devnull = _DevNull() - - self._orig_stdout = sys.stdout - self._orig_stderr = sys.stderr - sys.stdout = devnull - sys.stderr = devnull - - for handler in logging.root.handlers: - if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler): - self._orig_handler_levels.append((handler, handler.level)) - handler.setLevel(logging.CRITICAL + 1) - - def _restore_output(self) -> None: - """Restore stdout/stderr and root logger handler levels.""" - if self._orig_stdout is not None: - sys.stdout = self._orig_stdout - self._orig_stdout = None - if self._orig_stderr is not None: - sys.stderr = self._orig_stderr - self._orig_stderr = None - - for handler, level in self._orig_handler_levels: - handler.setLevel(level) - self._orig_handler_levels.clear() - - -class FileProcessingTimeoutError(Exception): - """Raised when file processing exceeds timeout.""" - pass - - -class FileProcessingContext: - """Context manager for file processing with timeout and heartbeat. 
- - Usage: - with FileProcessingContext(file_path, ui, timeout=300) as ctx: - ctx.set_phase("Converting .doc") - result = process_file() - """ - - def __init__( - self, - file_path: str, - ui: IndexingUI, - timeout_seconds: float | None = None, - heartbeat_interval: float = 2.0 - ): - self.file_path = file_path - self.ui = ui - self.timeout_seconds = timeout_seconds - self.heartbeat_interval = heartbeat_interval - - self._start_time: float | None = None - self._stop_event = threading.Event() - self._timeout_event = threading.Event() - self._heartbeat_thread: threading.Thread | None = None - self._current_phase = "" - - def __enter__(self): - """Start timing and heartbeat thread.""" - self._start_time = time.time() - - # Start heartbeat thread - self._heartbeat_thread = threading.Thread( - target=self._heartbeat_loop, - daemon=True - ) - self._heartbeat_thread.start() - - # Update UI with current file - self.ui.progress_set_file(self.file_path, "") - - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Stop heartbeat and check for slow files.""" - # Stop heartbeat thread - self._stop_event.set() - if self._heartbeat_thread: - self._heartbeat_thread.join(timeout=1.0) - - # Warn about slow files (warnings still pass through) - if self._start_time: - duration = time.time() - self._start_time - slow_threshold = cfgload.get( - "indexing.logging.slow_file_threshold_seconds", 30 - ) - if duration > slow_threshold: - logger.warning( - f"Slow file processing: {self.file_path} took {duration:.1f}s" - ) - - # Don't suppress exceptions - return False - - def set_phase(self, phase: str) -> None: - """Update current operation phase.""" - self._current_phase = phase - self.ui.progress_set_file(self.file_path, phase) - - # Check for timeout - self.check_timeout() - - def check_timeout(self) -> None: - """Check if processing has exceeded timeout (main-thread safe). - - Checks both the _timeout_event (set by heartbeat thread) and - elapsed wall-clock time. 
Raises FileProcessingTimeoutError if - either indicates a timeout. - """ - if self.timeout_seconds is None or self._start_time is None: - return - - if self._timeout_event.is_set(): - elapsed = time.time() - self._start_time - raise FileProcessingTimeoutError( - f"File processing timed out after {elapsed:.1f}s: {self.file_path}" - ) - - elapsed = time.time() - self._start_time - if elapsed > self.timeout_seconds: - self._timeout_event.set() - raise FileProcessingTimeoutError( - f"File processing timed out after {elapsed:.1f}s: {self.file_path}" - ) - - def remaining_seconds(self) -> float | None: - """Return seconds remaining before timeout, or None if no timeout set.""" - if self.timeout_seconds is None or self._start_time is None: - return None - remaining = self.timeout_seconds - (time.time() - self._start_time) - return max(0.0, remaining) - - def _check_timeout(self) -> None: - """Check if processing has exceeded timeout (used by heartbeat thread).""" - if self.timeout_seconds is None or self._start_time is None: - return - - elapsed = time.time() - self._start_time - if elapsed > self.timeout_seconds: - self._timeout_event.set() - raise FileProcessingTimeoutError( - f"File processing timed out after {elapsed:.1f}s: {self.file_path}" - ) - - def _heartbeat_loop(self) -> None: - """Background thread that updates heartbeat indicator.""" - if not self.ui._tty: - # Non-TTY: only monitor for timeouts, skip animation - while not self._stop_event.is_set(): - try: - self._check_timeout() - except FileProcessingTimeoutError: - break - time.sleep(self.heartbeat_interval) - return - - spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" - idx = 0 - - while not self._stop_event.is_set(): - self.ui.progress_set_heartbeat(spinner_chars[idx]) - idx = (idx + 1) % len(spinner_chars) - - # Check for timeout - try: - self._check_timeout() - except FileProcessingTimeoutError: - break - - time.sleep(self.heartbeat_interval) - - def _embedding_cache_path(model_name: str, cache_dir: Path) -> Path: 
"""Return the expected cache directory for a FastEmbed model.""" return cache_dir / f"models--{model_name.replace('/', '--')}" @@ -1646,28 +1064,6 @@ def _verify_model_cache_exists(cache_dir: Path) -> bool: return False -def _get_cached_model_path(cache_dir: Path, model_name: str) -> Path | None: - """Get the cached model directory path.""" - try: - from huggingface_hub import snapshot_download - from fastembed import TextEmbedding - models = TextEmbedding.list_supported_models() - model_info = [m for m in models if m.get("model") == model_name] - if model_info: - hf_source = model_info[0].get("sources", {}).get("hf") - if hf_source: - cache_dir_abs = cache_dir.resolve() - model_dir = snapshot_download( - repo_id=hf_source, - local_files_only=True, - cache_dir=str(cache_dir_abs) - ) - return Path(model_dir).resolve() - except (ImportError, Exception): - pass - return None - - def _create_fastembed_embedding(cache_dir: Path, offline: bool = False): """Create a FastEmbedEmbedding instance.""" # Large batch size so LlamaIndex sends all texts in one call to @@ -1748,23 +1144,7 @@ def ensure_rerank_model_cached(cache_dir: Path, offline: bool = False) -> Path: cache_dir_abs = cache_dir.resolve() logger.info("Ensuring rerank model is available in cache...") - # Map cross-encoder model names to FlashRank equivalents if needed - model_name = RETRIEVAL_RERANK_MODEL_NAME - # Note: FlashRank doesn't have L-6 models, so we map to L-12 equivalents - model_mapping = { - "cross-encoder/ms-marco-MiniLM-L-6-v2": "ms-marco-MiniLM-L-12-v2", # L-6 not available, use L-12 - "ms-marco-MiniLM-L-6-v2": "ms-marco-MiniLM-L-12-v2", # Direct mapping for L-6 - } - if model_name in model_mapping: - model_name = model_mapping[model_name] - elif model_name.startswith("cross-encoder/"): - # Extract model name after cross-encoder/ prefix and try to map - base_name = model_name.replace("cross-encoder/", "") - # If it's an L-6 model, map to L-12 - if "L-6" in base_name: - model_name = 
base_name.replace("L-6", "L-12") - else: - model_name = base_name + model_name = resolve_flashrank_model_name(RETRIEVAL_RERANK_MODEL_NAME) try: reranker = Ranker(model_name=model_name, cache_dir=str(cache_dir_abs)) @@ -1778,258 +1158,7 @@ def ensure_rerank_model_cached(cache_dir: Path, offline: bool = False) -> Path: raise -def _parse_heading_level(style_name: str | None) -> int: - """Best-effort extraction of a numeric heading level from a DOCX style name.""" - if not style_name: - return 1 - try: - if "Heading" in style_name: - level_str = style_name.replace("Heading", "").strip() - if level_str: - return int(level_str) - except (ValueError, AttributeError): - pass - return 1 - - -def _get_doc_temp_dir() -> Path: - """Get the temporary directory for .doc conversion, creating it if needed.""" - storage_dir = Path(cfgload.get("storage.storage_dir", "./storage")) - temp_dir = storage_dir / "doc_temp" - temp_dir.mkdir(parents=True, exist_ok=True) - return temp_dir - - -def _convert_doc_to_docx(doc_path: Path, timeout: float = 60) -> Path | None: - """Convert a .doc file to .docx using LibreOffice. - - Args: - doc_path: Path to .doc file - timeout: Timeout in seconds for conversion process - - Returns: - Path to temporary .docx file, or None if conversion fails. - Caller is responsible for cleaning up the temp file. - """ - import subprocess - import shutil - - # Find LibreOffice executable - soffice_paths = [ - "/Applications/LibreOffice.app/Contents/MacOS/soffice", # macOS - "/usr/bin/soffice", # Linux - "/usr/bin/libreoffice", # Linux alternative - "soffice", # Windows (in PATH) - ] - - soffice = None - for path in soffice_paths: - if shutil.which(path): - soffice = path - break - - if not soffice: - logger.warning(f"LibreOffice not found. 
Cannot convert {doc_path}") - return None - - # Use storage directory for temp files (more reliable space than /tmp) - temp_dir = _get_doc_temp_dir() - - try: - result = subprocess.run( - [soffice, "--headless", "--convert-to", "docx", - "--outdir", str(temp_dir), str(doc_path)], - capture_output=True, - timeout=timeout, - ) - if result.returncode != 0: - logger.warning(f"LibreOffice conversion failed for {doc_path}: {result.stderr}") - return None - - # Find the converted file - docx_name = doc_path.stem + ".docx" - docx_path = temp_dir / docx_name - if docx_path.exists(): - return docx_path - - logger.warning(f"Converted file not found: {docx_path}") - except subprocess.TimeoutExpired: - logger.warning(f"LibreOffice conversion timed out for {doc_path}") - except Exception as e: - logger.warning(f"Error converting {doc_path}: {e}") - - return None - - -def split_docx_into_heading_documents( - docx_path: Path, - ctx: "FileProcessingContext | None" = None -) -> List[LlamaIndexDocument]: - """Split DOCX into documents by heading with progress updates. 
- - Args: - docx_path: Path to DOCX file - ctx: Optional processing context for progress updates and timeout - """ - docs: List[LlamaIndexDocument] = [] - - if ctx: - ctx.set_phase("Opening DOCX") - - try: - doc = Document(docx_path) - except Exception as e: - logger.warning(f"Failed to open DOCX {docx_path}: {e}") - return docs - - # Extract file dates from filesystem - stat = docx_path.stat() - creation_date = datetime.fromtimestamp(stat.st_ctime).strftime("%Y-%m-%d") - last_modified_date = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d") - - # Try to extract dates from DOCX core properties (more accurate than filesystem) - try: - core_props = doc.core_properties - if core_props.created: - creation_date = core_props.created.strftime("%Y-%m-%d") - if core_props.modified: - last_modified_date = core_props.modified.strftime("%Y-%m-%d") - except Exception: - pass # Fall back to filesystem dates - - # First pass: Extract all headings with positions for hierarchy metadata - if ctx: - ctx.set_phase("Extracting headings") - - all_headings = [] - char_position = 0 - for para in doc.paragraphs: - # Periodically check for timeout during long operations - if ctx and len(all_headings) % 100 == 0: - ctx.set_phase(f"Extracting headings ({len(all_headings)} found)") - - style_name = getattr(para.style, "name", "") or "" - is_heading = ( - style_name.startswith("Heading") - or style_name.startswith("heading") - or "Heading" in style_name - ) - - if is_heading and para.text.strip(): - heading_level = _parse_heading_level(style_name) - all_headings.append({ - "text": para.text.strip(), - "position": char_position, - "level": heading_level - }) - - char_position += len(para.text) + 1 # +1 for newline - - # Store headings separately to avoid metadata size issues during chunking - if ctx: - ctx.set_phase("Storing heading metadata") - get_heading_store().set_headings(str(docx_path), all_headings) - - # Second pass: Split by heading (existing logic) - if ctx: - 
ctx.set_phase("Splitting into sections") - current_heading: str | None = None - current_level: int | None = None - current_body: list[str] = [] - - def flush_current(): - if not current_heading: - return - text = "\n".join(line for line in current_body if line is not None).strip() - if not text: - return - - # Build hierarchical heading_path by finding parent headings based on level - heading_path = [] - if all_headings: - # Find the index of the current heading in all_headings - current_idx = None - for idx, h in enumerate(all_headings): - if h["text"] == current_heading and h["level"] == current_level: - current_idx = idx - break - - if current_idx is not None: - # Build path by including all parent headings (those with lower level numbers) - # Walk backwards from current heading and include headings with level < current_level - path_headings = [all_headings[current_idx]] # Start with current - for idx in range(current_idx - 1, -1, -1): - h = all_headings[idx] - if h["level"] < path_headings[0]["level"]: - path_headings.insert(0, h) - heading_path = [h["text"] for h in path_headings] - - metadata = { - "file_path": str(docx_path), - "file_name": docx_path.name, - "source": str(docx_path), - "heading": current_heading, - "heading_level": current_level, - "creation_date": creation_date, - "last_modified_date": last_modified_date, - "heading_path": heading_path, # Pre-computed hierarchical path - } - docs.append(LlamaIndexDocument( - text=text, - metadata=metadata, - excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS, - excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS, - )) - - for para in doc.paragraphs: - style_name = getattr(para.style, "name", "") or "" - is_heading = ( - style_name.startswith("Heading") - or style_name.startswith("heading") - or "Heading" in style_name - ) - - if is_heading and para.text.strip(): - flush_current() - current_heading = para.text.strip() - current_level = _parse_heading_level(style_name) - current_body = [] - else: - 
if current_heading is not None: - current_body.append(para.text) - - flush_current() - - if not docs: - try: - full_text = "\n".join(p.text for p in doc.paragraphs).strip() - except Exception: - full_text = "" - - if full_text: - metadata = { - "file_path": str(docx_path), - "file_name": docx_path.name, - "source": str(docx_path), - "heading": None, - "heading_level": None, - "creation_date": creation_date, - "last_modified_date": last_modified_date, - } - docs.append(LlamaIndexDocument( - text=full_text, - metadata=metadata, - excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS, - excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS, - )) - - logger.info( - f"Split DOCX {docx_path} into {len(docs)} heading-based document(s)" - ) - return docs - - -def tokenize_filename(filename: str) -> List[str]: +def tokenize_filename(filename: str) -> list[str]: """ Tokenize a filename for BM25 indexing. @@ -2076,7 +1205,7 @@ def build_bm25_index(index, storage_dir: Path) -> None: # Create filename nodes - one per unique file filename_nodes = [] - seen_files: Set[str] = set() + seen_files: set[str] = set() for doc_id, node in index.docstore.docs.items(): metadata = node.metadata or {} @@ -2125,30 +1254,6 @@ def batched(iterable, n): yield batch -def configure_offline_mode(offline: bool, cache_dir: Path) -> None: - """Configure environment variables for offline mode.""" - if offline: - os.environ["HF_HUB_OFFLINE"] = "1" - os.environ["TRANSFORMERS_OFFLINE"] = "1" - os.environ["HF_DATASETS_OFFLINE"] = "1" - cache_dir_abs = cache_dir.resolve() - os.environ["HF_HOME"] = str(cache_dir_abs) - os.environ["HF_HUB_CACHE"] = str(cache_dir_abs) - os.environ["HF_DATASETS_CACHE"] = str(cache_dir_abs) - logger.info("Offline mode enabled.") - else: - # Clear offline mode environment variables to allow downloads - for var in ["HF_HUB_OFFLINE", "TRANSFORMERS_OFFLINE", "HF_DATASETS_OFFLINE"]: - os.environ.pop(var, None) - - # Update huggingface_hub's cached constant (it caches at import 
time) - try: - from huggingface_hub import constants - constants.HF_HUB_OFFLINE = offline - except ImportError: - pass - - def calculate_optimal_batch_size( num_files: int, avg_chunks_per_file: int = 10, @@ -2185,7 +1290,7 @@ def calculate_optimal_batch_size( def load_files_parallel( - files: List[FileInfo], + files: list[FileInfo], data_source: DataSource, ui: IndexingUI, max_workers: int = 4, @@ -2193,7 +1298,7 @@ def load_files_parallel( per_file_timeout: float = 300, heartbeat_interval: float = 2.0, abort_ctl: "GracefulAbort | None" = None, -) -> Dict[str, Tuple[FileInfo, List[Any]]]: +) -> dict[str, tuple[FileInfo, list[Any]]]: """Load files in parallel using ThreadPoolExecutor. Args: @@ -2207,7 +1312,7 @@ def load_files_parallel( abort_ctl: Optional GracefulAbort instance for Ctrl-C handling Returns: - Dict mapping file path to (FileInfo, List[LlamaIndexDocument]) + Dict mapping file path to (FileInfo, list[LlamaIndexDocument]) """ file_docs = {} @@ -2257,46 +1362,6 @@ def load_single_file(file_info: FileInfo): return file_docs -class GracefulAbort: - """Two-stage Ctrl-C handler for the indexing pipeline. - - First Ctrl-C: sets abort flag and restores default SIGINT so a second - Ctrl-C raises KeyboardInterrupt immediately. - """ - - def __init__(self, ui: "IndexingUI"): - self._abort = False - self._ui = ui - self._original_handler = None - - @property - def abort_requested(self) -> bool: - return self._abort - - def install(self) -> None: - """Register the custom SIGINT handler. 
Must be called from the main thread.""" - self._original_handler = signal.getsignal(signal.SIGINT) - signal.signal(signal.SIGINT, self._handle_sigint) - - def uninstall(self) -> None: - """Restore the original SIGINT handler.""" - if self._original_handler is not None: - signal.signal(signal.SIGINT, self._original_handler) - self._original_handler = None - - def _handle_sigint(self, signum, frame): - self._abort = True - signal.signal(signal.SIGINT, signal.SIG_DFL) - self._ui.print( - f"\n{self._ui.YELLOW}Ctrl-C received. " - f"Finishing current batch and saving state...{self._ui.RESET}" - ) - self._ui.print( - f"{self._ui.DIM}(Press Ctrl-C again to force quit immediately)" - f"{self._ui.RESET}" - ) - - def build_index( download_only: bool = False, config_path: Path | None = None, @@ -2400,8 +1465,8 @@ def build_index( # Change Detection tracked_files = ingestion_state.get_all_files() - found_files: Set[str] = set() - files_to_process: List[FileInfo] = [] + found_files: set[str] = set() + files_to_process: list[FileInfo] = [] new_count = 0 modified_count = 0 diff --git a/src/chunksilo/models.py b/src/chunksilo/models.py new file mode 100644 index 0000000..c563834 --- /dev/null +++ b/src/chunksilo/models.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +"""Shared model utilities used by both the indexing and search pipelines.""" +import logging +import os +from pathlib import Path + +logger = logging.getLogger(__name__) + + +def _get_cached_model_path(cache_dir: Path, model_name: str) -> Path | None: + """Get the cached model directory path using huggingface_hub's snapshot_download.""" + try: + from fastembed import TextEmbedding + from huggingface_hub import snapshot_download + models = TextEmbedding.list_supported_models() + model_info = [m for m in models if m.get("model") == model_name] + if model_info: + hf_source = model_info[0].get("sources", {}).get("hf") + if hf_source: + cache_dir_abs = cache_dir.resolve() + model_dir = 
snapshot_download( + repo_id=hf_source, + local_files_only=True, + cache_dir=str(cache_dir_abs) + ) + return Path(model_dir).resolve() + except (ImportError, Exception): + pass + return None + + +def resolve_flashrank_model_name(model_name: str) -> str: + """Map cross-encoder model names to FlashRank equivalents. + + FlashRank doesn't have L-6 models, so we map to L-12 equivalents. + The cross-encoder/ prefix is also stripped since FlashRank uses bare names. + """ + model_mapping = { + "cross-encoder/ms-marco-MiniLM-L-6-v2": "ms-marco-MiniLM-L-12-v2", + "ms-marco-MiniLM-L-6-v2": "ms-marco-MiniLM-L-12-v2", + } + if model_name in model_mapping: + return model_mapping[model_name] + elif model_name.startswith("cross-encoder/"): + base_name = model_name.replace("cross-encoder/", "") + if "L-6" in base_name: + return base_name.replace("L-6", "L-12") + else: + return base_name + return model_name + + +def configure_offline_mode(offline: bool, cache_dir: Path) -> None: + """Configure environment variables for offline mode.""" + if offline: + os.environ["HF_HUB_OFFLINE"] = "1" + os.environ["TRANSFORMERS_OFFLINE"] = "1" + os.environ["HF_DATASETS_OFFLINE"] = "1" + cache_dir_abs = cache_dir.resolve() + os.environ["HF_HOME"] = str(cache_dir_abs) + os.environ["HF_HUB_CACHE"] = str(cache_dir_abs) + os.environ["HF_DATASETS_CACHE"] = str(cache_dir_abs) + logger.info("Offline mode enabled.") + else: + # Clear offline mode environment variables to allow downloads + for var in ["HF_HUB_OFFLINE", "TRANSFORMERS_OFFLINE", "HF_DATASETS_OFFLINE"]: + os.environ.pop(var, None) + + # Update huggingface_hub's cached constant (it caches at import time) + try: + from huggingface_hub import constants + constants.HF_HUB_OFFLINE = offline + except ImportError: + pass diff --git a/src/chunksilo/search.py b/src/chunksilo/search.py index 8cf3a63..bcbfaff 100644 --- a/src/chunksilo/search.py +++ b/src/chunksilo/search.py @@ -43,6 +43,7 @@ pass from .cfgload import load_config +from .models import 
_get_cached_model_path, resolve_flashrank_model_name, configure_offline_mode logger = logging.getLogger(__name__) @@ -92,19 +93,6 @@ def _get_config() -> dict[str, Any]: }) -def _setup_offline_mode(config: dict[str, Any]) -> None: - """Configure offline mode for HuggingFace libraries if enabled.""" - offline_mode = config["retrieval"]["offline"] - if offline_mode: - os.environ["HF_HUB_OFFLINE"] = "1" - os.environ["TRANSFORMERS_OFFLINE"] = "1" - os.environ["HF_DATASETS_OFFLINE"] = "1" - cache_dir_abs = Path(config["storage"]["model_cache_dir"]).resolve() - os.environ["HF_HOME"] = str(cache_dir_abs) - os.environ["HF_HUB_CACHE"] = str(cache_dir_abs) - os.environ["HF_DATASETS_CACHE"] = str(cache_dir_abs) - - def _setup_ssl(config: dict[str, Any]) -> str | None: """Configure SSL/TLS CA bundle if specified. Returns the CA bundle path or None.""" ca_bundle_path = config["ssl"]["ca_bundle_path"] or None @@ -196,28 +184,6 @@ def _char_offset_to_line(char_offset: int | None, line_offsets: list[int] | None return left + 1 -def _get_cached_model_path(cache_dir: Path, model_name: str) -> Path | None: - """Get the cached model directory path using huggingface_hub's snapshot_download.""" - try: - from huggingface_hub import snapshot_download - from fastembed import TextEmbedding - models = TextEmbedding.list_supported_models() - model_info = [m for m in models if m.get("model") == model_name] - if model_info: - hf_source = model_info[0].get("sources", {}).get("hf") - if hf_source: - cache_dir_abs = cache_dir.resolve() - model_dir = snapshot_download( - repo_id=hf_source, - local_files_only=True, - cache_dir=str(cache_dir_abs) - ) - return Path(model_dir).resolve() - except (ImportError, Exception): - pass - return None - - def _ensure_embed_model(config: dict[str, Any]) -> None: """Ensure the embedding model is initialized.""" global _embed_model_initialized @@ -261,22 +227,11 @@ def _ensure_reranker(config: dict[str, Any]): "flashrank is required for reranking. 
Install with: pip install chunksilo" ) from exc - model_name = config["retrieval"]["rerank_model_name"] + raw_model_name = config["retrieval"]["rerank_model_name"] cache_dir = Path(config["storage"]["model_cache_dir"]) offline_mode = config["retrieval"]["offline"] - model_mapping = { - "cross-encoder/ms-marco-MiniLM-L-6-v2": "ms-marco-MiniLM-L-12-v2", - "ms-marco-MiniLM-L-6-v2": "ms-marco-MiniLM-L-12-v2", - } - if model_name in model_mapping: - model_name = model_mapping[model_name] - elif model_name.startswith("cross-encoder/"): - base_name = model_name.replace("cross-encoder/", "") - if "L-6" in base_name: - model_name = base_name.replace("L-6", "L-12") - else: - model_name = base_name + model_name = resolve_flashrank_model_name(raw_model_name) try: _reranker_model = Ranker(model_name=model_name, cache_dir=str(cache_dir)) @@ -1152,7 +1107,10 @@ def run_search( config = _init_config(config_path) if config_path else _get_config() # Setup environment on first call - _setup_offline_mode(config) + configure_offline_mode( + config["retrieval"]["offline"], + Path(config["storage"]["model_cache_dir"]), + ) _setup_ssl(config) start_time = time.time() diff --git a/src/chunksilo/ui.py b/src/chunksilo/ui.py new file mode 100644 index 0000000..95ddacd --- /dev/null +++ b/src/chunksilo/ui.py @@ -0,0 +1,639 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +"""Terminal UI classes for the indexing pipeline. + +Contains IndexingUI (progress bars, spinners, output suppression), +FileProcessingContext (per-file timeout and heartbeat), and GracefulAbort +(two-stage Ctrl-C handling). +""" +import logging +import signal +import sys +import threading +import time +from pathlib import Path + +from . 
import cfgload + +logger = logging.getLogger(__name__) + + +class _DevNull: + """Minimal file-like sink that discards all writes.""" + + def write(self, _data: str) -> int: + return 0 + + def flush(self) -> None: + pass + + def isatty(self) -> bool: + return False + + +class IndexingUI: + """Unified terminal output for the indexing pipeline. + + Owns all stdout writes during build_index(). Provides two display modes: + - Step mode: "message... done" with animated spinner + - Progress mode: progress bar with optional sub-line for current file + + All methods are thread-safe via a single lock. + """ + + _SPINNER_CHARS = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + + def __init__(self, stream=None, verbose=False): + self._stream = stream or sys.stdout + self._verbose = verbose + self._lock = threading.Lock() + + # ANSI color codes (disabled when stream is not a TTY) + _s = self._stream + self._tty = hasattr(_s, "isatty") and _s.isatty() + self.RESET = "\033[0m" if self._tty else "" + self.BOLD = "\033[1m" if self._tty else "" + self.DIM = "\033[2m" if self._tty else "" + self.GREEN = "\033[32m" if self._tty else "" + self.YELLOW = "\033[33m" if self._tty else "" + self.CYAN = "\033[36m" if self._tty else "" + self.RED = "\033[31m" if self._tty else "" + self.BOLD_GREEN = "\033[1;32m" if self._tty else "" + self.BOLD_CYAN = "\033[1;36m" if self._tty else "" + + # Step/spinner state + self._step_message: str | None = None + self._step_original_message: str | None = None + self._step_stop = threading.Event() + self._step_thread: threading.Thread | None = None + + # Progress bar state + self._progress_active = False + self._progress_paused = False + self._progress_total = 0 + self._progress_current = 0 + self._progress_desc = "" + self._progress_unit = "file" + self._progress_width = 30 + self._progress_file = "" + self._progress_phase = "" + self._progress_heartbeat = "" + self._progress_has_subline = False + self._progress_last_pct = -1 # last printed percentage (non-TTY) + self._progress_substep = 
"" + self._progress_substep_stop = threading.Event() + self._progress_substep_thread: threading.Thread | None = None + self._progress_substep_idx = 0 + + # Output suppression state (populated by _suppress_output) + self._orig_stdout = None + self._orig_stderr = None + self._orig_handler_levels: list[tuple[logging.Handler, int]] = [] + + # -- context manager -- + + def __enter__(self): + self._suppress_output() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._step_message is not None: + self.step_done("interrupted") + self._step_original_message = None + if self._progress_substep: + self.progress_substep_done() + if self._progress_active or self._progress_paused: + self._progress_active = True + self._progress_paused = False + self.progress_done() + self._restore_output() + return False + + # -- step mode -- + + def step_start(self, message: str) -> None: + """Begin an animated step: prints 'message... ⠋' with spinner.""" + with self._lock: + self._step_message = message + self._step_original_message = message + self._step_stop.clear() + if not self._tty: + self._write(f"{message}... ") + return + self._write(f"\r{self.BOLD}{message}{self.RESET}... ") + self._step_thread = threading.Thread(target=self._step_spin, daemon=True) + self._step_thread.start() + + def step_update(self, message: str) -> None: + """Update the step message while the spinner continues.""" + with self._lock: + if self._step_message is not None: + self._step_message = message + + def step_done(self, suffix: str = "done") -> None: + """Complete the current step: replaces spinner with suffix. + + Uses the original step_start message for the final line so that + dynamic updates (e.g. file counts) don't appear in the completed line. 
+ """ + self._step_stop.set() + if self._step_thread: + self._step_thread.join() + self._step_thread = None + with self._lock: + msg = self._step_original_message or self._step_message or "" + self._step_message = None + self._step_original_message = None + if not self._tty: + self._write(f"{suffix}\n") + return + colored_suffix = self._color_suffix(suffix) + self._write(f"\r{self.BOLD}{msg}{self.RESET}... {colored_suffix}\033[K\n") + + def _step_spin(self) -> None: + idx = 0 + while not self._step_stop.is_set(): + with self._lock: + if self._step_message is not None: + self._write( + f"\r{self.BOLD}{self._step_message}{self.RESET}... " + f"{self.CYAN}{self._SPINNER_CHARS[idx]}{self.RESET}\033[K" + ) + idx = (idx + 1) % len(self._SPINNER_CHARS) + self._step_stop.wait(0.1) + + # -- progress mode -- + + def progress_start(self, total: int, desc: str = "Processing files", unit: str = "file") -> None: + """Enter progress bar mode.""" + with self._lock: + self._progress_active = True + self._progress_paused = False + self._progress_total = max(total, 0) + self._progress_current = 0 + self._progress_desc = desc + self._progress_unit = unit + self._progress_file = "" + self._progress_phase = "" + self._progress_heartbeat = "" + self._progress_has_subline = False + self._progress_last_pct = -1 + if self._progress_total > 0: + self._render_progress() + + def progress_update(self, step: int = 1) -> None: + """Advance the progress bar.""" + with self._lock: + if not self._progress_active or self._progress_total <= 0: + return + self._progress_current = min(self._progress_total, self._progress_current + step) + self._render_progress() + + def progress_set_file(self, file_path: str, phase: str = "") -> None: + """Set current file shown on sub-line under the bar.""" + with self._lock: + if not self._progress_active: + return + self._progress_file = file_path + self._progress_phase = phase + self._render_progress() + + def progress_set_heartbeat(self, char: str) -> None: + 
"""Update heartbeat animation character.""" + with self._lock: + if not self._progress_active: + return + self._progress_heartbeat = char + self._render_progress() + + def progress_pause(self) -> None: + """Temporarily hide the progress bar for step output.""" + with self._lock: + if not self._progress_active: + return + self._clear_progress_area() + self._progress_active = False + self._progress_paused = True + + def progress_resume(self) -> None: + """Re-show the progress bar after a pause.""" + with self._lock: + if not self._progress_paused: + return + self._progress_active = True + self._progress_paused = False + self._progress_has_subline = False + self._render_progress() + + def progress_substep_start(self, message: str) -> None: + """Show a substep message on the progress bar sub-line with spinner.""" + with self._lock: + self._progress_substep = message + self._progress_file = "" + self._progress_phase = "" + self._progress_heartbeat = "" + self._progress_substep_stop.clear() + self._progress_substep_idx = 0 + if not self._tty: + # Print current progress context + substep on one line + pct_int = 0 + if self._progress_total > 0: + pct_int = int(self._progress_current / self._progress_total * 100) + self._stream.write( + f"{self._progress_desc} " + f"[{self._progress_current}/{self._progress_total}] " + f"{pct_int}% \u2014 {message}... 
" + ) + self._stream.flush() + return + self._render_progress() + self._progress_substep_thread = threading.Thread( + target=self._substep_spin, daemon=True + ) + self._progress_substep_thread.start() + + def progress_substep_done(self) -> None: + """Clear the substep message from the progress bar sub-line.""" + self._progress_substep_stop.set() + if self._progress_substep_thread: + self._progress_substep_thread.join() + self._progress_substep_thread = None + with self._lock: + self._progress_substep = "" + self._progress_heartbeat = "" + if not self._tty: + self._stream.write("done\n") + self._stream.flush() + return + self._render_progress() + + def _substep_spin(self) -> None: + """Animate spinner on the progress sub-line for substep.""" + while not self._progress_substep_stop.is_set(): + with self._lock: + if self._progress_substep: + self._progress_heartbeat = self._SPINNER_CHARS[self._progress_substep_idx] + self._render_progress() + self._progress_substep_idx = (self._progress_substep_idx + 1) % len(self._SPINNER_CHARS) + self._progress_substep_stop.wait(0.1) + + def progress_done(self) -> None: + """Exit progress bar mode.""" + with self._lock: + if not self._progress_active: + return + if not self._tty: + # Print final 100% line if not already printed + if self._progress_last_pct < 100 and self._progress_total > 0: + self._write( + f"{self._progress_desc} " + f"[{self._progress_total}/{self._progress_total}] 100%\n" + ) + else: + # Render final state + self._render_progress() + # Move past the progress area + if self._progress_has_subline: + self._write("\n\n") + else: + self._write("\n") + self._progress_active = False + self._progress_paused = False + self._progress_has_subline = False + + def _render_progress(self) -> None: + """Render progress bar + optional file sub-line. 
Must hold lock.""" + if self._progress_total <= 0: + return + progress = self._progress_current / self._progress_total + + if not self._tty: + # Non-TTY: print a simple line every 10% to avoid log spam + pct_int = int(progress * 100) + threshold = (pct_int // 10) * 10 + if threshold <= self._progress_last_pct: + return + self._progress_last_pct = threshold + self._stream.write( + f"{self._progress_desc} " + f"[{self._progress_current}/{self._progress_total}] " + f"{pct_int}%\n" + ) + self._stream.flush() + return + + filled = int(self._progress_width * progress) + bar_filled = f"{self.GREEN}{'█' * filled}{self.RESET}" + bar_empty = f"{self.DIM}{'░' * (self._progress_width - filled)}{self.RESET}" + bar = f"{bar_filled}{bar_empty}" + + # Move cursor to start of progress area + if self._progress_has_subline: + self._stream.write("\033[1A\r") + + # Line 1: progress bar + pct = f"{progress * 100:5.1f}%" + if progress >= 1.0: + pct = f"{self.BOLD_GREEN}{pct}{self.RESET}" + line1 = ( + f"{self.BOLD}{self._progress_desc}{self.RESET} [{bar}] " + f"{pct} {self.DIM}({self._progress_current}/{self._progress_total}){self.RESET}" + ) + self._stream.write(f"\r\033[K{line1}") + + # Line 2: substep message (priority) or current file + if self._progress_substep: + subline = f" {self.DIM}{self._progress_substep}{self.RESET}" + if self._progress_heartbeat: + subline += f" {self.CYAN}{self._progress_heartbeat}{self.RESET}" + self._stream.write(f"\n\033[K{subline}") + self._progress_has_subline = True + elif self._progress_file: + file_display = Path(self._progress_file).name + if len(file_display) > 50: + file_display = "..." 
+ file_display[-47:] + subline = f" {self.DIM}{file_display}{self.RESET}" + if self._progress_phase: + subline += f" {self.DIM}({self._progress_phase}" + if self._progress_heartbeat: + subline += f" {self.CYAN}{self._progress_heartbeat}{self.RESET}{self.DIM}" + subline += f"){self.RESET}" + elif self._progress_heartbeat: + subline += f" {self.CYAN}{self._progress_heartbeat}{self.RESET}" + self._stream.write(f"\n\033[K{subline}") + self._progress_has_subline = True + elif self._progress_has_subline: + # Clear stale sub-line + self._stream.write("\n\033[K") + + self._stream.flush() + + def _clear_progress_area(self) -> None: + """Clear progress bar lines from terminal. Must hold lock.""" + if not self._tty: + return + if self._progress_has_subline: + self._stream.write("\033[1A\r\033[K\n\033[K\033[1A\r") + else: + self._stream.write("\r\033[K") + self._stream.flush() + self._progress_has_subline = False + + # -- general output -- + + def print(self, message: str) -> None: + """Print a plain text line.""" + with self._lock: + self._write(f"{message}\n") + + def success(self, message: str) -> None: + """Print a success message in bold green.""" + with self._lock: + self._write(f"{self.BOLD_GREEN}{message}{self.RESET}\n") + + def error(self, message: str) -> None: + """Print an error message in red.""" + with self._lock: + self._write(f"{self.RED}{message}{self.RESET}\n") + + # -- internal helpers -- + + def _color_suffix(self, suffix: str) -> str: + """Return a color-coded suffix string for step_done output.""" + s = suffix.lower() + if s in ("done", "no changes"): + return f"{self.GREEN}{suffix}{self.RESET}" + if s in ("skipped",): + return f"{self.YELLOW}{suffix}{self.RESET}" + if s in ("interrupted",): + return f"{self.RED}{suffix}{self.RESET}" + return suffix + + def _write(self, text: str) -> None: + """Write to stream and flush. 
Caller must hold lock.""" + self._stream.write(text) + self._stream.flush() + + def _suppress_output(self) -> None: + """Redirect stdout/stderr and silence root logger stream handlers. + + IndexingUI captures self._stream at __init__ time, so it keeps + writing to the real terminal. All 3rd-party code that calls + print() or writes to sys.stdout/stderr hits _DevNull instead. + + Skipped when verbose=True to allow full debugging output. + """ + if self._verbose: + return + + devnull = _DevNull() + + self._orig_stdout = sys.stdout + self._orig_stderr = sys.stderr + sys.stdout = devnull + sys.stderr = devnull + + for handler in logging.root.handlers: + if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler): + self._orig_handler_levels.append((handler, handler.level)) + handler.setLevel(logging.CRITICAL + 1) + + def _restore_output(self) -> None: + """Restore stdout/stderr and root logger handler levels.""" + if self._orig_stdout is not None: + sys.stdout = self._orig_stdout + self._orig_stdout = None + if self._orig_stderr is not None: + sys.stderr = self._orig_stderr + self._orig_stderr = None + + for handler, level in self._orig_handler_levels: + handler.setLevel(level) + self._orig_handler_levels.clear() + + +class FileProcessingTimeoutError(Exception): + """Raised when file processing exceeds timeout.""" + pass + + +class FileProcessingContext: + """Context manager for file processing with timeout and heartbeat. 
+ + Usage: + with FileProcessingContext(file_path, ui, timeout=300) as ctx: + ctx.set_phase("Converting .doc") + result = process_file() + """ + + def __init__( + self, + file_path: str, + ui: IndexingUI, + timeout_seconds: float | None = None, + heartbeat_interval: float = 2.0 + ): + self.file_path = file_path + self.ui = ui + self.timeout_seconds = timeout_seconds + self.heartbeat_interval = heartbeat_interval + + self._start_time: float | None = None + self._stop_event = threading.Event() + self._timeout_event = threading.Event() + self._heartbeat_thread: threading.Thread | None = None + self._current_phase = "" + + def __enter__(self): + """Start timing and heartbeat thread.""" + self._start_time = time.time() + + # Start heartbeat thread + self._heartbeat_thread = threading.Thread( + target=self._heartbeat_loop, + daemon=True + ) + self._heartbeat_thread.start() + + # Update UI with current file + self.ui.progress_set_file(self.file_path, "") + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Stop heartbeat and check for slow files.""" + # Stop heartbeat thread + self._stop_event.set() + if self._heartbeat_thread: + self._heartbeat_thread.join(timeout=1.0) + + # Warn about slow files (warnings still pass through) + if self._start_time: + duration = time.time() - self._start_time + slow_threshold = cfgload.get( + "indexing.logging.slow_file_threshold_seconds", 30 + ) + if duration > slow_threshold: + logger.warning( + f"Slow file processing: {self.file_path} took {duration:.1f}s" + ) + + # Don't suppress exceptions + return False + + def set_phase(self, phase: str) -> None: + """Update current operation phase.""" + self._current_phase = phase + self.ui.progress_set_file(self.file_path, phase) + + # Check for timeout + self.check_timeout() + + def check_timeout(self) -> None: + """Check if processing has exceeded timeout (main-thread safe). + + Checks both the _timeout_event (set by heartbeat thread) and + elapsed wall-clock time. 
Raises FileProcessingTimeoutError if + either indicates a timeout. + """ + if self.timeout_seconds is None or self._start_time is None: + return + + if self._timeout_event.is_set(): + elapsed = time.time() - self._start_time + raise FileProcessingTimeoutError( + f"File processing timed out after {elapsed:.1f}s: {self.file_path}" + ) + + elapsed = time.time() - self._start_time + if elapsed > self.timeout_seconds: + self._timeout_event.set() + raise FileProcessingTimeoutError( + f"File processing timed out after {elapsed:.1f}s: {self.file_path}" + ) + + def remaining_seconds(self) -> float | None: + """Return seconds remaining before timeout, or None if no timeout set.""" + if self.timeout_seconds is None or self._start_time is None: + return None + remaining = self.timeout_seconds - (time.time() - self._start_time) + return max(0.0, remaining) + + def _check_timeout(self) -> None: + """Check if processing has exceeded timeout (used by heartbeat thread).""" + if self.timeout_seconds is None or self._start_time is None: + return + + elapsed = time.time() - self._start_time + if elapsed > self.timeout_seconds: + self._timeout_event.set() + raise FileProcessingTimeoutError( + f"File processing timed out after {elapsed:.1f}s: {self.file_path}" + ) + + def _heartbeat_loop(self) -> None: + """Background thread that updates heartbeat indicator.""" + if not self.ui._tty: + # Non-TTY: only monitor for timeouts, skip animation + while not self._stop_event.is_set(): + try: + self._check_timeout() + except FileProcessingTimeoutError: + break + time.sleep(self.heartbeat_interval) + return + + spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + idx = 0 + + while not self._stop_event.is_set(): + self.ui.progress_set_heartbeat(spinner_chars[idx]) + idx = (idx + 1) % len(spinner_chars) + + # Check for timeout + try: + self._check_timeout() + except FileProcessingTimeoutError: + break + + time.sleep(self.heartbeat_interval) + + +class GracefulAbort: + """Two-stage Ctrl-C handler for the indexing 
pipeline. + + First Ctrl-C: sets abort flag and restores default SIGINT so a second + Ctrl-C raises KeyboardInterrupt immediately. + """ + + def __init__(self, ui: "IndexingUI"): + self._abort = False + self._ui = ui + self._original_handler = None + + @property + def abort_requested(self) -> bool: + return self._abort + + def install(self) -> None: + """Register the custom SIGINT handler. Must be called from the main thread.""" + self._original_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, self._handle_sigint) + + def uninstall(self) -> None: + """Restore the original SIGINT handler.""" + if self._original_handler is not None: + signal.signal(signal.SIGINT, self._original_handler) + self._original_handler = None + + def _handle_sigint(self, signum, frame): + self._abort = True + signal.signal(signal.SIGINT, signal.SIG_DFL) + self._ui.print( + f"\n{self._ui.YELLOW}Ctrl-C received. " + f"Finishing current batch and saving state...{self._ui.RESET}" + ) + self._ui.print( + f"{self._ui.DIM}(Press Ctrl-C again to force quit immediately)" + f"{self._ui.RESET}" + ) diff --git a/test/test_indexing_ui.py b/test/test_indexing_ui.py index d3b8a6b..9e181ff 100644 --- a/test/test_indexing_ui.py +++ b/test/test_indexing_ui.py @@ -25,7 +25,7 @@ def isatty(self): def _make_ui(stream=None): """Create an IndexingUI with a local import (avoids module-level config init).""" - from chunksilo.index import IndexingUI + from chunksilo.ui import IndexingUI return IndexingUI(stream=stream or _FakeTTY()) @@ -285,7 +285,7 @@ def test_restores_streams_on_exception(self): def test_verbose_skips_suppression(self): """verbose=True leaves stdout/stderr untouched.""" - from chunksilo.index import IndexingUI + from chunksilo.ui import IndexingUI stream = io.StringIO() ui = IndexingUI(stream=stream, verbose=True) orig_stdout = sys.stdout @@ -412,7 +412,7 @@ def test_error_outputs_message(self, ui): def _make_non_tty_ui(): """Create an IndexingUI with a plain StringIO 
(non-TTY) stream.""" - from chunksilo.index import IndexingUI + from chunksilo.ui import IndexingUI return IndexingUI(stream=io.StringIO()) @@ -717,7 +717,7 @@ def test_wall_clock_safety_net(self): class TestFileProcessingContextTimeout: def test_check_timeout_raises_on_expired(self): """check_timeout raises FileProcessingTimeoutError when time is up.""" - from chunksilo.index import FileProcessingContext, FileProcessingTimeoutError + from chunksilo.ui import FileProcessingContext, FileProcessingTimeoutError ui = _make_ui() ui.progress_start(1) @@ -730,7 +730,7 @@ def test_check_timeout_raises_on_expired(self): def test_check_timeout_raises_when_event_set(self): """check_timeout raises when _timeout_event is already set.""" - from chunksilo.index import FileProcessingContext, FileProcessingTimeoutError + from chunksilo.ui import FileProcessingContext, FileProcessingTimeoutError ui = _make_ui() ui.progress_start(1) @@ -744,7 +744,7 @@ def test_check_timeout_raises_when_event_set(self): def test_remaining_seconds_returns_positive(self): """remaining_seconds returns positive value when time remains.""" - from chunksilo.index import FileProcessingContext + from chunksilo.ui import FileProcessingContext ui = _make_ui() ui.progress_start(1) @@ -757,7 +757,7 @@ def test_remaining_seconds_returns_positive(self): def test_remaining_seconds_returns_none_without_timeout(self): """remaining_seconds returns None when no timeout is configured.""" - from chunksilo.index import FileProcessingContext + from chunksilo.ui import FileProcessingContext ui = _make_ui() ui.progress_start(1) @@ -768,7 +768,7 @@ def test_remaining_seconds_returns_none_without_timeout(self): def test_remaining_seconds_clamps_to_zero(self): """remaining_seconds returns 0 when timeout has passed.""" - from chunksilo.index import FileProcessingContext + from chunksilo.ui import FileProcessingContext ui = _make_ui() ui.progress_start(1) From ec199aefd0f4746ee1a752ddea54871135292636 Mon Sep 17 00:00:00 2001 
From: Fredrik Reveny Date: Thu, 26 Feb 2026 06:33:03 +0100 Subject: [PATCH 2/3] fix: break circular import between index.py and docx_utils.py Move the import of EXCLUDED_EMBED_METADATA_KEYS, EXCLUDED_LLM_METADATA_KEYS, and get_heading_store to inside split_docx_into_heading_documents() to avoid the index -> docx_utils -> index circular import at module load time. Co-Authored-By: Claude Opus 4.6 --- src/chunksilo/docx_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/chunksilo/docx_utils.py b/src/chunksilo/docx_utils.py index 7e1892c..241bb0d 100644 --- a/src/chunksilo/docx_utils.py +++ b/src/chunksilo/docx_utils.py @@ -16,7 +16,6 @@ from llama_index.core import Document as LlamaIndexDocument from . import cfgload -from .index import EXCLUDED_EMBED_METADATA_KEYS, EXCLUDED_LLM_METADATA_KEYS, get_heading_store if TYPE_CHECKING: from .ui import FileProcessingContext @@ -117,6 +116,9 @@ def split_docx_into_heading_documents( docx_path: Path to DOCX file ctx: Optional processing context for progress updates and timeout """ + # Lazy import to avoid circular dependency (index.py imports docx_utils) + from .index import EXCLUDED_EMBED_METADATA_KEYS, EXCLUDED_LLM_METADATA_KEYS, get_heading_store + docs: list[LlamaIndexDocument] = [] if ctx: From 62dfec32e68ee2e6adea65bf9a64dd9fc5610063 Mon Sep 17 00:00:00 2001 From: Fredrik Reveny Date: Thu, 26 Feb 2026 06:39:18 +0100 Subject: [PATCH 3/3] fix: replace lazy import hack with dependency injection in docx_utils.py Pass heading_store, excluded_embed_metadata_keys, and excluded_llm_metadata_keys as parameters to split_docx_into_heading_documents() instead of importing them from index.py. This cleanly breaks the circular dependency without any runtime import tricks. 
Co-Authored-By: Claude Opus 4.6 --- src/chunksilo/docx_utils.py | 30 +++++++++++++++++++----------- src/chunksilo/index.py | 28 ++++++++++++++++++++++++---- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/src/chunksilo/docx_utils.py b/src/chunksilo/docx_utils.py index 241bb0d..c119e76 100644 --- a/src/chunksilo/docx_utils.py +++ b/src/chunksilo/docx_utils.py @@ -10,7 +10,7 @@ import logging from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from docx import Document from llama_index.core import Document as LlamaIndexDocument @@ -108,16 +108,23 @@ def _convert_doc_to_docx(doc_path: Path, timeout: float = 60) -> Path | None: def split_docx_into_heading_documents( docx_path: Path, - ctx: FileProcessingContext | None = None + ctx: FileProcessingContext | None = None, + *, + heading_store: Any = None, + excluded_embed_metadata_keys: list[str] | None = None, + excluded_llm_metadata_keys: list[str] | None = None, ) -> list[LlamaIndexDocument]: """Split DOCX into documents by heading with progress updates. 
Args: docx_path: Path to DOCX file ctx: Optional processing context for progress updates and timeout + heading_store: HeadingStore instance for persisting heading metadata + excluded_embed_metadata_keys: Keys to exclude from embedding text + excluded_llm_metadata_keys: Keys to exclude from LLM context """ - # Lazy import to avoid circular dependency (index.py imports docx_utils) - from .index import EXCLUDED_EMBED_METADATA_KEYS, EXCLUDED_LLM_METADATA_KEYS, get_heading_store + _excluded_embed = excluded_embed_metadata_keys or [] + _excluded_llm = excluded_llm_metadata_keys or [] docs: list[LlamaIndexDocument] = [] @@ -174,9 +181,10 @@ def split_docx_into_heading_documents( char_position += len(para.text) + 1 # +1 for newline # Store headings separately to avoid metadata size issues during chunking - if ctx: - ctx.set_phase("Storing heading metadata") - get_heading_store().set_headings(str(docx_path), all_headings) + if heading_store is not None: + if ctx: + ctx.set_phase("Storing heading metadata") + heading_store.set_headings(str(docx_path), all_headings) # Second pass: Split by heading (existing logic) if ctx: @@ -225,8 +233,8 @@ def flush_current(): docs.append(LlamaIndexDocument( text=text, metadata=metadata, - excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS, - excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS, + excluded_embed_metadata_keys=_excluded_embed, + excluded_llm_metadata_keys=_excluded_llm, )) for para in doc.paragraphs: @@ -267,8 +275,8 @@ def flush_current(): docs.append(LlamaIndexDocument( text=full_text, metadata=metadata, - excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS, - excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS, + excluded_embed_metadata_keys=_excluded_embed, + excluded_llm_metadata_keys=_excluded_llm, )) logger.info( diff --git a/src/chunksilo/index.py b/src/chunksilo/index.py index 7f6a429..7bd410e 100644 --- a/src/chunksilo/index.py +++ b/src/chunksilo/index.py @@ -855,7 +855,12 @@ def load_file( 
remaining = ctx.remaining_seconds() if ctx else None if remaining is not None: result = _run_with_timeout( - lambda: split_docx_into_heading_documents(file_path, ctx), + lambda: split_docx_into_heading_documents( + file_path, ctx, + heading_store=get_heading_store(), + excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS, + excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS, + ), timeout_seconds=remaining, default=None, ) @@ -865,7 +870,12 @@ def load_file( ) return [] return result - return split_docx_into_heading_documents(file_path, ctx) + return split_docx_into_heading_documents( + file_path, ctx, + heading_store=get_heading_store(), + excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS, + excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS, + ) elif file_path.suffix.lower() == ".doc": # Convert .doc to .docx using LibreOffice, then process if ctx: @@ -884,7 +894,12 @@ def load_file( remaining = ctx.remaining_seconds() if ctx else None if remaining is not None: result = _run_with_timeout( - lambda: split_docx_into_heading_documents(docx_path, ctx), + lambda: split_docx_into_heading_documents( + docx_path, ctx, + heading_store=get_heading_store(), + excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS, + excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS, + ), timeout_seconds=remaining, default=None, ) @@ -894,7 +909,12 @@ def load_file( ) docs = result if result is not None else [] else: - docs = split_docx_into_heading_documents(docx_path, ctx) + docs = split_docx_into_heading_documents( + docx_path, ctx, + heading_store=get_heading_store(), + excluded_embed_metadata_keys=EXCLUDED_EMBED_METADATA_KEYS, + excluded_llm_metadata_keys=EXCLUDED_LLM_METADATA_KEYS, + ) # Update metadata to point to original .doc file for doc in docs: doc.metadata["file_path"] = str(file_path)