From 4e9428256c5303eec85b288a721dd466f3e88aab Mon Sep 17 00:00:00 2001 From: sponge225 <1670519171@qq.com> Date: Tue, 7 Apr 2026 19:38:35 +0800 Subject: [PATCH 1/5] feat(ovpack): recursively enqueue directory vectorization --- openviking/storage/local_fs.py | 65 +++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/openviking/storage/local_fs.py b/openviking/storage/local_fs.py index 483eac004..c7a5497e6 100644 --- a/openviking/storage/local_fs.py +++ b/openviking/storage/local_fs.py @@ -1,5 +1,6 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: AGPL-3.0 +import asyncio import json import os import re @@ -89,38 +90,52 @@ def get_viking_rel_path_from_zip(zip_path: str) -> str: return "/".join(new_parts) -# TODO: Consider recursive vectorization async def _enqueue_direct_vectorization(viking_fs, uri: str, ctx: RequestContext) -> None: queue_manager = get_queue_manager() embedding_queue = cast( EmbeddingQueue, queue_manager.get_queue(queue_manager.EMBEDDING, allow_create=True) ) - parent_uri = VikingURI(uri).parent.uri - abstract = await viking_fs.abstract(uri, ctx=ctx) - resource = Context( - uri=uri, - parent_uri=parent_uri, - is_leaf=False, - abstract=abstract, - level=0, - created_at=datetime.now(), - active_count=0, - related_uri=[], - user=ctx.user, - account_id=ctx.account_id, - owner_space=( - ctx.user.agent_space_name() - if uri.startswith("viking://agent/") - else ctx.user.user_space_name() - if uri.startswith("viking://user/") or uri.startswith("viking://session/") - else "" - ), - meta={"semantic_name": uri.split("/")[-1]}, - ) + entries = await viking_fs.tree(uri, output="original", node_limit=65536, level_limit=1000, ctx=ctx) + dir_uris = {uri} + for entry in entries: + if entry.get("isDir") and entry.get("uri"): + dir_uris.add(entry["uri"]) + + sem = asyncio.Semaphore(16) + + async def process_one(target_uri: str) -> None: + async with sem: + try: + abstract = await viking_fs.abstract(target_uri, ctx=ctx) + except Exception: + return + parent_uri = VikingURI(target_uri).parent.uri + resource = Context( + uri=target_uri, + parent_uri=parent_uri, + is_leaf=False, + abstract=abstract, + level=0, + created_at=datetime.now(), + active_count=0, + related_uri=[], + user=ctx.user, + account_id=ctx.account_id, + owner_space=( + ctx.user.agent_space_name() + if target_uri.startswith("viking://agent/") + else ctx.user.user_space_name() + if target_uri.startswith("viking://user/") or target_uri.startswith("viking://session/") + else "" + ), + meta={"semantic_name": target_uri.split("/")[-1]}, + ) + embedding_msg = EmbeddingMsgConverter.from_context(resource) + if embedding_msg: + await embedding_queue.enqueue(embedding_msg) - embedding_msg = EmbeddingMsgConverter.from_context(resource) - await embedding_queue.enqueue(embedding_msg) + await asyncio.gather(*(process_one(d) for d in dir_uris)) async def import_ovpack( From 7585a4a842487f8e2346aede95d3cb663f0e9986 Mon Sep 17 00:00:00 2001 From: sponge225 <1670519171@qq.com> Date: Wed, 8 Apr 2026 11:48:00 +0800 Subject: [PATCH 2/5] feat(ovpack): recursively vectorize all imported docs --- openviking/storage/local_fs.py | 91 +++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 35 deletions(-) diff --git a/openviking/storage/local_fs.py b/openviking/storage/local_fs.py index c7a5497e6..3bf704b5c 100644 --- a/openviking/storage/local_fs.py +++ b/openviking/storage/local_fs.py @@ -12,6 +12,7 @@ from openviking.server.identity import RequestContext from openviking.storage.queuefs import EmbeddingQueue, get_queue_manager from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter +from openviking.utils.embedding_utils import vectorize_directory_meta, vectorize_file from openviking_cli.exceptions import NotFoundError from openviking_cli.utils.logger import get_logger from openviking_cli.utils.uri import VikingURI @@ -91,51 +92,71 @@ def get_viking_rel_path_from_zip(zip_path: str) -> str: async def _enqueue_direct_vectorization(viking_fs, uri: str, ctx: RequestContext) -> None: - queue_manager = get_queue_manager() - embedding_queue = cast( - EmbeddingQueue, queue_manager.get_queue(queue_manager.EMBEDDING, allow_create=True) - ) - - entries = await viking_fs.tree(uri, output="original", node_limit=65536, level_limit=1000, ctx=ctx) + entries = await viking_fs.tree(uri, output="original", node_limit=100000, level_limit=1000, ctx=ctx) dir_uris = {uri} + file_entries: list[tuple[str, str, str]] = [] for entry in entries: - if entry.get("isDir") and entry.get("uri"): - dir_uris.add(entry["uri"]) + entry_uri = entry.get("uri") + if not entry_uri: + continue + if entry.get("isDir"): + dir_uris.add(entry_uri) + continue + name = entry.get("name", "") + if name.startswith("."): + continue + parent_uri = VikingURI(entry_uri).parent.uri + file_entries.append((entry_uri, parent_uri, name)) sem = asyncio.Semaphore(16) - async def process_one(target_uri: str) -> None: + async def index_dir(dir_uri: str) -> None: async with sem: + abstract_uri = f"{dir_uri}/.abstract.md" + overview_uri = f"{dir_uri}/.overview.md" + abstract = "" + overview = "" try: - abstract = await viking_fs.abstract(target_uri, ctx=ctx) + if await viking_fs.exists(abstract_uri, ctx=ctx): + content = await viking_fs.read_file(abstract_uri, ctx=ctx) + abstract = content.decode("utf-8") if isinstance(content, bytes) else content + if await viking_fs.exists(overview_uri, ctx=ctx): + content = await viking_fs.read_file(overview_uri, ctx=ctx) + overview = content.decode("utf-8") if isinstance(content, bytes) else content except Exception: return - parent_uri = VikingURI(target_uri).parent.uri - resource = Context( - uri=target_uri, - parent_uri=parent_uri, - is_leaf=False, - abstract=abstract, - level=0, - created_at=datetime.now(), - active_count=0, - related_uri=[], - user=ctx.user, - account_id=ctx.account_id, - owner_space=( - ctx.user.agent_space_name() - if target_uri.startswith("viking://agent/") - else ctx.user.user_space_name() - if target_uri.startswith("viking://user/") or target_uri.startswith("viking://session/") - else "" - ), - meta={"semantic_name": target_uri.split("/")[-1]}, - ) - embedding_msg = EmbeddingMsgConverter.from_context(resource) - if embedding_msg: - await embedding_queue.enqueue(embedding_msg) + await vectorize_directory_meta(dir_uri, abstract, overview, ctx=ctx) + + async def index_file(file_uri: str, parent_uri: str, name: str) -> None: + async with sem: + await vectorize_file(file_path=file_uri, summary_dict={"name": name}, parent_uri=parent_uri, ctx=ctx) - await asyncio.gather(*(process_one(d) for d in dir_uris)) + work_queue: asyncio.Queue[tuple[str, tuple]] = asyncio.Queue() + for dir_uri in dir_uris: + work_queue.put_nowait(("dir", (dir_uri,))) + for file_uri, parent_uri, file_name in file_entries: + work_queue.put_nowait(("file", (file_uri, parent_uri, file_name))) + + worker_count = 10 + for _ in range(worker_count): + work_queue.put_nowait(("stop", ())) + + async def worker() -> None: + while True: + kind, payload = await work_queue.get() + try: + if kind == "stop": + return + if kind == "dir": + (dir_uri,) = payload + await index_dir(dir_uri) + elif kind == "file": + file_uri, parent_uri, file_name = payload + await index_file(file_uri, parent_uri, file_name) + finally: + work_queue.task_done() + + await asyncio.gather(*(worker() for _ in range(worker_count))) async def import_ovpack( From 3e672e99b9800f462fe92c0b6d4b1ed4ec4d3aec Mon Sep 17 00:00:00 2001 From: sponge225 <1670519171@qq.com> Date: Wed, 8 Apr 2026 11:57:01 +0800 Subject: [PATCH 3/5] refactor(ovpack): use fixed workers for direct vectorization --- openviking/storage/local_fs.py | 36 ++++++++++++++++------------------ 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/openviking/storage/local_fs.py b/openviking/storage/local_fs.py index 3bf704b5c..caa85ca57 100644 --- a/openviking/storage/local_fs.py +++ b/openviking/storage/local_fs.py @@ -108,28 +108,26 @@ async def _enqueue_direct_vectorization(viking_fs, uri: str, ctx: RequestContext parent_uri = VikingURI(entry_uri).parent.uri file_entries.append((entry_uri, parent_uri, name)) - sem = asyncio.Semaphore(16) - async def index_dir(dir_uri: str) -> None: - async with sem: - abstract_uri = f"{dir_uri}/.abstract.md" - overview_uri = f"{dir_uri}/.overview.md" - abstract = "" - overview = "" - try: - if await viking_fs.exists(abstract_uri, ctx=ctx): - content = await viking_fs.read_file(abstract_uri, ctx=ctx) - abstract = content.decode("utf-8") if isinstance(content, bytes) else content - if await viking_fs.exists(overview_uri, ctx=ctx): - content = await viking_fs.read_file(overview_uri, ctx=ctx) - overview = content.decode("utf-8") if isinstance(content, bytes) else content - except Exception: - return - await vectorize_directory_meta(dir_uri, abstract, overview, ctx=ctx) + abstract_uri = f"{dir_uri}/.abstract.md" + overview_uri = f"{dir_uri}/.overview.md" + abstract = "" + overview = "" + try: + if await viking_fs.exists(abstract_uri, ctx=ctx): + content = await viking_fs.read_file(abstract_uri, ctx=ctx) + abstract = content.decode("utf-8") if isinstance(content, bytes) else content + if await viking_fs.exists(overview_uri, ctx=ctx): + content = await viking_fs.read_file(overview_uri, ctx=ctx) + overview = content.decode("utf-8") if isinstance(content, bytes) else content + except Exception: + return + await vectorize_directory_meta(dir_uri, abstract, overview, ctx=ctx) async def index_file(file_uri: str, parent_uri: str, name: str) -> None: - async with sem: - await vectorize_file(file_path=file_uri, summary_dict={"name": name}, parent_uri=parent_uri, ctx=ctx) + await vectorize_file( + file_path=file_uri, summary_dict={"name": name}, parent_uri=parent_uri, ctx=ctx + ) work_queue: asyncio.Queue[tuple[str, tuple]] = asyncio.Queue() for dir_uri in dir_uris: From 38787897ee5560c26e37281e5d6a6075a7afa553 Mon Sep 17 00:00:00 2001 From: sponge225 <1670519171@qq.com> Date: Wed, 8 Apr 2026 16:01:31 +0800 Subject: [PATCH 4/5] refactor(ovpack): revert direct vectorization to gather --- openviking/storage/local_fs.py | 33 +++++++-------------------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/openviking/storage/local_fs.py b/openviking/storage/local_fs.py index caa85ca57..f169691c1 100644 --- a/openviking/storage/local_fs.py +++ b/openviking/storage/local_fs.py @@ -129,32 +129,13 @@ async def index_file(file_uri: str, parent_uri: str, name: str) -> None: file_path=file_uri, summary_dict={"name": name}, parent_uri=parent_uri, ctx=ctx ) - work_queue: asyncio.Queue[tuple[str, tuple]] = asyncio.Queue() - for dir_uri in dir_uris: - work_queue.put_nowait(("dir", (dir_uri,))) - for file_uri, parent_uri, file_name in file_entries: - work_queue.put_nowait(("file", (file_uri, parent_uri, file_name))) - - worker_count = 10 - for _ in range(worker_count): - work_queue.put_nowait(("stop", ())) - - async def worker() -> None: - while True: - kind, payload = await work_queue.get() - try: - if kind == "stop": - return - if kind == "dir": - (dir_uri,) = payload - await index_dir(dir_uri) - elif kind == "file": - file_uri, parent_uri, file_name = payload - await index_file(file_uri, parent_uri, file_name) - finally: - work_queue.task_done() - - await asyncio.gather(*(worker() for _ in range(worker_count))) + await asyncio.gather(*(index_dir(dir_uri) for dir_uri in dir_uris)) + await asyncio.gather( + *( + index_file(file_uri, parent_uri, file_name) + for file_uri, parent_uri, file_name in file_entries + ) + ) async def import_ovpack( From fe099f8e0ab9f54c9dc4a686ea0e5455e15c4022 Mon Sep 17 00:00:00 2001 From: sponge225 <1670519171@qq.com> Date: Wed, 8 Apr 2026 16:34:00 +0800 Subject: [PATCH 5/5] chore: drop redundant default tree args --- openviking/storage/local_fs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openviking/storage/local_fs.py b/openviking/storage/local_fs.py index f169691c1..cd2dd8fdc 100644 --- a/openviking/storage/local_fs.py +++ b/openviking/storage/local_fs.py @@ -92,7 +92,7 @@ def get_viking_rel_path_from_zip(zip_path: str) -> str: async def _enqueue_direct_vectorization(viking_fs, uri: str, ctx: RequestContext) -> None: - entries = await viking_fs.tree(uri, output="original", node_limit=100000, level_limit=1000, ctx=ctx) + entries = await viking_fs.tree(uri, node_limit=100000, level_limit=1000, ctx=ctx) dir_uris = {uri} file_entries: list[tuple[str, str, str]] = [] for entry in entries: