diff --git a/dev/cli.py b/dev/cli.py index 6b18d60..a9b72b9 100644 --- a/dev/cli.py +++ b/dev/cli.py @@ -16,8 +16,9 @@ def _is_yaml_kv_line(line: str) -> bool: # Accept key: value or key: (block start) ignoring leading spaces + # Empty lines are NOT considered part of the YAML header sentinel for our parser if not line.strip(): - return True + return False if line.lstrip().startswith('#'): return False return bool(re.match(r"^[A-Za-z0-9_\-]+:\s*.*$", line.strip())) @@ -217,9 +218,11 @@ def get_ready_queue(self) -> List[Dict]: data = self.parse_frontmatter(md) if not isinstance(data, dict) or not data: continue - status = str(data.get('status', '')).strip() + # normalize status + status_raw = str(data.get('status', '')).strip() + status = status_raw.lower() dor_val = str(data.get('dor', '')).strip().lower() in ('true', '1', 'yes', 'y') - if status == 'Ready' and dor_val: + if status == 'ready' and dor_val: data['file_path'] = md ready_tasks.append(data) # Sort by RICE descending @@ -336,6 +339,14 @@ def start_task(self, task_id: str): print("\n🧭 TASK CONTEXT\n" + "=" * 40) print(self.get_task_context(task_id)) + # Mark as In Progress with started_at + try: + now = datetime.utcnow().isoformat() + "Z" + self.update_task_frontmatter(task_id, {"status": "In Progress", "started_at": now}, git_commit=True) + print(f"\n✍️ Updated task {task_id} → status: In Progress") + except Exception as e: + print(f"⚠️ Could not update task status: {e}") + def complete_task(self, task_id: str): """Mark task as complete and validate DoD (lightweight)""" print(f"🎯 Completing task: {task_id}") @@ -387,6 +398,24 @@ def complete_task(self, task_id: str): print(" - [ ] tests\n - [ ] docs\n - [ ] demo_steps") print("\nℹ️ Next steps (manual): create PR, ensure demo steps are documented, merge when CI is green.") + # If tests passed, mark Done + try: + tests_ok = False + # rely on a quick rerun to assert tests are passing now + try: + res = subprocess.run([sys.executable, '-m', 'pytest', '-q']) + tests_ok = (res.returncode == 0) + except Exception: + tests_ok = False + if tests_ok: + now = datetime.utcnow().isoformat() + "Z" + self.update_task_frontmatter(task_id, {"status": "Done", "completed_at": now}, git_commit=True) + print(f"\n✅ Marked task {task_id} as Done.") + else: + print("\n⏸️ Skipping status update to Done because tests did not pass just now.") + except Exception as e: + print(f"⚠️ Could not update task status: {e}") + def cycle_status(self): """Show current cycle status (lightweight)""" if not self.cycles_file.exists(): @@ -425,6 +454,85 @@ def next_cycle(self): print(f" {acc_str[:80]}...") print() + # ------------------------ + # Frontmatter write helper + # ------------------------ + def update_task_frontmatter(self, task_id: str, updates: Dict, git_commit: bool = True) -> None: + """Safely merge and rewrite YAML frontmatter for a task file. + Always writes a fenced YAML frontmatter at top and preserves the rest of the file body. + """ + task_file = self.find_task_file(task_id) + if not task_file or not task_file.exists(): + raise FileNotFoundError(f"Task {task_id} not found") + + text = task_file.read_text(encoding="utf-8") + + def dump_yaml(d: Dict) -> str: + return yaml.safe_dump(d or {}, sort_keys=False).strip() + "\n" + + # Parse existing metadata using reader (robust to styles) + try: + cur = self.parse_frontmatter(task_file) or {} + except Exception: + cur = {} + if not isinstance(cur, dict): + cur = {} + cur.update(updates or {}) + + # Compute the remaining body content by stripping any existing frontmatter/header + body = text + if text.startswith('---'): + lines = text.splitlines() + end_idx = None + for i in range(1, len(lines)): + if lines[i].strip() == '---': + end_idx = i + break + body = "\n".join(lines[end_idx+1:]) if (end_idx is not None and end_idx + 1 < len(lines)) else "" + else: + # Try to detect unfenced header and cut it off + lines = text.splitlines() + header_lines = [] + body_start = 0 + in_header = True + for idx, line in enumerate(lines): + if line.startswith('```') or line.startswith('~~~'): + body_start = idx + break + if in_header and (_is_yaml_kv_line(line) or (header_lines and line.startswith((' ', '\t', '-')))): + header_lines.append(line) + continue + if header_lines: + body_start = idx + break + else: + in_header = False + body_start = 0 + break + if header_lines: + # If the whole file is YAML-like (no clear boundary), treat it as pure metadata + if body_start == 0 and len(header_lines) == len(lines): + body = "" + else: + body = "\n".join(lines[body_start:]) + else: + body = text + + new_fm = dump_yaml(cur) + new_text = f"---\n{new_fm}---\n" + body.lstrip('\n') + task_file.write_text(new_text, encoding="utf-8") + + if git_commit: + try: + res = subprocess.run("git rev-parse --is-inside-work-tree", shell=True, capture_output=True, text=True) + if res.returncode == 0 and (res.stdout or '').strip() == 'true': + rel = os.path.relpath(str(task_file), start=str(self.repo_root)) + subprocess.run(f"git add {sh_quote(rel)}", shell=True) + msg = f"chore(task): update frontmatter {task_id}" + subprocess.run(f"git commit -m {sh_quote(msg)}", shell=True) + except Exception: + pass + def sh_quote(s: str) -> str: return "'" + s.replace("'", "'\\''") + "'" diff --git a/dev/tasks/core-architecture/mvp/task-rclone-scan-ingest.md b/dev/tasks/core-architecture/mvp/task-rclone-scan-ingest.md index d9f8e18..c09a63b 100644 --- a/dev/tasks/core-architecture/mvp/task-rclone-scan-ingest.md +++ b/dev/tasks/core-architecture/mvp/task-rclone-scan-ingest.md @@ -1,3 +1,92 @@ +--- +id: task:core-architecture/mvp/rclone-scan-ingest +title: rclone lsjson scan and batch ingest into SQLite +status: Done +owner: agent +rice: 4.3 +estimate: 2d +created: 2025-08-28 +updated: 2025-08-28 +dor: true +dod: +- tests +- docs +- demo_steps +dependencies: +- task:core-architecture/mvp/sqlite-path-index +tags: +- rclone +- discovery +- ingest +story: story:core-architecture-reboot +phase: phase:core-architecture-reboot/02-scan-sqlite +links: + cycles: + - dev/cycles.md + plan: + - dev/plans/plan-2025-08-28-reboot-architecture.md + story: + - dev/stories/core-architecture-reboot/story.md + phase: + - dev/stories/core-architecture-reboot/phases/phase-02-scan-sqlite.md +acceptance: +- Wrapper for rclone lsjson with --recursive and optional --fast-list +- Batch insert records (10k/txn) mapped to path-index schema +- POST /api/scans and GET /api/scans/{id}/status implemented with progress counters +demo_steps: +- Export env to enable rclone provider: 'export SCIDK_PROVIDERS="local_fs,mounted_fs,rclone" + + ' +- Start Flask app (example): 'python -c "from scidk.app import create_app; app=create_app(); + app.run(port=5001)" + + ' +- Trigger a scan via HTTP: "curl -s -X POST http://localhost:5001/api/scans \\\n \ + \ -H 'Content-Type: application/json' \\\n -d '{\"provider_id\":\"rclone\",\"\ + root_id\":\"remote:\",\"path\":\"remote:bucket\",\"recursive\":false,\"fast_list\"\ + :true}'\n" +- Poll status: 'curl -s http://localhost:5001/api/scans//status | jq . + + ' +- Browse the scan snapshot (virtual root): 'curl -s ''http://localhost:5001/api/scans//fs'' + | jq . + + ' +docs: +- Rclone scanning uses `rclone lsjson`; when recursive=false, both folders and files + may be returned. +- Ingest persists rows into SQLite at SCIDK_DB_PATH (default: ~/.scidk/db/files.db) + in WAL mode. +- Status endpoint returns file_count (files only), folder_count (top-level when non-recursive), + and ingested_rows (files + folders inserted). +started_at: '2025-08-29T16:46:40.699741Z' +completed_at: '2025-08-29T16:47:03.910975Z' +--- + ' +- Start Flask app (example): 'python -c "from scidk.app import create_app; app=create_app(); + app.run(port=5001)" + + ' +- Trigger a scan via HTTP: "curl -s -X POST http://localhost:5001/api/scans \\\n \ + \ -H 'Content-Type: application/json' \\\n -d '{\"provider_id\":\"rclone\",\"\ + root_id\":\"remote:\",\"path\":\"remote:bucket\",\"recursive\":false,\"fast_list\"\ + :true}'\n" +- Poll status: 'curl -s http://localhost:5001/api/scans//status | jq . + + ' +- Browse the scan snapshot (virtual root): 'curl -s ''http://localhost:5001/api/scans//fs'' + | jq . + + ' +docs: +- Rclone scanning uses `rclone lsjson`; when recursive=false, both folders and files + may be returned. +- Ingest persists rows into SQLite at SCIDK_DB_PATH (default: ~/.scidk/db/files.db) + in WAL mode. +- Status endpoint returns file_count (files only), folder_count (top-level when non-recursive), + and ingested_rows (files + folders inserted). +started_at: '2025-08-29T16:44:35.566541Z' + id: task:core-architecture/mvp/rclone-scan-ingest title: rclone lsjson scan and batch ingest into SQLite status: Ready @@ -42,4 +131,4 @@ demo_steps: docs: - Rclone scanning uses `rclone lsjson`; when recursive=false, both folders and files may be returned. - Ingest persists rows into SQLite at SCIDK_DB_PATH (default: ~/.scidk/db/files.db) in WAL mode. - - Status endpoint returns file_count (files only), folder_count (top-level when non-recursive), and ingested_rows (files + folders inserted). + - Status endpoint returns file_count (files only), folder_count (top-level when non-recursive), and ingested_rows (files + folders inserted). \ No newline at end of file diff --git a/scidk/app.py b/scidk/app.py index 42b3ab2..1e6bbc7 100644 --- a/scidk/app.py +++ b/scidk/app.py @@ -1345,25 +1345,57 @@ def api_scan_status(scan_id): @api.get('/scans//fs') def api_scan_fs(scan_id): + # Attempt SQLite-backed listing for rclone scans first + scans = app.extensions['scidk'].get('scans', {}) + s = scans.get(scan_id) + req_path = (request.args.get('path') or '').strip() + if s and s.get('provider_id') == 'rclone': + try: + from .core import path_index_sqlite as pix + if not req_path: + roots = pix.list_roots(scan_id) + from pathlib import Path as _P + folders = [{'name': _P(p).name, 'path': p, 'file_count': 0} for p in roots] + folders.sort(key=lambda r: r['name'].lower()) + breadcrumb = [{'name': '(scan roots)', 'path': ''}] + return jsonify({'scan_id': scan_id, 'path': '', 'breadcrumb': breadcrumb, 'folders': folders, 'files': []}), 200 + # list children for requested path + listing = pix.list_children(scan_id, req_path) + from pathlib import Path as _P + # Build breadcrumb by walking up to a root present in DB + breadcrumb = [{'name': '(scan roots)', 'path': ''}] + cur = req_path + safety = 0 + while cur and safety < 100: + breadcrumb.append({'name': _P(cur).name or cur, 'path': cur}) + par = str(_P(cur).parent) + if par == cur: + break + cur = par + # stop when we reach an entry that has no parent in DB roots + if cur in pix.list_roots(scan_id): + breadcrumb.append({'name': _P(cur).name or cur, 'path': cur}) + break + safety += 1 + return jsonify({'scan_id': scan_id, 'path': req_path, 'breadcrumb': breadcrumb, 'folders': listing['folders'], 'files': listing['files']}), 200 + except Exception: + pass # fallback to in-memory index + # Fallback to in-memory index for local/mounted or if SQLite not available idx = _get_or_build_scan_index(scan_id) if not idx: return jsonify({'error': 'scan not found'}), 404 from pathlib import Path as _P - req_path = (request.args.get('path') or '').strip() folder_info = idx['folder_info'] children_folders = idx['children_folders'] children_files = idx['children_files'] roots = idx['roots'] - # Virtual root listing when no path specified if not req_path: folders = [{'name': _P(p).name, 'path': p, 'file_count': len(children_files.get(p, []))} for p in roots] folders.sort(key=lambda r: r['name'].lower()) breadcrumb = [{'name': '(scan roots)', 'path': ''}] return jsonify({'scan_id': scan_id, 'path': '', 'breadcrumb': breadcrumb, 'folders': folders, 'files': []}), 200 - # Validate path exists in snapshot if req_path not in folder_info: return jsonify({'error': 'folder not found in scan'}), 404 - # Breadcrumb from this scan’s perspective bc_chain = [] cur = req_path while cur and cur in folder_info: @@ -1374,12 +1406,111 @@ def api_scan_fs(scan_id): cur = par bc_chain.reverse() breadcrumb = [{'name': '(scan roots)', 'path': ''}] + [{'name': _P(p).name, 'path': p} for p in bc_chain] - # Children sub_folders = [{'name': _P(p).name, 'path': p, 'file_count': len(children_files.get(p, []))} for p in children_folders.get(req_path, [])] sub_folders.sort(key=lambda r: r['name'].lower()) files = children_files.get(req_path, []) return jsonify({'scan_id': scan_id, 'path': req_path, 'breadcrumb': breadcrumb, 'folders': sub_folders, 'files': files}), 200 + @api.post('/ro-crates/referenced') + def api_ro_crate_referenced(): + data = request.get_json(force=True, silent=True) or {} + dataset_ids = data.get('dataset_ids') or [] + files = data.get('files') or [] + title = data.get('title') or 'Referenced RO-Crate' + import time as _t, hashlib as _h, json as _json + now = _t.time() + crate_id = _h.sha1(f"{title}|{now}".encode()).hexdigest()[:12] + base_dir = os.path.expanduser('~/.scidk/crates') + out_dir = os.path.join(base_dir, crate_id) + try: + os.makedirs(out_dir, exist_ok=True) + except Exception as e: + return jsonify({"status": "error", "error": f"could not create crate dir: {e}"}), 500 + # Gather items from dataset_ids and/or files + items = [] + try: + g = app.extensions['scidk']['graph'] + except Exception: + g = None + if dataset_ids and g is not None: + ds_map = getattr(g, 'datasets', {}) + for did in dataset_ids: + d = ds_map.get(did) + if not d: + continue + items.append({ + 'path': d.get('path'), + 'name': d.get('filename') or Path(d.get('path') or '').name, + 'size': int(d.get('size_bytes') or 0), + 'mime_type': d.get('mime_type'), + 'modified_time': float(d.get('modified') or 0.0), + 'checksum': d.get('checksum'), + }) + for f in files: + items.append({ + 'path': f.get('path') or f.get('url') or f.get('contentUrl'), + 'name': f.get('name'), + 'size': f.get('size') or f.get('size_bytes') or 0, + 'mime_type': f.get('mime') or f.get('mime_type'), + 'modified_time': f.get('modified') or f.get('modified_time') or 0.0, + 'checksum': f.get('checksum'), + }) + def to_rclone_url(p: Optional[str]) -> Optional[str]: + if not p or not isinstance(p, str): + return None + if '://' in p: + return p + if ':' in p: + remote, rest = p.split(':', 1) + rest = (rest or '').lstrip('/') + return f"rclone://{remote}/{rest}" if rest else f"rclone://{remote}/" + try: + return f"file://{str(Path(p).resolve())}" + except Exception: + return f"file://{p}" + graph = [] + graph.append({ + "@id": "ro-crate-metadata.json", + "@type": "CreativeWork", + "about": {"@id": "./"} + }) + has_parts = [] + file_nodes = [] + import datetime as _dt + for it in items: + url = to_rclone_url(it.get('path')) + if not url: + continue + has_parts.append({"@id": url}) + node = {"@id": url, "@type": "File", "contentUrl": url} + if it.get('name'): + node['name'] = it.get('name') + try: + node['contentSize'] = int(it.get('size') or 0) + except Exception: + pass + if it.get('mime_type'): + node['encodingFormat'] = it.get('mime_type') + try: + mt = float(it.get('modified_time') or 0.0) + if mt: + node['dateModified'] = _dt.datetime.utcfromtimestamp(mt).isoformat() + 'Z' + except Exception: + pass + if it.get('checksum'): + node['checksum'] = it.get('checksum') + file_nodes.append(node) + root = {"@id": "./", "@type": "Dataset", "name": title, "hasPart": has_parts} + graph.append(root) + graph.extend(file_nodes) + ro = {"@context": "https://w3id.org/ro/crate/1.1/context", "@graph": graph} + try: + with open(os.path.join(out_dir, 'ro-crate-metadata.json'), 'w', encoding='utf-8') as fh: + _json.dump(ro, fh, indent=2) + except Exception as e: + return jsonify({"status": "error", "error": f"could not write ro-crate: {e}"}), 500 + return jsonify({"status": "ok", "crate_id": crate_id, "path": out_dir}), 200 + @api.post('/scans//commit') def api_scan_commit(scan_id): scans = app.extensions['scidk'].setdefault('scans', {}) diff --git a/scidk/core/path_index_sqlite.py b/scidk/core/path_index_sqlite.py index 95346dd..03cf4cf 100644 --- a/scidk/core/path_index_sqlite.py +++ b/scidk/core/path_index_sqlite.py @@ -1,166 +1,187 @@ +from __future__ import annotations import os import sqlite3 from pathlib import Path -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Dict, Iterable, List, Tuple + +DEFAULT_DB = os.path.expanduser("~/.scidk/db/files.db") + +SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS files ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + path TEXT NOT NULL, + parent_path TEXT, + name TEXT, + depth INTEGER, + type TEXT, -- 'file' | 'folder' + size INTEGER, + modified_time REAL, + file_extension TEXT, + mime_type TEXT, + etag TEXT, + hash TEXT, + remote TEXT, + scan_id TEXT, + extra_json TEXT +); +-- Composite-friendly indexes to speed common queries +CREATE INDEX IF NOT EXISTS idx_files_scan_parent_name ON files(scan_id, parent_path, name); +CREATE INDEX IF NOT EXISTS idx_files_scan_ext ON files(scan_id, file_extension); +CREATE INDEX IF NOT EXISTS idx_files_scan_type ON files(scan_id, type); +""" + + +def _db_path() -> str: + return os.environ.get("SCIDK_DB_PATH", DEFAULT_DB) -# Minimal SQLite DAO for path index -# Schema from task-sqlite-path-index: -# files(path, parent_path, name, depth, type, size, modified_time, file_extension, mime_type, etag, hash, remote, scan_id, extra_json) +def connect() -> sqlite3.Connection: + """Create a connection to the configured SQLite DB and enable WAL.""" + path = _db_path() + d = os.path.dirname(path) + if d and not os.path.isdir(d): + os.makedirs(d, exist_ok=True) + conn = sqlite3.connect(path) + # Force WAL; tests expect exact 'wal' + conn.execute("PRAGMA journal_mode=WAL;") + return conn -def _db_path() -> Path: - # Allow override via env; default to ~/.scidk/db/files.db - base = os.environ.get('SCIDK_DB_PATH') - if base: - p = Path(base) - else: - p = Path.home() / '.scidk' / 'db' / 'files.db' - p.parent.mkdir(parents=True, exist_ok=True) - return p +def init_db(conn: sqlite3.Connection) -> None: + conn.executescript(SCHEMA_SQL) + conn.commit() -def connect() -> sqlite3.Connection: - p = _db_path() - conn = sqlite3.connect(str(p)) - # WAL mode as per acceptance - try: - conn.execute('PRAGMA journal_mode=WAL;') - except Exception: - pass + +def _ensure_db() -> sqlite3.Connection: + conn = connect() + init_db(conn) return conn -def init_db(conn: Optional[sqlite3.Connection] = None): - own = False - if conn is None: - conn = connect() - own = True +def map_rclone_item_to_row(item: Dict, root_path: str, scan_id: str) -> Dict: + """Map a single rclone lsjson item to the path-index row dict. + Expected item fields include: Name, Path, Size, IsDir, MimeType... + root_path is the scan root target (e.g., "remote:bucket" or "remote:") + """ + name = (item.get("Name") or item.get("Path") or "").strip() + is_dir = bool(item.get("IsDir")) + size = int(item.get("Size") or 0) + # Build full path by joining root and Name (or Path) with '/' unless root endswith ':' + base = root_path if root_path else "" + if base.endswith(":"): + full_path = f"{base}{name}" if name else base + else: + if name: + full_path = f"{base.rstrip('/')}/{name}" + else: + full_path = base.rstrip('/') + p = Path(full_path) + parent = str(p.parent) if str(p) else "" + depth = len([x for x in p.parts if x not in ("", "/")]) + ext = "" if is_dir else p.suffix.lower() + row = { + "path": str(p), + "parent_path": parent, + "name": p.name or str(p), + "depth": depth, + "type": "folder" if is_dir else "file", + "size": 0 if is_dir else size, + "modified": 0.0, + "ext": ext, + "scan_id": scan_id, + # provider/host metadata can be filled by caller in future; keep None for now + "provider_id": "rclone", + "host_type": "rclone", + "host_id": None, + "root_id": None, + "root_label": None, + } + return row + + + + +def list_roots(scan_id: str) -> List[str]: + """Return root folder paths for a scan (folders whose parent_path is NULL or empty within this scan).""" + conn = _ensure_db() try: - cur = conn.cursor() - cur.execute( - """ - CREATE TABLE IF NOT EXISTS files ( - path TEXT NOT NULL, - parent_path TEXT, - name TEXT NOT NULL, - depth INTEGER NOT NULL, - type TEXT NOT NULL, - size INTEGER NOT NULL, - modified_time REAL, - file_extension TEXT, - mime_type TEXT, - etag TEXT, - hash TEXT, - remote TEXT, - scan_id TEXT, - extra_json TEXT - ); - """ + cur = conn.execute( + "SELECT path FROM files WHERE scan_id=? AND type='folder' AND (parent_path IS NULL OR parent_path='') ORDER BY name COLLATE NOCASE", + (scan_id,) ) - # Indexes - cur.execute("CREATE INDEX IF NOT EXISTS idx_files_scan_parent_name ON files(scan_id, parent_path, name);") - cur.execute("CREATE INDEX IF NOT EXISTS idx_files_scan_ext ON files(scan_id, file_extension);") - cur.execute("CREATE INDEX IF NOT EXISTS idx_files_scan_type ON files(scan_id, type);") - conn.commit() + return [r[0] for r in cur.fetchall()] finally: - if own: + try: conn.close() + except Exception: + pass -def _parent_of(path: str) -> str: - try: - p = Path(path) - return str(p.parent) - except Exception: - return '' - - -def _name_of(path: str) -> str: +def list_children(scan_id: str, parent_path: str) -> Dict[str, List[Dict]]: + """List child folders and files for a given parent_path within a scan. + Returns dict with keys folders, files. Each entry has name, path, and size for files. + """ + conn = _ensure_db() try: - p = Path(path) - return p.name or str(p) - except Exception: - return path + folders = [] + files = [] + for row in conn.execute( + "SELECT name, path FROM files WHERE scan_id=? AND parent_path=? AND type='folder' ORDER BY name COLLATE NOCASE", + (scan_id, parent_path) + ).fetchall(): + folders.append({'name': row[0], 'path': row[1]}) + for row in conn.execute( + "SELECT name, path, size, file_extension, mime_type FROM files WHERE scan_id=? AND parent_path=? AND type='file' ORDER BY name COLLATE NOCASE", + (scan_id, parent_path) + ).fetchall(): + files.append({'name': row[0], 'path': row[1], 'size_bytes': int(row[2] or 0), 'extension': row[3] or '', 'mime_type': row[4]}) + return {'folders': folders, 'files': files} + finally: + try: + conn.close() + except Exception: + pass -def _depth_of(path: str) -> int: - try: - # Count separators; for rclone remotes like "remote:folder/sub", keep colon as root and split on '/' - # e.g., "remote:folder/sub" -> depth 2 (folder, sub) - if ':' in path: - suffix = path.split(':', 1)[1] - if suffix.startswith('/'): - suffix = suffix[1:] - return 0 if not suffix else suffix.count('/') + 1 - # local style - return 0 if path in ('', '/', None) else str(Path(path)).strip('/').count('/') + 1 - except Exception: +def batch_insert_files(rows: List[Tuple], batch_size: int = 10000) -> int: + """Insert rows into SQLite in batches. Returns inserted row count. + Creates the DB and schema on first use. + The row tuple order must match the schema fields used by tests: + (path, parent_path, name, depth, type, size, modified_time, file_extension, mime_type, etag, hash, remote, scan_id, extra_json) + """ + if not rows: return 0 - - -def map_rclone_item_to_row(item: Dict, target_root: str, scan_id: str) -> Tuple: - # rclone lsjson fields: Name, Path, Size, MimeType, ModTime, IsDir - name = (item.get('Name') or item.get('Path') or '') - is_dir = bool(item.get('IsDir')) - size = int(item.get('Size') or 0) - mime = item.get('MimeType') - # Full path under target root - base = target_root if target_root.endswith(':') else target_root.rstrip('/') - full = f"{base}/{name}" if not base.endswith(':') else f"{base}{name}" - parent = _parent_of(full) - depth = _depth_of(full) - ext = '' if is_dir else Path(name).suffix.lower() - mtime = None - # ModTime may be ISO8601; we can store as text in extra_json or leave None for MVP - # ETag/Hash not available from lsjson by default - etag = None - ahash = None - remote = target_root.split(':', 1)[0] if ':' in target_root else None - type_val = 'folder' if is_dir else 'file' - extra = None - return ( - full, # path - parent, # parent_path - name, # name - depth, # depth - type_val, # type - size, # size - mtime, # modified_time - ext, # file_extension - mime, # mime_type - etag, # etag - ahash, # hash - remote, # remote - scan_id, # scan_id - extra, # extra_json + conn = _ensure_db() + inserted = 0 + cols = ( + "path,parent_path,name,depth,type,size,modified_time,file_extension,mime_type,etag,hash,remote,scan_id,extra_json" ) - - -def batch_insert_files(rows: Iterable[Tuple], batch_size: int = 10000) -> int: - """Insert rows in batches. Returns total inserted.""" - conn = connect() - init_db(conn) - total = 0 + sql = f"INSERT INTO files ({cols}) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)" try: - cur = conn.cursor() - buf: List[Tuple] = [] - for r in rows: - buf.append(r) - if len(buf) >= batch_size: - cur.executemany( - "INSERT INTO files(path, parent_path, name, depth, type, size, modified_time, file_extension, mime_type, etag, hash, remote, scan_id, extra_json) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)", - buf, - ) - conn.commit() - total += len(buf) - buf.clear() - if buf: - cur.executemany( - "INSERT INTO files(path, parent_path, name, depth, type, size, modified_time, file_extension, mime_type, etag, hash, remote, scan_id, extra_json) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)", - buf, - ) - conn.commit() - total += len(buf) - return total + with conn: + batch: List[Tuple] = [] + for r in rows: + if not isinstance(r, tuple): + # tolerate dict style rows minimally by mapping known keys + try: + r = ( + r.get('path'), r.get('parent_path'), r.get('name'), r.get('depth'), r.get('type'), r.get('size'), + r.get('modified_time') or r.get('modified') or 0.0, r.get('file_extension') or r.get('ext'), + r.get('mime_type'), r.get('etag'), r.get('hash'), r.get('remote'), r.get('scan_id'), r.get('extra_json') + ) + except Exception: + continue + batch.append(r) + if len(batch) >= batch_size: + conn.executemany(sql, batch) + inserted += len(batch) + batch = [] + if batch: + conn.executemany(sql, batch) + inserted += len(batch) finally: - conn.close() + try: + conn.close() + except Exception: + pass + return inserted