Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 128 additions & 149 deletions scidk/core/path_index_sqlite.py
Original file line number Diff line number Diff line change
@@ -1,166 +1,145 @@
from __future__ import annotations
import os
import sqlite3
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

# Minimal SQLite DAO for path index
# Schema from task-sqlite-path-index:
# files(path, parent_path, name, depth, type, size, modified_time, file_extension, mime_type, etag, hash, remote, scan_id, extra_json)


def _db_path() -> Path:
# Allow override via env; default to ~/.scidk/db/files.db
base = os.environ.get('SCIDK_DB_PATH')
if base:
p = Path(base)
else:
p = Path.home() / '.scidk' / 'db' / 'files.db'
p.parent.mkdir(parents=True, exist_ok=True)
return p
from typing import Dict, Iterable, List, Tuple

# Fallback DB file location used when SCIDK_DB_PATH is not set in the environment.
DEFAULT_DB = os.path.expanduser("~/.scidk/db/files.db")

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT NOT NULL,
parent_path TEXT,
name TEXT,
depth INTEGER,
type TEXT, -- 'file' | 'folder'
size INTEGER,
modified_time REAL,
file_extension TEXT,
mime_type TEXT,
etag TEXT,
hash TEXT,
remote TEXT,
scan_id TEXT,
extra_json TEXT
);
-- Composite-friendly indexes to speed common queries
CREATE INDEX IF NOT EXISTS idx_files_scan_parent_name ON files(scan_id, parent_path, name);
CREATE INDEX IF NOT EXISTS idx_files_scan_ext ON files(scan_id, file_extension);
CREATE INDEX IF NOT EXISTS idx_files_scan_type ON files(scan_id, type);
"""


def _db_path() -> str:
    """Return the SQLite file path, honoring the SCIDK_DB_PATH override."""
    configured = os.environ.get("SCIDK_DB_PATH")
    return configured if configured is not None else DEFAULT_DB


def connect() -> sqlite3.Connection:
    """Open a connection to the configured SQLite DB and enable WAL.

    The diff paste had merged the old and new bodies of this function,
    leaving a dead prelude that opened (and leaked) a second connection;
    this is the corrected, single-path implementation.

    Returns an open sqlite3.Connection with journal_mode set to WAL.
    The pragma is executed unconditionally because tests expect the mode
    to be exactly 'wal'.
    """
    path = _db_path()
    # Ensure the containing directory exists before sqlite3 tries to
    # create the database file inside it.
    parent = os.path.dirname(path)
    if parent and not os.path.isdir(parent):
        os.makedirs(parent, exist_ok=True)
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL;")
    return conn


def init_db(conn: Optional[sqlite3.Connection] = None):
    """Create the ``files`` table and its covering indexes if absent.

    Parameters
    ----------
    conn: an open connection to use; when None, a connection is opened
        via connect() and closed again before returning. A caller-supplied
        connection is left open for further use.
    """
    owned = conn is None
    if owned:
        conn = connect()
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS files (
            path TEXT NOT NULL,
            parent_path TEXT,
            name TEXT NOT NULL,
            depth INTEGER NOT NULL,
            type TEXT NOT NULL,
            size INTEGER NOT NULL,
            modified_time REAL,
            file_extension TEXT,
            mime_type TEXT,
            etag TEXT,
            hash TEXT,
            remote TEXT,
            scan_id TEXT,
            extra_json TEXT
        );
        """,
        "CREATE INDEX IF NOT EXISTS idx_files_scan_parent_name ON files(scan_id, parent_path, name);",
        "CREATE INDEX IF NOT EXISTS idx_files_scan_ext ON files(scan_id, file_extension);",
        "CREATE INDEX IF NOT EXISTS idx_files_scan_type ON files(scan_id, type);",
    )
    try:
        cur = conn.cursor()
        for statement in ddl_statements:
            cur.execute(statement)
        conn.commit()
    finally:
        if owned:
            conn.close()


def _parent_of(path: str) -> str:
try:
p = Path(path)
return str(p.parent)
except Exception:
return ''
def init_db(conn: sqlite3.Connection) -> None:
    """Ensure the path-index schema exists on *conn* (idempotent), then commit."""
    cursor = conn.cursor()
    cursor.executescript(SCHEMA_SQL)
    conn.commit()


