Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 44 additions & 29 deletions openviking/storage/local_fs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0
import asyncio
import json
import os
import re
Expand All @@ -11,6 +12,7 @@
from openviking.server.identity import RequestContext
from openviking.storage.queuefs import EmbeddingQueue, get_queue_manager
from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter
from openviking.utils.embedding_utils import vectorize_directory_meta, vectorize_file
from openviking_cli.exceptions import NotFoundError
from openviking_cli.utils.logger import get_logger
from openviking_cli.utils.uri import VikingURI
Expand Down Expand Up @@ -89,39 +91,52 @@ def get_viking_rel_path_from_zip(zip_path: str) -> str:
return "/".join(new_parts)


# TODO: Consider recursive vectorization
async def _enqueue_direct_vectorization(viking_fs, uri: str, ctx: RequestContext) -> None:
queue_manager = get_queue_manager()
embedding_queue = cast(
EmbeddingQueue, queue_manager.get_queue(queue_manager.EMBEDDING, allow_create=True)
)
entries = await viking_fs.tree(uri, node_limit=100000, level_limit=1000, ctx=ctx)
dir_uris = {uri}
file_entries: list[tuple[str, str, str]] = []
for entry in entries:
entry_uri = entry.get("uri")
if not entry_uri:
continue
if entry.get("isDir"):
dir_uris.add(entry_uri)
continue
name = entry.get("name", "")
if name.startswith("."):
continue
parent_uri = VikingURI(entry_uri).parent.uri
file_entries.append((entry_uri, parent_uri, name))

async def index_dir(dir_uri: str) -> None:
abstract_uri = f"{dir_uri}/.abstract.md"
overview_uri = f"{dir_uri}/.overview.md"
abstract = ""
overview = ""
try:
if await viking_fs.exists(abstract_uri, ctx=ctx):
content = await viking_fs.read_file(abstract_uri, ctx=ctx)
abstract = content.decode("utf-8") if isinstance(content, bytes) else content
if await viking_fs.exists(overview_uri, ctx=ctx):
content = await viking_fs.read_file(overview_uri, ctx=ctx)
overview = content.decode("utf-8") if isinstance(content, bytes) else content
except Exception:
return
await vectorize_directory_meta(dir_uri, abstract, overview, ctx=ctx)

async def index_file(file_uri: str, parent_uri: str, name: str) -> None:
await vectorize_file(
file_path=file_uri, summary_dict={"name": name}, parent_uri=parent_uri, ctx=ctx
)

parent_uri = VikingURI(uri).parent.uri
abstract = await viking_fs.abstract(uri, ctx=ctx)
resource = Context(
uri=uri,
parent_uri=parent_uri,
is_leaf=False,
abstract=abstract,
level=0,
created_at=datetime.now(),
active_count=0,
related_uri=[],
user=ctx.user,
account_id=ctx.account_id,
owner_space=(
ctx.user.agent_space_name()
if uri.startswith("viking://agent/")
else ctx.user.user_space_name()
if uri.startswith("viking://user/") or uri.startswith("viking://session/")
else ""
),
meta={"semantic_name": uri.split("/")[-1]},
await asyncio.gather(*(index_dir(dir_uri) for dir_uri in dir_uris))
await asyncio.gather(
*(
index_file(file_uri, parent_uri, file_name)
for file_uri, parent_uri, file_name in file_entries
)
)

embedding_msg = EmbeddingMsgConverter.from_context(resource)
await embedding_queue.enqueue(embedding_msg)


async def import_ovpack(
viking_fs,
Expand Down
Loading