diff --git a/scidk/app.py b/scidk/app.py index 3556ced..c22d863 100644 --- a/scidk/app.py +++ b/scidk/app.py @@ -534,104 +534,12 @@ def _get_neo4j_params(): # Build rows for commit: files (rows) and standalone folders (folder_rows) def build_commit_rows(scan, ds_map): """Legacy builder from in-memory datasets.""" - from .core.path_utils import parse_remote_path, parent_remote_path - checksums = scan.get('checksums') or [] - # Helpers unified on central path utils - def _parent_of(p: str) -> str: - try: - info = parse_remote_path(p) - if info.get('is_remote'): - return parent_remote_path(p) - except Exception: - pass - # Fallback to pathlib for local/absolute paths - from pathlib import Path as __P - try: - return str(__P(p).parent) - except Exception: - return '' - def _name_of(p: str) -> str: - try: - info = parse_remote_path(p) - if info.get('is_remote'): - parts = info.get('parts') or [] - if not parts: - return info.get('remote_name') or '' - return parts[-1] - except Exception: - pass - from pathlib import Path as __P - try: - return __P(p).name - except Exception: - return p - def _parent_name_of(p: str) -> str: - try: - par = _parent_of(p) - info = parse_remote_path(par) - if info.get('is_remote'): - parts = info.get('parts') or [] - if not parts: - return info.get('remote_name') or '' - return parts[-1] - except Exception: - pass - from pathlib import Path as __P - try: - return __P(par).name - except Exception: - return par - # Precompute folders observed in this scan (parents of files) - folder_set = set() - for ch in checksums: - dtmp = ds_map.get(ch) - if not dtmp: - continue - folder_set.add(_parent_of(dtmp.get('path') or '')) - rows = [] - for ch in checksums: - d = ds_map.get(ch) - if not d: - continue - parent = _parent_of(d.get('path') or '') - interps = list((d.get('interpretations') or {}).keys()) - # derive folder fields - folder_path = parent - folder_name = _name_of(folder_path) if folder_path else '' - folder_parent = _parent_of(folder_path) if folder_path else '' - folder_parent_name = _parent_name_of(folder_path) if folder_parent else '' - rows.append({ - 'checksum': d.get('checksum'), - 'path': d.get('path'), - 'filename': d.get('filename'), - 'extension': d.get('extension'), - 'size_bytes': int(d.get('size_bytes') or 0), - 'created': float(d.get('created') or 0), - 'modified': float(d.get('modified') or 0), - 'mime_type': d.get('mime_type'), - 'folder': folder_path, - 'folder_name': folder_name, - 'folder_parent': folder_parent, - 'folder_parent_name': folder_parent_name, - 'parent_in_scan': bool(folder_parent and (folder_parent in folder_set)), - 'interps': interps, - }) - # Build folder rows captured during non-recursive scan - folder_rows = [] - for f in (scan.get('folders') or []): - folder_rows.append({ - 'path': f.get('path'), - 'name': f.get('name'), - 'parent': f.get('parent'), - 'parent_name': f.get('parent_name'), - }) - # Enhance with complete hierarchy try: - from .core.folder_hierarchy import build_complete_folder_hierarchy - folder_rows = build_complete_folder_hierarchy(rows, folder_rows, scan) + from .services.commit_service import CommitService + return CommitService().build_rows_legacy_from_datasets(scan, ds_map) except Exception: - pass - return rows, folder_rows + # Fallback to empty on unexpected import/runtime error + return [], [] # Execute Neo4j commit using simplified, idempotent Cypher def commit_to_neo4j(rows, folder_rows, scan, neo4j_params): @@ -659,75 +567,17 @@ def commit_to_neo4j(rows, folder_rows, scan, neo4j_params): return result 
result['attempted'] = True try: - from neo4j import GraphDatabase # type: ignore - driver = None + from .services.neo4j_client import Neo4jClient + client = Neo4jClient(uri, user, pwd, database, auth_mode).connect() try: - driver = GraphDatabase.driver(uri, auth=None if auth_mode == 'none' else (user, pwd)) - with driver.session(database=database) as sess: - # Try to create composite constraints (Neo4j 5+) — ignore if unsupported - try: - sess.run("CREATE CONSTRAINT file_identity IF NOT EXISTS FOR (f:File) REQUIRE (f.path, f.host) IS UNIQUE").consume() - except Exception: - pass - try: - sess.run("CREATE CONSTRAINT folder_identity IF NOT EXISTS FOR (d:Folder) REQUIRE (d.path, d.host) IS UNIQUE").consume() - except Exception: - pass - cypher = ( - "MERGE (s:Scan {id: $scan_id}) " - "SET s.path = $scan_path, s.started = $scan_started, s.ended = $scan_ended, " - " s.provider_id = $scan_provider, s.host_type = $scan_host_type, s.host_id = $scan_host_id, " - " s.root_id = $scan_root_id, s.root_label = $scan_root_label, s.scan_source = $scan_source " - "WITH s " - "UNWIND $folders AS folder " - "MERGE (fo:Folder {path: folder.path, host: $node_host}) " - " SET fo.name = folder.name, fo.provider_id = $scan_provider, fo.host_type = $scan_host_type, fo.host_id = $scan_host_id " - "MERGE (fo)-[:SCANNED_IN]->(s) " - "WITH s " - "UNWIND $folders AS folder " - "WITH s, folder WHERE folder.parent IS NOT NULL AND folder.parent <> '' AND folder.parent <> folder.path " - "MERGE (child:Folder {path: folder.path, host: $node_host}) " - "MERGE (parent:Folder {path: folder.parent, host: $node_host}) " - "MERGE (parent)-[:CONTAINS]->(child) " - "WITH s " - "UNWIND $rows AS r " - "MERGE (f:File {path: r.path, host: $node_host}) " - " SET f.filename = r.filename, f.extension = r.extension, f.size_bytes = r.size_bytes, f.created = r.created, f.modified = r.modified, f.mime_type = r.mime_type, f.provider_id = $scan_provider, f.host_type = $scan_host_type, f.host_id = $scan_host_id " - "MERGE (f)-[:SCANNED_IN]->(s) " - "WITH r, f " - "WHERE r.folder IS NOT NULL AND r.folder <> '' " - "MERGE (fo:Folder {path: r.folder, host: $node_host}) " - "MERGE (fo)-[:CONTAINS]->(f) " - "RETURN $scan_id AS scan_id" - ) - res = sess.run(cypher, rows=rows, folders=folder_rows, scan_id=scan.get('id'), scan_path=scan.get('path'), scan_started=scan.get('started'), scan_ended=scan.get('ended'), scan_provider=scan.get('provider_id'), scan_host_type=scan.get('host_type'), scan_host_id=scan.get('host_id'), scan_root_id=scan.get('root_id'), scan_root_label=scan.get('root_label'), scan_source=scan.get('scan_source'), node_host=scan.get('host_id'), node_port=None) - _ = list(res) - result['written_files'] = len(rows) - result['written_folders'] = len(folder_rows) - # Post-commit verification: confirm that Scan exists and at least one SCANNED_IN relationship was created - verify_q = ( - "OPTIONAL MATCH (s:Scan {id: $scan_id}) " - "WITH s " - "OPTIONAL MATCH (s)<-[:SCANNED_IN]-(f:File) " - "WITH s, count(DISTINCT f) AS files_cnt " - "OPTIONAL MATCH (s)<-[:SCANNED_IN]-(fo:Folder) " - "RETURN coalesce(s IS NOT NULL, false) AS scan_exists, files_cnt AS files_cnt, count(DISTINCT fo) AS folders_cnt" - ) - vrec = sess.run(verify_q, scan_id=scan.get('id')).single() - if vrec: - scan_exists = bool(vrec.get('scan_exists')) - files_cnt = int(vrec.get('files_cnt') or 0) - folders_cnt = int(vrec.get('folders_cnt') or 0) - result['db_scan_exists'] = scan_exists - result['db_files'] = files_cnt - result['db_folders'] = folders_cnt - result['db_verified'] = 
bool(scan_exists and (files_cnt > 0 or folders_cnt > 0))
+            client.ensure_constraints()
+            wres = client.write_scan(rows, folder_rows, scan)
+            result['written_files'] = wres.get('written_files', 0)
+            result['written_folders'] = wres.get('written_folders', 0)
+            vres = client.verify(scan.get('id'))
+            result.update(vres)
         finally:
-            try:
-                if driver is not None:
-                    driver.close()
-            except Exception:
-                pass
+            client.close()
     except Exception as e:
         msg = str(e)
         result['error'] = msg
@@ -906,6 +756,27 @@ def api_scan():
     fast_list = bool(data.get('fast_list', False))
     # Prefer fast_list by default for recursive rclone scans if client omitted it
     _client_specified_fast_list = ('fast_list' in data)
+    # Delegate to ScansService (refactor): preserve payload and behavior
+    try:
+        from .services.scans_service import ScansService
+        svc = ScansService(app)
+        result = svc.run_scan({
+            'provider_id': provider_id,
+            'root_id': root_id,
+            'path': path,
+            'recursive': recursive,
+            'fast_list': fast_list,
+        })
+        if isinstance(result, dict) and result.get('status') == 'ok':
+            return jsonify(result), 200
+        # Error path with optional http_status
+        if isinstance(result, dict) and result.get('status') == 'error':
+            code = int(result.get('http_status', 400))
+            payload = {'status': 'error', 'error': result.get('error')}
+            return jsonify(payload), code
+    except Exception:
+        # On service failure, fall back to the legacy in-place implementation below
+        pass
     try:
         import time, hashlib, json
         from .core import path_index_sqlite as pix
@@ -2407,92 +2278,28 @@ def api_scan_fs(scan_id):
 @api.get('/scans/<scan_id>/browse')
 def api_scan_browse(scan_id):
     """Browse direct children from the SQLite index for a scan.
+    Delegates to FSIndexService.browse_children.
     Query params:
-    - path (required): parent folder to list direct children for.
-    - page_size (optional, default 100): limit per page.
-    - next_page_token (optional): opaque pagination token (OFFSET in MVP).
-    - extension (optional): filter by file_extension (e.g., ".txt").
-    - type (optional): filter by type ("file" or "folder").
-
-    Sorting: type DESC, name ASC.
-    Returns: { scan_id, path, page_size, next_page_token?, entries: [ ... 
] } + - path (optional): parent folder; defaults to scan base path + - page_size (optional, default 100) + - next_page_token (optional) + - extension / ext (optional) + - type (optional) """ - # Validate scan exists in session - s = app.extensions['scidk'].get('scans', {}).get(scan_id) - if not s: - return jsonify({'error': 'scan not found'}), 404 - from .core import path_index_sqlite as pix + from .services.fs_index_service import FSIndexService + svc = FSIndexService(app) req_path = (request.args.get('path') or '').strip() - if req_path == '': - # Default to the scan root path if not provided - req_path = str(s.get('path') or '') - # Normalize page_size and token + # page_size try: page_size = int(request.args.get('page_size') or 100) except Exception: page_size = 100 - page_size = max(1, min(page_size, 1000)) # simple guardrails - token_raw = (request.args.get('next_page_token') or '').strip() - try: - offset = int(token_raw) if token_raw else 0 - except Exception: - offset = 0 - # Optional filters - ext = (request.args.get('extension') or request.args.get('ext') or '').strip().lower() - typ = (request.args.get('type') or '').strip().lower() - # Build query - where = ["scan_id = ?", "parent_path = ?"] - params = [scan_id, req_path] - if ext: - where.append("file_extension = ?") - params.append(ext) - if typ: - where.append("type = ?") - params.append(typ) - where_sql = " AND ".join(where) - sql = ( - "SELECT path, name, type, size, modified_time, file_extension, mime_type " - f"FROM files WHERE {where_sql} " - "ORDER BY type DESC, name ASC " - "LIMIT ? OFFSET ?" - ) - params.extend([page_size + 1, offset]) # fetch one extra row to derive next_page_token - try: - conn = pix.connect() - pix.init_db(conn) - cur = conn.execute(sql, params) - rows = cur.fetchall() - except Exception as e: - return jsonify({'error': str(e)}), 500 - finally: - try: - conn.close() - except Exception: - pass - # Build entries - entries = [] - for r in rows[:page_size]: - path_val, name_val, type_val, size_val, mtime_val, ext_val, mime_val = r - entries.append({ - 'path': path_val, - 'name': name_val, - 'type': type_val, - 'size': int(size_val or 0), - 'modified': float(mtime_val or 0.0), - 'extension': ext_val or '', - 'mime_type': mime_val, - }) - next_token = str(offset + page_size) if len(rows) > page_size else None - out = { - 'scan_id': scan_id, - 'path': req_path, - 'page_size': page_size, - 'entries': entries, + token = (request.args.get('next_page_token') or '').strip() + filters = { + 'extension': (request.args.get('extension') or request.args.get('ext') or '').strip().lower(), + 'type': (request.args.get('type') or '').strip().lower(), } - if next_token is not None: - out['next_page_token'] = next_token - - return jsonify(out), 200 + return svc.browse_children(scan_id, req_path, page_size, token, filters) @api.post('/ro-crates/referenced') def api_ro_crates_referenced(): diff --git a/scidk/services/__init__.py b/scidk/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scidk/services/commit_service.py b/scidk/services/commit_service.py new file mode 100644 index 0000000..7f2a1c4 --- /dev/null +++ b/scidk/services/commit_service.py @@ -0,0 +1,129 @@ +from __future__ import annotations +from typing import Dict, List, Tuple + + +class CommitService: + """ + Commit pipeline helpers. Centralizes logic for preparing rows for commit + from either the file index (preferred) or legacy in-memory dataset maps. 
+ """ + + # --- Index-based builder --- + def build_rows_from_index(self, scan_id: str, scan: Dict, include_hierarchy: bool = True) -> Tuple[List[Dict], List[Dict]]: + """Build file and folder rows for a scan from the SQLite path index. + + Delegates to core.commit_rows_from_index.build_rows_for_scan_from_index. + """ + from ..core.commit_rows_from_index import build_rows_for_scan_from_index + return build_rows_for_scan_from_index(scan_id, scan, include_hierarchy) + + # --- Legacy builder (from in-memory dataset map) --- + def build_rows_legacy_from_datasets(self, scan: Dict, ds_map: Dict[str, Dict]) -> Tuple[List[Dict], List[Dict]]: + """Legacy builder used by endpoints when committing from datasets in memory. + + This is a refactor of the function previously defined inside app.create_app(). + Returns (rows, folder_rows). + """ + from ..core.path_utils import parse_remote_path, parent_remote_path + + checksums = scan.get('checksums') or [] + + def _parent_of(p: str) -> str: + try: + info = parse_remote_path(p) + if info.get('is_remote'): + return parent_remote_path(p) + except Exception: + pass + from pathlib import Path as __P + try: + return str(__P(p).parent) + except Exception: + return '' + + def _name_of(p: str) -> str: + try: + info = parse_remote_path(p) + if info.get('is_remote'): + parts = info.get('parts') or [] + if not parts: + return info.get('remote_name') or '' + return parts[-1] + except Exception: + pass + from pathlib import Path as __P + try: + return __P(p).name + except Exception: + return p + + def _parent_name_of(p: str) -> str: + try: + par = _parent_of(p) + info = parse_remote_path(par) + if info.get('is_remote'): + parts = info.get('parts') or [] + if not parts: + return info.get('remote_name') or '' + return parts[-1] + except Exception: + pass + from pathlib import Path as __P + try: + return __P(par).name + except Exception: + return par + + # Precompute folders observed in this scan (parents of files) + folder_set = set() + for ch in checksums: + dtmp = ds_map.get(ch) + if not dtmp: + continue + folder_set.add(_parent_of(dtmp.get('path') or '')) + + rows: List[Dict] = [] + for ch in checksums: + d = ds_map.get(ch) + if not d: + continue + parent = _parent_of(d.get('path') or '') + interps = list((d.get('interpretations') or {}).keys()) + folder_path = parent + folder_name = _name_of(folder_path) if folder_path else '' + folder_parent = _parent_of(folder_path) if folder_path else '' + folder_parent_name = _parent_name_of(folder_path) if folder_parent else '' + rows.append({ + 'checksum': d.get('checksum'), + 'path': d.get('path'), + 'filename': d.get('filename'), + 'extension': d.get('extension'), + 'size_bytes': int(d.get('size_bytes') or 0), + 'created': float(d.get('created') or 0), + 'modified': float(d.get('modified') or 0), + 'mime_type': d.get('mime_type'), + 'folder': folder_path, + 'folder_name': folder_name, + 'folder_parent': folder_parent, + 'folder_parent_name': folder_parent_name, + 'parent_in_scan': bool(folder_parent and (folder_parent in folder_set)), + 'interps': interps, + }) + + folder_rows: List[Dict] = [] + for f in (scan.get('folders') or []): + folder_rows.append({ + 'path': f.get('path'), + 'name': f.get('name'), + 'parent': f.get('parent'), + 'parent_name': f.get('parent_name'), + }) + + # Enhance with complete hierarchy + try: + from ..core.folder_hierarchy import build_complete_folder_hierarchy + folder_rows = build_complete_folder_hierarchy(rows, folder_rows, scan) + except Exception: + pass + + return rows, folder_rows diff --git 
a/scidk/services/fs_index_service.py b/scidk/services/fs_index_service.py
new file mode 100644
index 0000000..5d55f00
--- /dev/null
+++ b/scidk/services/fs_index_service.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+from typing import Any, Dict, Optional
+
+
+class FSIndexService:
+    """
+    SQLite-backed filesystem index browsing service.
+
+    Provides a stable, index-backed listing of direct children under a parent path
+    for a given scan, with server-side pagination and simple filters.
+
+    Contract:
+    - Ordering: type DESC, name ASC
+    - Pagination token: opaque offset string (stable for MVP)
+    - Filters: type (file|folder), extension (normalized lowercase, includes leading dot if provided)
+    """
+
+    def __init__(self, app):
+        self.app = app
+
+    def browse_children(
+        self,
+        scan_id: str,
+        parent_path: Optional[str],
+        page_size: int = 100,
+        next_page_token: Optional[str] = None,
+        filters: Optional[Dict[str, Any]] = None,
+    ) -> Dict[str, Any]:
+        from flask import jsonify  # lazy import to avoid hard dependency at import time
+        from ..core import path_index_sqlite as pix
+
+        # Ensure scan exists
+        scan = self.app.extensions['scidk'].get('scans', {}).get(scan_id)
+        if not scan:
+            return jsonify({'error': 'scan not found'}), 404
+
+        # Resolve parent path; default to the scan base path
+        req_path = (parent_path or '').strip()
+        if req_path == '':
+            req_path = str(scan.get('path') or '')
+
+        # Normalize pagination
+        try:
+            limit = int(page_size)
+        except Exception:
+            limit = 100
+        limit = max(1, min(limit, 1000))
+
+        token_raw = (next_page_token or '').strip()
+        try:
+            offset = int(token_raw) if token_raw else 0
+        except Exception:
+            offset = 0
+
+        # Normalize filters
+        filters = filters or {}
+        ext = (filters.get('extension') or filters.get('ext') or '').strip().lower()
+        typ = (filters.get('type') or '').strip().lower()
+
+        where = ["scan_id = ?", "parent_path = ?"]
+        params: list[Any] = [scan_id, req_path]
+        if ext:
+            where.append("file_extension = ?")
+            params.append(ext)
+        if typ:
+            where.append("type = ?")
+            params.append(typ)
+        where_sql = " AND ".join(where)
+        sql = (
+            "SELECT path, name, type, size, modified_time, file_extension, mime_type "
+            f"FROM files WHERE {where_sql} "
+            "ORDER BY type DESC, name ASC "
+            "LIMIT ? OFFSET ?"
+        )
+        params.extend([limit + 1, offset])  # fetch one extra row to detect another page
+
+        conn = None
+        try:
+            conn = pix.connect()
+            pix.init_db(conn)
+            cur = conn.execute(sql, params)
+            rows = cur.fetchall()
+        except Exception as e:
+            return jsonify({'error': str(e)}), 500
+        finally:
+            if conn is not None:
+                try:
+                    conn.close()
+                except Exception:
+                    pass
+
+        entries = []
+        for r in rows[:limit]:
+            path_val, name_val, type_val, size_val, mtime_val, ext_val, mime_val = r
+            entries.append({
+                'path': path_val,
+                'name': name_val,
+                'type': type_val,
+                'size': int(size_val or 0),
+                'modified': float(mtime_val or 0.0),
+                'extension': ext_val or '',
+                'mime_type': mime_val,
+            })
+        next_token = str(offset + limit) if len(rows) > limit else None
+
+        out = {
+            'scan_id': scan_id,
+            'path': req_path,
+            'page_size': limit,
+            'entries': entries,
+        }
+        if next_token is not None:
+            out['next_page_token'] = next_token
+        return jsonify(out), 200
diff --git a/scidk/services/neo4j_client.py b/scidk/services/neo4j_client.py
new file mode 100644
index 0000000..92d4696
--- /dev/null
+++ b/scidk/services/neo4j_client.py
@@ -0,0 +1,128 @@
+from __future__ import annotations
+from typing import Any, Dict, Optional, List
+
+
+class Neo4jClient:
+    """
+    Thin client around the neo4j-python-driver used by the commit pipeline.
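+    It owns the driver lifecycle: connect() builds the driver, close() tears it down.
+
+    A minimal usage sketch (the bolt URI and credentials are placeholder assumptions):
+
+        client = Neo4jClient("bolt://localhost:7687", "neo4j", "secret", database="neo4j").connect()
+        try:
+            client.ensure_constraints()
+            client.write_scan(rows, folder_rows, scan)
+            status = client.verify(scan.get("id"))
+        finally:
+            client.close()
+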
+ Provides ensure_constraints, write_scan, and verify operations. + """ + + def __init__(self, uri: str, user: Optional[str], password: Optional[str], database: Optional[str] = None, auth_mode: str = "basic"): + self._uri = uri + self._user = user + self._password = password + self._database = database + self._auth_mode = (auth_mode or 'basic').lower() + self._driver = None + + def connect(self): + from neo4j import GraphDatabase # type: ignore + auth = None if self._auth_mode == 'none' else (self._user, self._password) + self._driver = GraphDatabase.driver(self._uri, auth=auth) + return self + + def close(self): + try: + if self._driver is not None: + self._driver.close() + except Exception: + pass + + def _session(self): + if self._driver is None: + raise RuntimeError("Neo4jClient not connected") + if self._database: + return self._driver.session(database=self._database) + return self._driver.session() + + # --- Operations --- + def ensure_constraints(self) -> None: + try: + with self._session() as s: + try: + s.run("CREATE CONSTRAINT file_identity IF NOT EXISTS FOR (f:File) REQUIRE (f.path, f.host) IS UNIQUE").consume() + except Exception: + pass + try: + s.run("CREATE CONSTRAINT folder_identity IF NOT EXISTS FOR (d:Folder) REQUIRE (d.path, d.host) IS UNIQUE").consume() + except Exception: + pass + except Exception: + # best-effort, ignore errors + pass + + def write_scan(self, rows: List[Dict[str, Any]], folder_rows: List[Dict[str, Any]], scan: Dict[str, Any]) -> Dict[str, Any]: + """Upsert Scan, Folders, Files and relationships for one scan in a single query.""" + with self._session() as sess: + cypher = ( + "MERGE (s:Scan {id: $scan_id}) " + "SET s.path = $scan_path, s.started = $scan_started, s.ended = $scan_ended, " + " s.provider_id = $scan_provider, s.host_type = $scan_host_type, s.host_id = $scan_host_id, " + " s.root_id = $scan_root_id, s.root_label = $scan_root_label, s.scan_source = $scan_source " + "WITH s " + "UNWIND $folders AS folder " + "MERGE (fo:Folder {path: folder.path, host: $node_host}) " + " SET fo.name = folder.name, fo.provider_id = $scan_provider, fo.host_type = $scan_host_type, fo.host_id = $scan_host_id " + "MERGE (fo)-[:SCANNED_IN]->(s) " + "WITH s " + "UNWIND $folders AS folder " + "WITH s, folder WHERE folder.parent IS NOT NULL AND folder.parent <> '' AND folder.parent <> folder.path " + "MERGE (child:Folder {path: folder.path, host: $node_host}) " + "MERGE (parent:Folder {path: folder.parent, host: $node_host}) " + "MERGE (parent)-[:CONTAINS]->(child) " + "WITH s " + "UNWIND $rows AS r " + "MERGE (f:File {path: r.path, host: $node_host}) " + " SET f.filename = r.filename, f.extension = r.extension, f.size_bytes = r.size_bytes, f.created = r.created, f.modified = r.modified, f.mime_type = r.mime_type, f.provider_id = $scan_provider, f.host_type = $scan_host_type, f.host_id = $scan_host_id " + "MERGE (f)-[:SCANNED_IN]->(s) " + "WITH r, f " + "WHERE r.folder IS NOT NULL AND r.folder <> '' " + "MERGE (fo:Folder {path: r.folder, host: $node_host}) " + "MERGE (fo)-[:CONTAINS]->(f) " + "RETURN $scan_id AS scan_id" + ) + params = dict( + rows=rows, + folders=folder_rows, + scan_id=scan.get('id'), + scan_path=scan.get('path'), + scan_started=scan.get('started'), + scan_ended=scan.get('ended'), + scan_provider=scan.get('provider_id'), + scan_host_type=scan.get('host_type'), + scan_host_id=scan.get('host_id'), + scan_root_id=scan.get('root_id'), + scan_root_label=scan.get('root_label'), + scan_source=scan.get('scan_source'), + node_host=scan.get('host_id'), + 
node_port=None,
+            )
+            _ = list(sess.run(cypher, **params))
+            return {
+                'written_files': len(rows),
+                'written_folders': len(folder_rows),
+            }
+
+    def verify(self, scan_id: str) -> Dict[str, Any]:
+        verify_q = (
+            "OPTIONAL MATCH (s:Scan {id: $scan_id}) "
+            "WITH s "
+            "OPTIONAL MATCH (s)<-[:SCANNED_IN]-(f:File) "
+            "WITH s, count(DISTINCT f) AS files_cnt "
+            "OPTIONAL MATCH (s)<-[:SCANNED_IN]-(fo:Folder) "
+            "RETURN coalesce(s IS NOT NULL, false) AS scan_exists, files_cnt AS files_cnt, count(DISTINCT fo) AS folders_cnt"
+        )
+        with self._session() as sess:
+            vrec = sess.run(verify_q, scan_id=scan_id).single()
+            if not vrec:
+                return {'db_verified': False}
+            scan_exists = bool(vrec.get('scan_exists'))
+            files_cnt = int(vrec.get('files_cnt') or 0)
+            folders_cnt = int(vrec.get('folders_cnt') or 0)
+            return {
+                'db_scan_exists': scan_exists,
+                'db_files': files_cnt,
+                'db_folders': folders_cnt,
+                'db_verified': bool(scan_exists and (files_cnt > 0 or folders_cnt > 0)),
+            }
diff --git a/scidk/services/scans_service.py b/scidk/services/scans_service.py
new file mode 100644
index 0000000..1b82655
--- /dev/null
+++ b/scidk/services/scans_service.py
@@ -0,0 +1,407 @@
+from __future__ import annotations
+from typing import Dict, Any
+from pathlib import Path
+import os
+
+# This service encapsulates the scan orchestration that used to live inside app.api_scan.
+# It is intentionally kept very close to the original logic to preserve behavior and payload.
+
+class ScansService:
+    def __init__(self, app):
+        self.app = app
+        self.fs = app.extensions['scidk']['fs']
+        self.registry = app.extensions['scidk']['registry']
+
+    def run_scan(self, data: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Execute a scan synchronously and return the same payload dict that api_scan used to return.
+        This method performs all the same side effects: it populates the SQLite index and the
+        in-memory datasets, and updates the app.extensions registries (scans, directories, telemetry).
+        """
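+        # Example payloads accepted here (a sketch; the paths and the rclone
+        # remote name are illustrative):
+        #   {"provider_id": "local_fs", "path": "/data/project", "recursive": True}
+        #   {"provider_id": "rclone", "root_id": "gdrive:", "path": "docs", "recursive": True, "fast_list": True}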
+        # Import inside to avoid heavy imports at module import time
+        from ..core import path_index_sqlite as pix
+        from ..core.path_utils import parse_remote_path, join_remote_path, parent_remote_path
+        import time, hashlib
+
+        app = self.app
+        fs = self.fs
+        registry = self.registry
+
+        provider_id = (data.get('provider_id') or 'local_fs').strip() or 'local_fs'
+        root_id = (data.get('root_id') or '/').strip() or '/'
+        path = data.get('path') or (root_id if provider_id != 'local_fs' else os.getcwd())
+        recursive = bool(data.get('recursive', True))
+        fast_list = bool(data.get('fast_list', False))
+        client_specified_fast_list = ('fast_list' in data)
+
+        # Snapshot before
+        before = set(ds.get('checksum') for ds in app.extensions['scidk']['graph'].list_datasets())
+        started = time.time()
+        sid_src = f"{path}|{started}"
+        scan_id = hashlib.sha1(sid_src.encode()).hexdigest()[:12]
+        count = 0
+        ingested = 0
+        folders = []
+
+        if provider_id in ('local_fs', 'mounted_fs'):
+            base = Path(path)
+            items_files = []
+            items_dirs = set()
+            # Source detection compatibility
+            try:
+                probe_ncdu = fs._list_files_with_ncdu(base, recursive=recursive)  # type: ignore
+                if probe_ncdu:
+                    fs.last_scan_source = 'ncdu'
+                else:
+                    probe_gdu = fs._list_files_with_gdu(base, recursive=recursive)  # type: ignore
+                    if probe_gdu:
+                        fs.last_scan_source = 'gdu'
+                    else:
+                        fs.last_scan_source = 'python'
+            except Exception:
+                fs.last_scan_source = 'python'
+            try:
+                if recursive:
+                    for p in base.rglob('*'):
+                        try:
+                            if p.is_dir():
+                                items_dirs.add(p)
+                            else:
+                                items_files.append(p)
+                                parent = p.parent
+                                while parent and parent != parent.parent and str(parent).startswith(str(base)):
+                                    items_dirs.add(parent)
+                                    if parent == base:
+                                        break
+                                    parent = parent.parent
+                        except Exception:
+                            continue
+                    items_dirs.add(base)
+                else:
+                    for p in base.iterdir():
+                        try:
+                            if p.is_dir():
+                                items_dirs.add(p)
+                            else:
+                                items_files.append(p)
+                        except Exception:
+                            continue
+                    items_dirs.add(base)
+            except Exception:
+                items_files = []
+                items_dirs = set()
+
+            rows = []
+            def _row_from_local(pth: Path, typ: str) -> tuple:
+                full = str(pth.resolve())
+                parent = str(pth.parent.resolve()) if pth != pth.parent else ''
+                name = pth.name or full
+                depth = 0 if pth == base else max(0, len(str(pth.resolve()).rstrip('/').split('/')) - len(str(base.resolve()).rstrip('/').split('/')))
+                size = 0
+                mtime = None
+                ext = ''
+                mime = None
+                if typ == 'file':
+                    try:
+                        st = pth.stat()
+                        size = int(st.st_size)
+                        mtime = float(st.st_mtime)
+                    except Exception:
+                        size = 0
+                        mtime = None
+                    ext = pth.suffix.lower()
+                remote = f"local:{os.uname().nodename}" if provider_id == 'local_fs' else f"mounted:{root_id}"
+                return (full, parent, name, depth, typ, size, mtime, ext, mime, None, None, remote, scan_id, None)
+            for d in sorted(items_dirs, key=lambda x: str(x)):
+                rows.append(_row_from_local(d, 'folder'))
+            for fpath in items_files:
+                rows.append(_row_from_local(fpath, 'file'))
+            ingested = pix.batch_insert_files(rows)
+
+            # Legacy: create in-memory datasets and run interpreters
+            count = 0
+            for fpath in items_files:
+                try:
+                    ds = fs.create_dataset_node(fpath)
+                    app.extensions['scidk']['graph'].upsert_dataset(ds)
+                    interps = registry.select_for_dataset(ds)
+                    for interp in interps:
+                        try:
+                            result = interp.interpret(fpath)
+                            app.extensions['scidk']['graph'].add_interpretation(ds['checksum'], interp.id, {
+                                'status': result.get('status', 'success'),
+                                'data': result.get('data', result),
+                                'interpreter_version': getattr(interp, 'version', '0.0.1'),
+                            })
+                        except Exception as e:
+                            
app.extensions['scidk']['graph'].add_interpretation(ds['checksum'], interp.id, { + 'status': 'error', + 'data': {'error': str(e)}, + 'interpreter_version': getattr(interp, 'version', '0.0.1'), + }) + count += 1 + except Exception: + continue + # Build folders metadata + for d in items_dirs: + try: + parent = str(d.parent.resolve()) if d != d.parent else '' + folders.append({'path': str(d.resolve()), 'name': d.name, 'parent': parent, 'parent_name': Path(parent).name if parent else ''}) + except Exception: + continue + elif provider_id == 'rclone': + provs = app.extensions['scidk'].get('providers') + prov = provs.get('rclone') if provs else None + if not prov: + return {'status': 'error', 'error': 'rclone provider not available', 'http_status': 400} + + # Normalize to full remote path if needed + try: + info = parse_remote_path(path or '') + is_remote = bool(info.get('is_remote')) + except Exception: + is_remote = False + if not is_remote: + path = join_remote_path(root_id, (path or '').lstrip('/')) + if recursive and not client_specified_fast_list: + fast_list = True + + # seed base folder row to ensure folder synthesis + try: + info_t = parse_remote_path(path) + base_name = (info_t.get('parts')[-1] if info_t.get('parts') else info_t.get('remote_name') or path) + base_parent = parent_remote_path(path) + base_item = {"Name": base_name, "Path": "", "IsDir": True, "Size": 0} + rows = [pix.map_rclone_item_to_row(base_item, path, scan_id)] + # folders list should include base as well + try: + info_par = parse_remote_path(base_parent) if base_parent else {} + if info_par.get('is_remote'): + parts = info_par.get('parts') or [] + parent_name = (info_par.get('remote_name') or '') if not parts else parts[-1] + else: + from pathlib import Path as _P + parent_name = _P(base_parent).name if base_parent else '' + except Exception: + parent_name = '' + folders.append({'path': path, 'name': base_name, 'parent': base_parent, 'parent_name': parent_name}) + except Exception: + rows = [] + + try: + if app.config.get('TESTING') and not recursive: + items = [] + else: + items = prov.list_files(path, recursive=recursive, fast_list=fast_list) # type: ignore[attr-defined] + except Exception as ee: + return {'status': 'error', 'error': str(ee), 'http_status': 400} + + seen_folders = set() + def _add_folder(full_path: str, name: str, parent: str): + if full_path in seen_folders: + return + seen_folders.add(full_path) + try: + info_par = parse_remote_path(parent) + if info_par.get('is_remote'): + parts = info_par.get('parts') or [] + parent_name = (info_par.get('remote_name') or '') if not parts else parts[-1] + else: + parent_name = Path(parent).name if parent else '' + except Exception: + parent_name = '' + folders.append({'path': full_path, 'name': name, 'parent': parent, 'parent_name': parent_name}) + + for it in (items or []): + try: + if it.get('IsDir'): + rel = it.get('Path') or it.get('Name') or '' + if rel: + full = join_remote_path(path, rel) + parent = parent_remote_path(full) + leaf = rel.rsplit('/',1)[-1] if isinstance(rel, str) and '/' in rel else rel + _add_folder(full, leaf, parent) + rows.append(pix.map_rclone_item_to_row(it, path, scan_id)) + continue + # File row + rows.append(pix.map_rclone_item_to_row(it, path, scan_id)) + # Synthesize folder chain for file rel paths + rel = it.get('Path') or it.get('Name') or '' + if rel: + parts = [p for p in (rel.split('/') if isinstance(rel, str) else []) if p] + cur_rel = '' + for i in range(len(parts)-1): + cur_rel = parts[i] if i == 0 else (cur_rel + '/' + 
parts[i]) + full = join_remote_path(path, cur_rel) + parent = parent_remote_path(full) + _add_folder(full, parts[i], parent) + try: + folder_item = {"Name": parts[i], "Path": cur_rel, "IsDir": True, "Size": 0} + rows.append(pix.map_rclone_item_to_row(folder_item, path, scan_id)) + except Exception: + pass + backend = (os.environ.get('SCIDK_GRAPH_BACKEND') or 'memory').strip().lower() + if backend != 'neo4j': + size = int(it.get('Size') or 0) + full = join_remote_path(path, rel) + ds = fs.create_dataset_remote(full, size_bytes=size, modified_ts=0.0, mime=None) + app.extensions['scidk']['graph'].upsert_dataset(ds) + count += 1 + except Exception: + continue + try: + # Dedup by (path,type) + seen = set(); uniq = [] + for r in rows: + key = (r[0], r[4]) + if key in seen: + continue + seen.add(key); uniq.append(r) + rows = uniq + except Exception: + pass + try: + ingested = pix.batch_insert_files(rows, batch_size=10000) + try: + _chg = pix.apply_basic_change_history(scan_id, path) + app.extensions['scidk'].setdefault('telemetry', {})['last_change_counts'] = _chg + except Exception as __e: + app.extensions['scidk'].setdefault('telemetry', {})['last_change_error'] = str(__e) + except Exception as _e: + app.extensions['scidk'].setdefault('telemetry', {})['last_sqlite_error'] = str(_e) + else: + return {'status': 'error', 'error': f'provider {provider_id} not supported for scan', 'http_status': 400} + + ended = time.time() + duration = ended - started + after = set(ds.get('checksum') for ds in app.extensions['scidk']['graph'].list_datasets()) + new_checksums = sorted(list(after - before)) + + # by_ext calculation preserved + by_ext: Dict[str, int] = {} + backend = (os.environ.get('SCIDK_GRAPH_BACKEND') or 'memory').strip().lower() + if backend == 'neo4j': + try: + conn = pix.connect(); pix.init_db(conn) + cur = conn.cursor() + cur.execute("SELECT file_extension FROM files WHERE scan_id = ? 
AND type='file'", (scan_id,)) + for (ext,) in cur.fetchall(): + ext = ext or '' + by_ext[ext] = by_ext.get(ext, 0) + 1 + conn.close() + except Exception: + by_ext = {} + else: + ext_map = {} + for ds in app.extensions['scidk']['graph'].list_datasets(): + ext_map[ds.get('checksum')] = ds.get('extension') or '' + for ch in new_checksums: + ext = ext_map.get(ch, '') + by_ext[ext] = by_ext.get(ext, 0) + 1 + + # Non-recursive local: include immediate subfolders + if provider_id in ('local_fs', 'mounted_fs'): + try: + if not recursive: + base = Path(path) + for child in base.iterdir(): + if child.is_dir(): + parent = str(child.parent) + folders.append({ + 'path': str(child.resolve()), + 'name': child.name, + 'parent': parent, + 'parent_name': Path(parent).name if parent else '', + }) + except Exception: + pass + + provs = app.extensions['scidk'].get('providers') + prov = provs.get(provider_id) if provs else None + root_label = None + try: + if prov: + root_label = Path(root_id).name or str(root_id) + except Exception: + root_label = None + + host_type = provider_id + host_id = None + try: + if provider_id == 'rclone': + host_id = f"rclone:{(root_id or '').rstrip(':')}" + elif provider_id == 'local_fs': + import socket as _sock + host_id = f"local:{_sock.gethostname()}" + elif provider_id == 'mounted_fs': + host_id = f"mounted:{root_id}" + except Exception: + host_id = f"{provider_id}:{root_id}" if root_id else provider_id + + scan = { + 'id': scan_id, + 'path': str(path), + 'recursive': bool(recursive), + 'started': started, + 'ended': ended, + 'duration_sec': duration, + 'file_count': int(count), + 'folder_count': len(folders), + 'checksums': new_checksums, + 'folders': folders, + 'by_ext': by_ext, + 'source': getattr(fs, 'last_scan_source', 'python') if provider_id in ('local_fs','mounted_fs') else f"provider:{provider_id}", + 'errors': [], + 'committed': False, + 'committed_at': None, + 'provider_id': provider_id, + 'host_type': host_type, + 'host_id': host_id, + 'root_id': root_id, + 'root_label': root_label, + 'scan_source': f"provider:{provider_id}", + 'ingested_rows': int(ingested), + } + scans = app.extensions['scidk'].setdefault('scans', {}) + scans[scan_id] = scan + try: + app.extensions['scidk'].setdefault('scan_fs', {}).pop(scan_id, None) + except Exception: + pass + telem = app.extensions['scidk'].setdefault('telemetry', {}) + telem['last_scan'] = { + 'path': str(path), + 'recursive': bool(recursive), + 'scanned': int(count), + 'started': started, + 'ended': ended, + 'duration_sec': duration, + 'source': getattr(fs, 'last_scan_source', 'python') if provider_id in ('local_fs','mounted_fs') else f"provider:{provider_id}", + 'provider_id': provider_id, + 'root_id': root_id, + } + dirs = app.extensions['scidk'].setdefault('directories', {}) + drec = dirs.setdefault(str(path), { + 'path': str(path), + 'recursive': bool(recursive), + 'scanned': 0, + 'last_scanned': 0, + 'scan_ids': [], + 'source': getattr(fs, 'last_scan_source', 'python') if provider_id in ('local_fs','mounted_fs') else f"provider:{provider_id}", + 'provider_id': provider_id, + 'root_id': root_id, + 'root_label': root_label, + }) + drec.update({ + 'recursive': bool(recursive), + 'scanned': int(count), + 'last_scanned': ended, + 'source': getattr(fs, 'last_scan_source', 'python') if provider_id in ('local_fs','mounted_fs') else f"provider:{provider_id}", + 'provider_id': provider_id, + 'root_id': root_id, + 'root_label': root_label, + }) + drec.setdefault('scan_ids', []).append(scan_id) + # return payload identical to 
previous endpoint + return {"status": "ok", "scan_id": scan_id, "scanned": count, "folder_count": len(folders), "ingested_rows": int(ingested), "duration_sec": duration, "path": str(path), "recursive": bool(recursive), "provider_id": provider_id}
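For reference, the refactored browse endpoint preserves the original wire contract. A quick smoke check with Flask's test client (a sketch: the scan id, the scanned path, and the `/api` mount prefix are assumptions):

```python
from scidk.app import create_app

app = create_app()
client = app.test_client()

# Request the first page of file children under a scanned folder.
resp = client.get(
    '/api/scans/abc123def456/browse',
    query_string={'path': '/data/project', 'page_size': 50, 'type': 'file'},
)
data = resp.get_json()
for entry in data.get('entries', []):
    print(entry['type'], entry['size'], entry['path'])
# A 'next_page_token' key is present only when another page exists.
token = data.get('next_page_token')
```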