from __future__ import annotations

import os
import sqlite3
from pathlib import Path
from typing import Dict, List, Tuple

# Default on-disk location for the path index; override with SCIDK_DB_PATH.
DEFAULT_DB = os.path.expanduser("~/.scidk/db/files.db")

# Schema per task-sqlite-path-index:
# files(path, parent_path, name, depth, type, size, modified_time,
#       file_extension, mime_type, etag, hash, remote, scan_id, extra_json)
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS files (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    path TEXT NOT NULL,
    parent_path TEXT,
    name TEXT,
    depth INTEGER,
    type TEXT,              -- 'file' | 'folder'
    size INTEGER,
    modified_time REAL,
    file_extension TEXT,
    mime_type TEXT,
    etag TEXT,
    hash TEXT,
    remote TEXT,
    scan_id TEXT,
    extra_json TEXT
);
-- Composite indexes to speed the common scan-scoped lookups.
CREATE INDEX IF NOT EXISTS idx_files_scan_parent_name ON files(scan_id, parent_path, name);
CREATE INDEX IF NOT EXISTS idx_files_scan_ext ON files(scan_id, file_extension);
CREATE INDEX IF NOT EXISTS idx_files_scan_type ON files(scan_id, type);
"""


def _db_path() -> str:
    """Resolve the SQLite file path: env override SCIDK_DB_PATH, else DEFAULT_DB."""
    return os.environ.get("SCIDK_DB_PATH", DEFAULT_DB)


def connect() -> sqlite3.Connection:
    """Open a connection to the configured SQLite DB and enable WAL.

    Creates the parent directory on first use. WAL is required by the
    acceptance criteria; tests expect the pragma to report exactly 'wal'.
    """
    path = _db_path()
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL;")
    return conn


def init_db(conn: sqlite3.Connection) -> None:
    """Create the files table and its indexes if missing (idempotent)."""
    conn.executescript(SCHEMA_SQL)
    conn.commit()


def _ensure_db() -> sqlite3.Connection:
    """Connect and guarantee the schema exists; returns an open connection."""
    conn = connect()
    init_db(conn)
    return conn


def map_rclone_item_to_row(item: Dict, root_path: str, scan_id: str) -> Dict:
    """Map a single rclone ``lsjson`` item to a path-index row dict.

    Expected item fields: Name, Path, Size, IsDir, MimeType (others ignored).
    ``root_path`` is the scan root target (e.g. "remote:bucket" or "remote:").

    Returns a dict whose keys cover the ``files`` schema columns consumed by
    :func:`batch_insert_files` (mime_type/etag/hash/remote/extra_json included;
    etag/hash/extra_json are None — lsjson does not provide them by default).
    """
    name = (item.get("Name") or item.get("Path") or "").strip()
    is_dir = bool(item.get("IsDir"))
    size = int(item.get("Size") or 0)
    mime = item.get("MimeType")
    base = root_path or ""
    # Join root and name with '/', except directly after a bare "remote:" root.
    if base.endswith(":"):
        full_path = f"{base}{name}" if name else base
    else:
        full_path = f"{base.rstrip('/')}/{name}" if name else base.rstrip("/")
    p = Path(full_path)
    # Path() treats "remote:child" as one relative component, so p.parent would
    # be "."; direct children of a remote root get parent "remote:" instead.
    if ":" in full_path:
        prefix, _, suffix = full_path.partition(":")
        if suffix and "/" not in suffix:
            parent = prefix + ":"
        else:
            parent = str(p.parent) if str(p) else ""
    else:
        parent = str(p.parent) if str(p) else ""
    depth = len([part for part in p.parts if part not in ("", "/")])
    ext = "" if is_dir else p.suffix.lower()
    remote = root_path.split(":", 1)[0] if ":" in (root_path or "") else None
    return {
        "path": str(p),
        "parent_path": parent,
        "name": p.name or str(p),
        "depth": depth,
        "type": "folder" if is_dir else "file",
        "size": 0 if is_dir else size,
        "modified": 0.0,       # lsjson ModTime parsing deferred; stored as 0.0
        "ext": ext,
        "mime_type": mime,
        "etag": None,
        "hash": None,
        "remote": remote,
        "extra_json": None,
        "scan_id": scan_id,
        # provider/host metadata may be filled by the caller in the future
        "provider_id": "rclone",
        "host_type": "rclone",
        "host_id": None,
        "root_id": None,
        "root_label": None,
    }


def batch_insert_files(rows: List[Tuple], batch_size: int = 10000) -> int:
    """Insert rows into SQLite in batches; returns the inserted row count.

    Creates the DB file and schema on first use. Tuple rows must match the
    schema column order:
    (path, parent_path, name, depth, type, size, modified_time,
     file_extension, mime_type, etag, hash, remote, scan_id, extra_json)
    Dict rows (e.g. from :func:`map_rclone_item_to_row`) are mapped by key;
    malformed entries are skipped (best-effort ingest, by design).
    """
    if not rows:
        return 0
    conn = _ensure_db()
    inserted = 0
    cols = (
        "path,parent_path,name,depth,type,size,modified_time,"
        "file_extension,mime_type,etag,hash,remote,scan_id,extra_json"
    )
    sql = f"INSERT INTO files ({cols}) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
    try:
        # Connection-as-context-manager commits on success, rolls back on error.
        with conn:
            batch: List[Tuple] = []
            for r in rows:
                if not isinstance(r, tuple):
                    # Tolerate dict-style rows by mapping known keys.
                    try:
                        r = (
                            r.get("path"), r.get("parent_path"), r.get("name"),
                            r.get("depth"), r.get("type"), r.get("size"),
                            r.get("modified_time") or r.get("modified") or 0.0,
                            r.get("file_extension") or r.get("ext"),
                            r.get("mime_type"), r.get("etag"), r.get("hash"),
                            r.get("remote"), r.get("scan_id"), r.get("extra_json"),
                        )
                    except Exception:
                        continue  # best-effort: skip rows we cannot shape
                batch.append(r)
                if len(batch) >= batch_size:
                    conn.executemany(sql, batch)
                    inserted += len(batch)
                    batch = []
            if batch:
                conn.executemany(sql, batch)
                inserted += len(batch)
    finally:
        try:
            conn.close()
        except Exception:
            pass
    return inserted