diff --git a/server/routers/knowledge_router.py b/server/routers/knowledge_router.py
index 06e93dcc7..ee38257a5 100644
--- a/server/routers/knowledge_router.py
+++ b/server/routers/knowledge_router.py
@@ -14,7 +14,7 @@
from server.services.tasker import TaskContext, tasker
from src import config, knowledge_base
from src.knowledge.indexing import SUPPORTED_FILE_EXTENSIONS, is_supported_file_extension, process_file_to_markdown
-from src.knowledge.utils import calculate_content_hash
+from src.knowledge.utils import calculate_content_hash, merge_processing_params
from src.models.embed import test_embedding_model_status, test_all_embedding_models_status
from src.utils import hashstr, logger
@@ -361,6 +361,108 @@ async def delete_document(db_id: str, doc_id: str, current_user: User = Depends(
raise HTTPException(status_code=400, detail=f"删除文档失败: {e}")
+@knowledge.post("/databases/{db_id}/documents/rechunks")
+async def rechunks_documents(
+ db_id: str, file_ids: list[str] = Body(...), params: dict = Body(...), current_user: User = Depends(get_admin_user)
+):
+ """重新分块文档"""
+ logger.debug(f"Rechunks documents for db_id {db_id}: {file_ids} {params=}")
+
+ async def run_rechunks(context: TaskContext):
+ await context.set_message("任务初始化")
+ await context.set_progress(5.0, "准备重新分块文档")
+
+ total = len(file_ids)
+ processed_items = []
+
+ try:
+            # Process each document in turn, updating progress as we go
+ for idx, file_id in enumerate(file_ids, 1):
+ await context.raise_if_cancelled()
+
+                # Update progress
+                progress = 5.0 + (idx / total) * 90.0  # 5% ~ 95%
+                await context.set_progress(progress, f"Re-chunking document {idx}/{total}")
+
+                # Fetch the processing params stored in the document's metadata
+ metadata_params = None
+ try:
+ file_info = await knowledge_base.get_file_basic_info(db_id, file_id)
+ metadata_params = file_info.get("meta", {}).get("processing_params")
+ except Exception as meta_error:
+ logger.warning(f"Failed to get metadata for file {file_id}: {meta_error}")
+
+                # Merge params: request params take precedence, metadata params fill the gaps
+ merged_params = merge_processing_params(metadata_params, params)
+
+                # Process a single document
+ try:
+ result = await knowledge_base.update_content(db_id, [file_id], params=merged_params)
+ processed_items.extend(result)
+ except Exception as doc_error:
+                    # Handle every per-document exception (including timeouts)
+ logger.error(f"Document rechunking failed for {file_id}: {doc_error}")
+
+                    # Distinguish timeouts from other processing errors
+                    is_timeout = isinstance(doc_error, TimeoutError)
+                    error_type = "timeout" if is_timeout else "processing_error"
+                    error_msg = "Processing timed out" if is_timeout else "Processing failed"
+
+ processed_items.append({
+ "file_id": file_id,
+ "status": "failed",
+ "error": f"{error_msg}: {str(doc_error)}",
+ "error_type": error_type
+ })
+
+ except asyncio.CancelledError:
+ await context.set_progress(100.0, "任务已取消")
+ raise
+ except Exception as task_error:
+            # Handle task-level failures (e.g. out-of-memory or network errors)
+            logger.exception(f"Task rechunking failed: {task_error}")
+            await context.set_progress(100.0, f"Task failed: {task_error}")
+            # Mark every document that does not yet have a result as failed; match by
+            # file_id rather than by position, since skipped files produce no result entry
+            processed_ids = {item.get("file_id") for item in processed_items}
+            for file_id in [fid for fid in file_ids if fid not in processed_ids]:
+ processed_items.append({
+ "file_id": file_id,
+ "status": "failed",
+ "error": f"任务失败: {str(task_error)}",
+ "error_type": "task_failed"
+ })
+ raise
+
+ failed_count = len([_p for _p in processed_items if _p.get("status") == "failed"])
+ summary = {
+ "db_id": db_id,
+ "submitted": len(processed_items),
+ "failed": failed_count,
+ }
+ message = f"文档重新分块完成,失败 {failed_count} 个" if failed_count else "文档重新分块完成"
+ await context.set_result(summary | {"items": processed_items})
+ await context.set_progress(100.0, message)
+ return summary | {"items": processed_items}
+
+ try:
+ task = await tasker.enqueue(
+ name=f"文档重新分块({db_id})",
+ task_type="knowledge_rechunks",
+ payload={
+ "db_id": db_id,
+ "file_ids": file_ids,
+ "params": params,
+ },
+ coroutine=run_rechunks,
+ )
+ return {
+ "message": "任务已提交,请在任务中心查看进度",
+ "status": "queued",
+ "task_id": task.id,
+ }
+ except Exception as e: # noqa: BLE001
+ logger.error(f"Failed to enqueue rechunks task: {e}, {traceback.format_exc()}")
+ return {"message": f"Failed to enqueue task: {e}", "status": "failed"}
+
+
@knowledge.get("/databases/{db_id}/documents/{doc_id}/download")
async def download_document(db_id: str, doc_id: str, request: Request, current_user: User = Depends(get_admin_user)):
"""下载原始文件"""
diff --git a/src/knowledge/base.py b/src/knowledge/base.py
index 33c2adcff..b98e0ea18 100644
--- a/src/knowledge/base.py
+++ b/src/knowledge/base.py
@@ -212,6 +212,21 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None =
"""
pass
+ @abstractmethod
+ async def update_content(self, db_id: str, file_ids: list[str], params: dict | None = None) -> list[dict]:
+ """
+        Update content: re-parse the files identified by file_ids and refresh the vector store
+
+ Args:
+            db_id: Database ID
+            file_ids: List of file IDs to re-process
+            params: Processing parameters
+
+ Returns:
+            List of per-file update results
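+
+            Example item (illustrative fields; implementations return a copy of the
+            file's metadata):
+                {"file_id": "...", "filename": "...", "status": "done"}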
+ """
+ pass
+
@abstractmethod
async def aquery(self, query_text: str, db_id: str, **kwargs) -> list[dict]:
"""
@@ -278,6 +293,7 @@ def get_database_info(self, db_id: str) -> dict | None:
"type": file_info.get("file_type", ""),
"status": file_info.get("status", "done"),
"created_at": created_at,
+ "processing_params": file_info.get("processing_params", None),
}
# 按创建时间倒序排序文件列表
diff --git a/src/knowledge/implementations/chroma.py b/src/knowledge/implementations/chroma.py
index 10359fc83..7350510ea 100644
--- a/src/knowledge/implementations/chroma.py
+++ b/src/knowledge/implementations/chroma.py
@@ -187,7 +187,7 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None) -
for item in items:
# 准备文件元数据
- metadata = prepare_item_metadata(item, content_type, db_id)
+ metadata = prepare_item_metadata(item, content_type, db_id, params=params)
file_id = metadata["file_id"]
filename = metadata["filename"]
@@ -252,6 +252,112 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None) -
return processed_items_info
+ async def update_content(self, db_id: str, file_ids: list[str], params: dict | None = None) -> list[dict]:
+ """更新内容 - 根据file_ids重新解析文件并更新向量库"""
+ if db_id not in self.databases_meta:
+ raise ValueError(f"Database {db_id} not found")
+
+ collection = await self._get_chroma_collection(db_id)
+ if not collection:
+ raise ValueError(f"Failed to get ChromaDB collection for {db_id}")
+
+        # Default to empty params
+ if params is None:
+ params = {}
+ content_type = params.get("content_type", "file")
+ processed_items_info = []
+
+ for file_id in file_ids:
+            # Look up the file info in the metadata
+ if file_id not in self.files_meta:
+ logger.warning(f"File {file_id} not found in metadata, skipping")
+ continue
+
+ file_meta = self.files_meta[file_id]
+ file_path = file_meta.get("path")
+ filename = file_meta.get("filename")
+
+ if not file_path:
+ logger.warning(f"File path not found for {file_id}, skipping")
+ continue
+
+            # Add to the processing queue
+ self._add_to_processing_queue(file_id)
+
+ try:
+                # Mark the file as processing
+ self.files_meta[file_id]["processing_params"] = params.copy()
+ self.files_meta[file_id]["status"] = "processing"
+ self._save_metadata()
+
+                # Re-parse the file into markdown
+ if content_type == "file":
+ markdown_content = await process_file_to_markdown(file_path, params=params)
+ else:
+ markdown_content = await process_url_to_markdown(file_path, params=params)
+
+                # Remove the existing ChromaDB data first (chunks only; metadata is kept)
+ await self.delete_file_chunks_only(db_id, file_id)
+
+                # Regenerate the chunks
+ chunks = self._split_text_into_chunks(markdown_content, file_id, filename, params)
+ logger.info(f"Split {filename} into {len(chunks)} chunks")
+
+ if chunks:
+ documents = [chunk["content"] for chunk in chunks]
+ metadatas = [chunk["metadata"] for chunk in chunks]
+ ids = [chunk["id"] for chunk in chunks]
+
+                    # Insert into ChromaDB in batches, to stay within the embedding API's per-request limit
+                    batch_size = 64  # conservative batch size
+                    total_batches = (len(chunks) + batch_size - 1) // batch_size  # ceiling division
+
+ for i in range(0, len(chunks), batch_size):
+ batch_documents = documents[i : i + batch_size]
+ batch_metadatas = metadatas[i : i + batch_size]
+ batch_ids = ids[i : i + batch_size]
+
+ await asyncio.to_thread(
+ collection.add,
+ documents=batch_documents,
+ metadatas=batch_metadatas,
+ ids=batch_ids,
+ )
+
+ batch_num = i // batch_size + 1
+ logger.info(f"Processed batch {batch_num}/{total_batches} for {filename}")
+
+ logger.info(f"Updated {content_type} {file_path} in ChromaDB. Done.")
+
+                # Update the metadata status
+ self.files_meta[file_id]["status"] = "done"
+ self._save_metadata()
+
+                # Remove from the processing queue
+ self._remove_from_processing_queue(file_id)
+
+                # Record the updated file info
+ updated_file_meta = file_meta.copy()
+ updated_file_meta["status"] = "done"
+ updated_file_meta["file_id"] = file_id
+ processed_items_info.append(updated_file_meta)
+
+ except Exception as e:
+ logger.error(f"更新{content_type} {file_path} 失败: {e}, {traceback.format_exc()}")
+ self.files_meta[file_id]["status"] = "failed"
+ self._save_metadata()
+
+                # Remove from the processing queue
+ self._remove_from_processing_queue(file_id)
+
+                # Record the failed file info
+ failed_file_meta = file_meta.copy()
+ failed_file_meta["status"] = "failed"
+ failed_file_meta["file_id"] = file_id
+ processed_items_info.append(failed_file_meta)
+
+ return processed_items_info
+
async def aquery(self, query_text: str, db_id: str, **kwargs) -> list[dict]:
"""异步查询知识库"""
collection = await self._get_chroma_collection(db_id)
@@ -346,8 +452,8 @@ async def aquery(self, query_text: str, db_id: str, **kwargs) -> list[dict]:
logger.error(f"ChromaDB query error: {e}, {traceback.format_exc()}")
return []
- async def delete_file(self, db_id: str, file_id: str) -> None:
- """删除文件"""
+ async def delete_file_chunks_only(self, db_id: str, file_id: str) -> None:
+ """仅删除文件的chunks数据,保留元数据(用于更新操作)"""
collection = await self._get_chroma_collection(db_id)
if collection:
try:
@@ -361,6 +467,12 @@ async def delete_file(self, db_id: str, file_id: str) -> None:
except Exception as e:
logger.error(f"Error deleting file {file_id} from ChromaDB: {e}")
+            # Note: files_meta[file_id] is deliberately kept here for follow-up operations
+
+ async def delete_file(self, db_id: str, file_id: str) -> None:
+ """删除文件(包括元数据)"""
+ # 先删除 ChromaDB 中的 chunks 数据
+ await self.delete_file_chunks_only(db_id, file_id)
# 删除文件记录
if file_id in self.files_meta:
diff --git a/src/knowledge/implementations/lightrag.py b/src/knowledge/implementations/lightrag.py
index 5a5ee54ac..48a45777f 100644
--- a/src/knowledge/implementations/lightrag.py
+++ b/src/knowledge/implementations/lightrag.py
@@ -230,7 +230,7 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None =
for item in items:
# 准备文件元数据
- metadata = prepare_item_metadata(item, content_type, db_id)
+ metadata = prepare_item_metadata(item, content_type, db_id, params=params)
file_id = metadata["file_id"]
item_path = metadata["path"]
@@ -274,6 +274,91 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None =
return processed_items_info
+ async def update_content(self, db_id: str, file_ids: list[str], params: dict | None = None) -> list[dict]:
+ """更新内容 - 根据file_ids重新解析文件并更新向量库"""
+ if db_id not in self.databases_meta:
+ raise ValueError(f"Database {db_id} not found")
+
+ rag = await self._get_lightrag_instance(db_id)
+ if not rag:
+ raise ValueError(f"Failed to get LightRAG instance for {db_id}")
+
+        # Default to empty params
+ if params is None:
+ params = {}
+ content_type = params.get("content_type", "file")
+ processed_items_info = []
+
+ for file_id in file_ids:
+            # Look up the file info in the metadata
+ if file_id not in self.files_meta:
+ logger.warning(f"File {file_id} not found in metadata, skipping")
+ continue
+
+ file_meta = self.files_meta[file_id]
+ file_path = file_meta.get("path")
+
+ if not file_path:
+ logger.warning(f"File path not found for {file_id}, skipping")
+ continue
+
+            # Add to the processing queue
+ self._add_to_processing_queue(file_id)
+
+ try:
+                # Mark the file as processing
+ self.files_meta[file_id]["processing_params"] = params.copy()
+ self.files_meta[file_id]["status"] = "processing"
+ self._save_metadata()
+
+                # Re-parse the file into markdown
+ if content_type == "file":
+ markdown_content = await process_file_to_markdown(file_path, params=params)
+                    markdown_preview = markdown_content[:100].replace("\n", " ")
+                    logger.info(f"Markdown content: {markdown_preview}...")
+ else:
+ markdown_content = await process_url_to_markdown(file_path, params=params)
+
+                # Remove the existing LightRAG data first (chunks only; metadata is kept)
+ await self.delete_file_chunks_only(db_id, file_id)
+
+                # Re-insert the content via LightRAG
+ await rag.ainsert(input=markdown_content, ids=file_id, file_paths=file_path)
+
+ logger.info(f"Updated {content_type} {file_path} in LightRAG. Done.")
+
+                # Update the metadata status
+ self.files_meta[file_id]["status"] = "done"
+ self._save_metadata()
+
+                # Remove from the processing queue
+ self._remove_from_processing_queue(file_id)
+
+                # Record the updated file info
+ updated_file_meta = file_meta.copy()
+ updated_file_meta["status"] = "done"
+ updated_file_meta["file_id"] = file_id
+ processed_items_info.append(updated_file_meta)
+
+ except Exception as e:
+ error_msg = str(e)
+ logger.error(f"更新{content_type} {file_path} 失败: {error_msg}, {traceback.format_exc()}")
+ self.files_meta[file_id]["status"] = "failed"
+ self.files_meta[file_id]["error"] = error_msg
+ self._save_metadata()
+
+                # Remove from the processing queue
+ self._remove_from_processing_queue(file_id)
+
+                # Record the failed file info
+ failed_file_meta = file_meta.copy()
+ failed_file_meta["status"] = "failed"
+ failed_file_meta["file_id"] = file_id
+ failed_file_meta["error"] = error_msg
+ processed_items_info.append(failed_file_meta)
+
+ return processed_items_info
+
async def aquery(self, query_text: str, db_id: str, **kwargs) -> str:
"""异步查询知识库"""
rag = await self._get_lightrag_instance(db_id)
@@ -299,15 +384,22 @@ async def aquery(self, query_text: str, db_id: str, **kwargs) -> str:
logger.error(f"Query error: {e}, {traceback.format_exc()}")
return ""
- async def delete_file(self, db_id: str, file_id: str) -> None:
- """删除文件"""
+ async def delete_file_chunks_only(self, db_id: str, file_id: str) -> None:
+ """仅删除文件的chunks数据,保留元数据(用于更新操作)"""
rag = await self._get_lightrag_instance(db_id)
if rag:
try:
# 使用 LightRAG 删除文档
await rag.adelete_by_doc_id(file_id)
+ logger.info(f"Deleted chunks for file {file_id} from LightRAG")
except Exception as e:
logger.error(f"Error deleting file {file_id} from LightRAG: {e}")
+            # Note: files_meta[file_id] is deliberately kept here for follow-up operations
+
+ async def delete_file(self, db_id: str, file_id: str) -> None:
+ """删除文件(包括元数据)"""
+ # 先删除 LightRAG 中的 chunks 数据
+ await self.delete_file_chunks_only(db_id, file_id)
# 删除文件记录
if file_id in self.files_meta:
diff --git a/src/knowledge/implementations/milvus.py b/src/knowledge/implementations/milvus.py
index fc0fed4c0..b734532de 100644
--- a/src/knowledge/implementations/milvus.py
+++ b/src/knowledge/implementations/milvus.py
@@ -226,7 +226,7 @@ async def add_content(self, db_id: str, items: list[str], params: dict | None =
processed_items_info = []
for item in items:
- metadata = prepare_item_metadata(item, content_type, db_id)
+ metadata = prepare_item_metadata(item, content_type, db_id, params=params)
file_id = metadata["file_id"]
filename = metadata["filename"]
@@ -293,6 +293,114 @@ def _insert_records():
return processed_items_info
+ async def update_content(self, db_id: str, file_ids: list[str], params: dict | None = None) -> list[dict]:
+ """更新内容 - 根据file_ids重新解析文件并更新向量库"""
+ if db_id not in self.databases_meta:
+ raise ValueError(f"Database {db_id} not found")
+
+ collection = await self._get_milvus_collection(db_id)
+ if not collection:
+ raise ValueError(f"Failed to get Milvus collection for {db_id}")
+
+ embed_info = self.databases_meta[db_id].get("embed_info", {})
+ embedding_function = self._get_async_embedding_function(embed_info)
+
+        # Default to empty params
+ if params is None:
+ params = {}
+ content_type = params.get("content_type", "file")
+ processed_items_info = []
+
+ for file_id in file_ids:
+            # Look up the file info in the metadata
+ async with self._metadata_lock:
+ if file_id not in self.files_meta:
+ logger.warning(f"File {file_id} not found in metadata, skipping")
+ continue
+
+ file_meta = self.files_meta[file_id]
+ file_path = file_meta.get("path")
+ filename = file_meta.get("filename")
+
+ if not file_path:
+ logger.warning(f"File path not found for {file_id}, skipping")
+ continue
+
+            # Add to the processing queue
+ self._add_to_processing_queue(file_id)
+
+ try:
+                # Mark the file as processing
+ async with self._metadata_lock:
+ self.files_meta[file_id]["processing_params"] = params.copy()
+ self.files_meta[file_id]["status"] = "processing"
+ self._save_metadata()
+
+                # Re-parse the file into markdown
+ if content_type == "file":
+ markdown_content = await process_file_to_markdown(file_path, params=params)
+ else:
+ markdown_content = await process_url_to_markdown(file_path, params=params)
+
+                # Remove the existing Milvus data first (chunks only; metadata is kept)
+ await self.delete_file_chunks_only(db_id, file_id)
+
+                # Regenerate the chunks
+ chunks = self._split_text_into_chunks(markdown_content, file_id, filename, params)
+ logger.info(f"Split {filename} into {len(chunks)} chunks")
+
+ if chunks:
+ texts = [chunk["content"] for chunk in chunks]
+ embeddings = await embedding_function(texts)
+
+ entities = [
+ [chunk["id"] for chunk in chunks],
+ [chunk["content"] for chunk in chunks],
+ [chunk["source"] for chunk in chunks],
+ [chunk["chunk_id"] for chunk in chunks],
+ [chunk["file_id"] for chunk in chunks],
+ [chunk["chunk_index"] for chunk in chunks],
+ embeddings,
+ ]
+
+ def _insert_records():
+ collection.insert(entities)
+
+ await asyncio.to_thread(_insert_records)
+
+ logger.info(f"Updated {content_type} {file_path} in Milvus. Done.")
+
+                # Update the metadata status
+ async with self._metadata_lock:
+ self.files_meta[file_id]["status"] = "done"
+ self._save_metadata()
+
+                # Remove from the processing queue
+ self._remove_from_processing_queue(file_id)
+
+                # Record the updated file info
+ updated_file_meta = file_meta.copy()
+ updated_file_meta["status"] = "done"
+ updated_file_meta["file_id"] = file_id
+ processed_items_info.append(updated_file_meta)
+
+ except Exception as e:
+ logger.error(f"更新{content_type} {file_path} 失败: {e}, {traceback.format_exc()}")
+ async with self._metadata_lock:
+ self.files_meta[file_id]["status"] = "failed"
+ self._save_metadata()
+
+                # Remove from the processing queue
+ self._remove_from_processing_queue(file_id)
+
+                # Record the failed file info
+ failed_file_meta = file_meta.copy()
+ failed_file_meta["status"] = "failed"
+ failed_file_meta["file_id"] = file_id
+ processed_items_info.append(failed_file_meta)
+
+ return processed_items_info
+
async def aquery(self, query_text: str, db_id: str, **kwargs) -> list[dict]:
"""异步查询知识库"""
collection = await self._get_milvus_collection(db_id)
@@ -392,8 +500,8 @@ async def aquery(self, query_text: str, db_id: str, **kwargs) -> list[dict]:
logger.error(f"Milvus query error: {e}, {traceback.format_exc()}")
return []
- async def delete_file(self, db_id: str, file_id: str) -> None:
- """删除文件"""
+ async def delete_file_chunks_only(self, db_id: str, file_id: str) -> None:
+ """仅删除文件的chunks数据,保留元数据(用于更新操作)"""
collection = await self._get_milvus_collection(db_id)
if collection:
@@ -416,6 +524,13 @@ def _delete_from_milvus():
await asyncio.to_thread(_delete_from_milvus)
except Exception as e:
logger.error(f"Error checking file existence in Milvus: {e}")
+            # Note: files_meta[file_id] is deliberately kept here for follow-up operations
+
+ async def delete_file(self, db_id: str, file_id: str) -> None:
+ """删除文件(包括元数据)"""
+ # 先删除 Milvus 中的 chunks 数据
+ await self.delete_file_chunks_only(db_id, file_id)
+
# 使用锁确保元数据操作的原子性
async with self._metadata_lock:
if file_id in self.files_meta:
diff --git a/src/knowledge/manager.py b/src/knowledge/manager.py
index b2ec2df89..e46c6ca63 100644
--- a/src/knowledge/manager.py
+++ b/src/knowledge/manager.py
@@ -316,6 +316,11 @@ async def delete_file(self, db_id: str, file_id: str) -> None:
kb_instance = self._get_kb_for_database(db_id)
await kb_instance.delete_file(db_id, file_id)
+ async def update_content(self, db_id: str, file_ids: list[str], params: dict | None = None) -> list[dict]:
+ """更新内容(重新分块)"""
+ kb_instance = self._get_kb_for_database(db_id)
+ return await kb_instance.update_content(db_id, file_ids, params or {})
+
async def get_file_basic_info(self, db_id: str, file_id: str) -> dict:
"""获取文件基本信息(仅元数据)"""
kb_instance = self._get_kb_for_database(db_id)
diff --git a/src/knowledge/utils/__init__.py b/src/knowledge/utils/__init__.py
index 9b3fa235a..0588c186c 100644
--- a/src/knowledge/utils/__init__.py
+++ b/src/knowledge/utils/__init__.py
@@ -8,6 +8,7 @@
from .kb_utils import (
calculate_content_hash,
get_embedding_config,
+ merge_processing_params,
prepare_item_metadata,
split_text_into_chunks,
split_text_into_qa_chunks,
diff --git a/src/knowledge/utils/kb_utils.py b/src/knowledge/utils/kb_utils.py
index e14c61066..086807913 100644
--- a/src/knowledge/utils/kb_utils.py
+++ b/src/knowledge/utils/kb_utils.py
@@ -128,9 +128,15 @@ def calculate_content_hash(data: bytes | bytearray | str | os.PathLike[str] | Pa
raise TypeError(f"Unsupported data type for hashing: {type(data)!r}")
-def prepare_item_metadata(item: str, content_type: str, db_id: str) -> dict:
+def prepare_item_metadata(item: str, content_type: str, db_id: str, params: dict | None = None) -> dict:
"""
准备文件或URL的元数据
+
+ Args:
+        item: File path or URL
+        content_type: Content type ("file" or "url")
+        db_id: Database ID
+        params: Optional processing parameters
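+
+    When params are provided they are copied into the returned metadata under
+    "processing_params", so later re-chunk runs can reuse them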
"""
if content_type == "file":
file_path = Path(item)
@@ -151,7 +157,7 @@ def prepare_item_metadata(item: str, content_type: str, db_id: str) -> dict:
item_path = item
content_hash = None
- return {
+ metadata = {
"database_id": db_id,
"filename": filename,
"path": item_path,
@@ -162,6 +168,12 @@ def prepare_item_metadata(item: str, content_type: str, db_id: str) -> dict:
"content_hash": content_hash,
}
+    # Persist the processing params in the returned metadata
+ if params:
+ metadata["processing_params"] = params.copy()
+
+ return metadata
+
def split_text_into_qa_chunks(
text: str, file_id: str, filename: str, qa_separator: None | str = None, params: dict = {}
@@ -196,6 +208,31 @@ def split_text_into_qa_chunks(
return chunks
+def merge_processing_params(metadata_params: dict | None, request_params: dict | None) -> dict:
+ """
+    Merge processing params: request params take precedence; metadata params fill in what's missing
+
+ Args:
+        metadata_params: Params previously saved in the file's metadata
+        request_params: Params supplied with the request
+
+ Returns:
+        dict: The merged params
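+
+    Example (illustrative parameter names):
+        >>> merge_processing_params({"chunk_size": 500}, {"chunk_size": 1000})
+        {'chunk_size': 1000}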
+ """
+ merged_params = {}
+
+    # Start with the metadata params as defaults
+ if metadata_params:
+ merged_params.update(metadata_params)
+
+    # Let the request params override them, if provided
+ if request_params:
+ merged_params.update(request_params)
+
+ logger.debug(f"Merged processing params: metadata={metadata_params}, request={request_params}, result={merged_params}")
+ return merged_params
+
+
def get_embedding_config(embed_info: dict) -> dict:
"""
获取嵌入模型配置
diff --git a/web/src/apis/knowledge_api.js b/web/src/apis/knowledge_api.js
index 4489201b1..787c5fdc0 100644
--- a/web/src/apis/knowledge_api.js
+++ b/web/src/apis/knowledge_api.js
@@ -103,6 +103,20 @@ export const documentApi = {
*/
downloadDocument: async (dbId, docId) => {
return apiAdminGet(`/api/knowledge/databases/${dbId}/documents/${docId}/download`, {}, 'blob')
+ },
+
+ /**
+   * Re-chunk documents
+   * @param {string} dbId - Knowledge base ID
+   * @param {Array} fileIds - List of file IDs
+   * @param {Object} params - Processing parameters
+   * @returns {Promise} - Re-chunking result
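+   * @example
+   * // Illustrative call; the IDs and parameter names are placeholders
+   * await documentApi.rechunksDocuments('db_id', ['file_id'], { chunk_size: 1000 })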
+ */
+ rechunksDocuments: async (dbId, fileIds, params = {}) => {
+ return apiAdminPost(`/api/knowledge/databases/${dbId}/documents/rechunks`, {
+ file_ids: fileIds,
+ params
+ })
}
}
diff --git a/web/src/components/ChunkParamsConfig.vue b/web/src/components/ChunkParamsConfig.vue
new file mode 100644
index 000000000..6b626f2b6
--- /dev/null
+++ b/web/src/components/ChunkParamsConfig.vue
@@ -0,0 +1,68 @@
+<!-- ChunkParamsConfig.vue help text (remainder of the 68-line component not shown) -->
+<!-- Adjusting the chunking parameters controls how text is split, which affects retrieval quality and document loading efficiency. -->
+<!-- Maximum number of characters per text chunk -->
+<!-- Number of overlapping characters between adjacent chunks -->
+<!-- When enabled, content is split by QA pairs and the chunk-size settings above are ignored -->
+<!-- Separator used to split different QA pairs -->