async def _enqueue_direct_vectorization(viking_fs, uri: str, ctx: RequestContext) -> None:
    """Vectorize ``uri`` and everything ``viking_fs.tree`` reports beneath it.

    The subtree is listed once via ``viking_fs.tree`` and split into
    directories and files:

    * Every directory (including ``uri`` itself) is passed to
      ``vectorize_directory_meta`` together with the text of its
      ``.abstract.md`` and ``.overview.md`` files; a missing file contributes
      an empty string.
    * Every file whose name does not start with ``.`` is passed to
      ``vectorize_file`` with ``{"name": <entry name>}`` as its summary.

    Directory metadata is processed first, then files; within each phase the
    calls run concurrently through ``asyncio.gather``.

    Args:
        viking_fs: Filesystem object providing the awaitable methods ``tree``,
            ``exists`` and ``read_file`` used below.
        uri: Root URI of the subtree to vectorize.
        ctx: Request context forwarded unchanged to every filesystem and
            vectorization call.

    Raises:
        Exception: Whatever ``viking_fs.tree``, ``vectorize_directory_meta``
            or ``vectorize_file`` raise. Failures while *reading* a
            directory's metadata files are logged and that directory is
            skipped instead.
    """
    log = get_logger(__name__)

    entries = await viking_fs.tree(uri, node_limit=100000, level_limit=1000, ctx=ctx)

    # The root is always treated as a directory, even if the listing omits it.
    dir_uris = {uri}
    file_entries: list[tuple[str, str, str]] = []
    for entry in entries:
        entry_uri = entry.get("uri")
        if not entry_uri:
            continue
        if entry.get("isDir"):
            dir_uris.add(entry_uri)
            continue
        # `or ""` also covers a "name" key that is present but None.
        name = entry.get("name") or ""
        if name.startswith("."):
            # Dot-files (e.g. .abstract.md / .overview.md) are consumed as
            # directory metadata below, not indexed as standalone files.
            continue
        file_entries.append((entry_uri, VikingURI(entry_uri).parent.uri, name))

    async def read_optional_text(file_uri: str) -> str:
        """Return the text at ``file_uri``, or ``""`` if it does not exist."""
        if not await viking_fs.exists(file_uri, ctx=ctx):
            return ""
        content = await viking_fs.read_file(file_uri, ctx=ctx)
        return content.decode("utf-8") if isinstance(content, bytes) else content

    async def index_dir(dir_uri: str) -> None:
        """Vectorize one directory's abstract/overview metadata."""
        # Avoid "//" when the directory URI already ends with a slash.
        sep = "" if dir_uri.endswith("/") else "/"
        try:
            abstract = await read_optional_text(f"{dir_uri}{sep}.abstract.md")
            overview = await read_optional_text(f"{dir_uri}{sep}.overview.md")
        except Exception as exc:
            # Best effort: an unreadable metadata file skips this directory
            # only, but leave a trace so the gap in the index is diagnosable.
            log.warning("Skipping directory vectorization for %s: %s", dir_uri, exc)
            return
        await vectorize_directory_meta(dir_uri, abstract, overview, ctx=ctx)

    async def index_file(file_uri: str, parent_uri: str, name: str) -> None:
        """Vectorize a single file, using its entry name as the summary."""
        await vectorize_file(
            file_path=file_uri, summary_dict={"name": name}, parent_uri=parent_uri, ctx=ctx
        )

    # NOTE(review): fan-out is unbounded (up to node_limit coroutines at once);
    # consider a semaphore if the vectorize_* helpers do heavy work inline.
    await asyncio.gather(*(index_dir(dir_uri) for dir_uri in dir_uris))
    await asyncio.gather(
        *(
            index_file(file_uri, parent_uri, file_name)
            for file_uri, parent_uri, file_name in file_entries
        )
    )