def _name_of(path: str) -> str:
try:
p = Path(path)
return p.name or str(p)
except Exception:
return path
def _ensure_db() -> sqlite3.Connection:
    """Open the configured database and guarantee the schema is in place."""
    connection = connect()
    init_db(connection)
    return connection


def _depth_of(path: str) -> int:
try:
# Count separators; for rclone remotes like "remote:folder/sub", keep colon as root and split on '/'
# e.g., "remote:folder/sub" -> depth 2 (folder, sub)
if ':' in path:
suffix = path.split(':', 1)[1]
if suffix.startswith('/'):
suffix = suffix[1:]
return 0 if not suffix else suffix.count('/') + 1
# local style
return 0 if path in ('', '/', None) else str(Path(path)).strip('/').count('/') + 1
except Exception:
def map_rclone_item_to_row(item: Dict, root_path: str, scan_id: str) -> Dict:
    """Map a single rclone lsjson item to the path-index row dict.

    Expected item fields include: Name, Path, Size, IsDir, MimeType...
    root_path is the scan root target (e.g., "remote:bucket" or "remote:")
    """
    name = (item.get("Name") or item.get("Path") or "").strip()
    is_dir = bool(item.get("IsDir"))
    size = int(item.get("Size") or 0)
    # Join root and name with '/', except directly after a bare "remote:".
    base = root_path or ""
    if base.endswith(":"):
        full_path = base + name if name else base
    elif name:
        full_path = base.rstrip('/') + '/' + name
    else:
        full_path = base.rstrip('/')
    p = Path(full_path)
    rendered = str(p)
    parent = str(p.parent) if rendered else ""
    depth = sum(1 for part in p.parts if part not in ("", "/"))
    kind = "folder" if is_dir else "file"
    return {
        "path": rendered,
        "parent_path": parent,
        "name": p.name or rendered,
        "depth": depth,
        "type": kind,
        "size": 0 if is_dir else size,
        "modified": 0.0,
        "ext": "" if is_dir else p.suffix.lower(),
        "scan_id": scan_id,
        # provider/host metadata can be filled by caller in future; keep None for now
        "provider_id": "rclone",
        "host_type": "rclone",
        "host_id": None,
        "root_id": None,
        "root_label": None,
    }




def batch_insert_files(rows: List[Tuple], batch_size: int = 10000) -> int:
    """Insert rows into SQLite in batches. Returns inserted row count.

    The diff paste interleaved the removed tuple-mapper and the old batch
    inserter inside this function, leaving the region syntactically broken;
    this is the reconstructed final implementation.

    Creates the DB and schema on first use. Tuple rows must match the
    column order:
    (path, parent_path, name, depth, type, size, modified_time,
     file_extension, mime_type, etag, hash, remote, scan_id, extra_json)
    Dict-style rows are tolerated and mapped onto the same columns; rows
    that cannot be mapped are skipped.

    Parameters
    ----------
    rows: sequence of row tuples (or dicts) to insert.
    batch_size: number of rows per executemany() call.
    """
    if not rows:
        return 0
    conn = _ensure_db()
    inserted = 0
    cols = (
        "path,parent_path,name,depth,type,size,modified_time,"
        "file_extension,mime_type,etag,hash,remote,scan_id,extra_json"
    )
    sql = f"INSERT INTO files ({cols}) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
    try:
        # One transaction for the whole call; batching executemany keeps
        # memory bounded for very large scans.
        with conn:
            batch: List[Tuple] = []
            for r in rows:
                if not isinstance(r, tuple):
                    # Tolerate dict-style rows minimally by mapping known keys.
                    try:
                        r = (
                            r.get('path'), r.get('parent_path'), r.get('name'),
                            r.get('depth'), r.get('type'), r.get('size'),
                            r.get('modified_time') or r.get('modified') or 0.0,
                            r.get('file_extension') or r.get('ext'),
                            r.get('mime_type'), r.get('etag'), r.get('hash'),
                            r.get('remote'), r.get('scan_id'), r.get('extra_json'),
                        )
                    except Exception:
                        continue
                batch.append(r)
                if len(batch) >= batch_size:
                    conn.executemany(sql, batch)
                    inserted += len(batch)
                    batch = []
            if batch:
                conn.executemany(sql, batch)
                inserted += len(batch)
    finally:
        try:
            conn.close()
        except Exception:
            pass
    return inserted