Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/ov.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"volcengine": {
"region": "cn-beijing",
"ak": null,
"sk": null
"sk": null,
"session_token": null
}
},
"agfs": {
Expand Down
26 changes: 20 additions & 6 deletions openviking/storage/vectordb/collection/volcengine_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ class ClientForConsoleApi:
"cn-guangzhou": "vikingdb.cn-guangzhou.volcengineapi.com",
}

def __init__(self, ak, sk, region, host=None):
def __init__(self, ak, sk, region, host=None, session_token=None):
self.ak = ak
self.sk = sk
self.region = region
self.host = host if host else ClientForConsoleApi._global_host[region]
self.session_token = session_token or ""

if not all([self.ak, self.sk, self.host, self.region]):
raise ValueError("AK, SK, Host, and Region are required for ClientForConsoleApi")
Expand Down Expand Up @@ -54,7 +55,13 @@ def prepare_request(self, method, params=None, data=None):
if data is not None:
r.set_body(json.dumps(data))

credentials = Credentials(self.ak, self.sk, "vikingdb", self.region)
credentials = Credentials(
self.ak,
self.sk,
"vikingdb",
self.region,
session_token=self.session_token,
)
SignerV4.sign(r, credentials)
return r

Expand All @@ -64,7 +71,7 @@ def do_req(self, req_method, req_params=None, req_body=None):
method=req.method,
url=f"https://{self.host}{req.path}",
headers=req.headers,
params=req_params,
params=req.query,
data=req.body,
timeout=DEFAULT_TIMEOUT,
)
Expand All @@ -77,11 +84,12 @@ class ClientForDataApi:
"cn-guangzhou": "api-vikingdb.vikingdb.cn-guangzhou.volces.com",
}

def __init__(self, ak, sk, region, host=None):
def __init__(self, ak, sk, region, host=None, session_token=None):
self.ak = ak
self.sk = sk
self.region = region
self.host = host if host else ClientForDataApi._global_host[region]
self.session_token = session_token or ""

if not all([self.ak, self.sk, self.host, self.region]):
raise ValueError("AK, SK, Host, and Region are required for ClientForDataApi")
Expand Down Expand Up @@ -110,7 +118,13 @@ def prepare_request(self, method, path, params=None, data=None):
if data is not None:
r.set_body(json.dumps(data))

credentials = Credentials(self.ak, self.sk, "vikingdb", self.region)
credentials = Credentials(
self.ak,
self.sk,
"vikingdb",
self.region,
session_token=self.session_token,
)
SignerV4.sign(r, credentials)
return r

Expand All @@ -122,7 +136,7 @@ def do_req(self, req_method, req_path, req_params=None, req_body=None):
method=req.method,
url=f"https://{self.host}{req.path}",
headers=req.headers,
params=req_params,
params=req.query,
data=req.body,
timeout=DEFAULT_TIMEOUT,
)
70 changes: 59 additions & 11 deletions openviking/storage/vectordb/collection/volcengine_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import json
from typing import Any, Dict, List, Optional

from openviking.storage.vectordb.collection.collection import ICollection
from openviking.storage.errors import ConnectionError
from openviking.storage.vectordb.collection.collection import Collection, ICollection
from openviking.storage.vectordb.collection.result import (
AggregateResult,
DataItem,
Expand Down Expand Up @@ -35,13 +36,16 @@ def get_or_create_volcengine_collection(config: Dict[str, Any], meta_data: Dict[
ak = config.get("AK")
sk = config.get("SK")
region = config.get("Region")
session_token = config.get("SessionToken")
if not ak or not sk or not region:
raise ValueError("AK, SK, and Region are required in config")

collection_name = meta_data.get("CollectionName")
if not collection_name:
raise ValueError("CollectionName is required in config")

# Initialize Console client for creating Collection
client = ClientForConsoleApi(ak, sk, region)
client = ClientForConsoleApi(ak, sk, region, session_token=session_token)

# Try to create Collection
try:
Expand All @@ -63,10 +67,15 @@ def get_or_create_volcengine_collection(config: Dict[str, Any], meta_data: Dict[
raise e

logger.info(f"Collection {collection_name} created successfully")
return VolcengineCollection(ak, sk, region, meta_data=meta_data)

# Return VolcengineCollection instance
return VolcengineCollection(ak=ak, sk=sk, region=region, meta_data=meta_data)
return Collection(
VolcengineCollection(
ak,
sk,
region,
session_token=session_token,
meta_data=meta_data,
)
)


class VolcengineCollection(ICollection):
Expand All @@ -76,19 +85,59 @@ def __init__(
sk: str,
region: str,
host: Optional[str] = None,
session_token: Optional[str] = None,
meta_data: Optional[Dict[str, Any]] = None,
):
self.console_client = ClientForConsoleApi(ak, sk, region, host)
self.data_client = ClientForDataApi(ak, sk, region, host)
self.console_client = ClientForConsoleApi(
ak,
sk,
region,
host,
session_token=session_token,
)
self.data_client = ClientForDataApi(
ak,
sk,
region,
host,
session_token=session_token,
)
self.meta_data = meta_data if meta_data is not None else {}
self.project_name = self.meta_data.get("ProjectName", "default")
self.collection_name = self.meta_data.get("CollectionName", "")

@staticmethod
def _build_response_error(response: Any, action: str) -> ConnectionError:
try:
result = response.json()
except json.JSONDecodeError:
result = {}

metadata = result.get("ResponseMetadata", {}) if isinstance(result, dict) else {}
error = metadata.get("Error", {}) if isinstance(metadata, dict) else {}
code = error.get("Code", "UnknownError")
message = error.get("Message", response.text)
return ConnectionError(
f"Request to {action} failed: {response.status_code} {code} {message}"
)

@staticmethod
def _is_collection_not_found(response: Any, action: str) -> bool:
if action != "GetVikingdbCollection" or response.status_code != 404:
return False
try:
result = response.json()
except json.JSONDecodeError:
return False
metadata = result.get("ResponseMetadata", {}) if isinstance(result, dict) else {}
error = metadata.get("Error", {}) if isinstance(metadata, dict) else {}
return error.get("Code") == "NotFound.VikingdbCollection"

def _console_post(self, data: Dict[str, Any], action: str):
params = {"Action": action, "Version": VIKING_DB_VERSION}
response = self.console_client.do_req("POST", req_params=params, req_body=data)
if response.status_code != 200:
logger.error(f"Request to {action} failed: {response.text}")
logger.error(str(self._build_response_error(response, action)))
return {}
try:
result = response.json()
Expand All @@ -103,11 +152,10 @@ def _console_get(self, params: Optional[Dict[str, Any]], action: str):
params = {}
req_params = {"Action": action, "Version": VIKING_DB_VERSION}
req_body = params

response = self.console_client.do_req("POST", req_params=req_params, req_body=req_body)

if response.status_code != 200:
logger.error(f"Request to {action} failed: {response.text}")
logger.error(str(self._build_response_error(response, action)))
return {}
try:
result = response.json()
Expand Down
20 changes: 14 additions & 6 deletions openviking/storage/vectordb_adapters/volcengine_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@ def __init__(
ak: str,
sk: str,
region: str,
session_token: str | None,
project_name: str,
collection_name: str,
index_name: str,
):
super().__init__(collection_name=collection_name, index_name=index_name)
self._collection: Collection | None = None
self.mode = "volcengine"
self._ak = ak
self._sk = sk
self._region = region
self._session_token = session_token
self._project_name = project_name

@classmethod
Expand All @@ -48,6 +51,7 @@ def from_config(cls, config: Any):
ak=config.volcengine.ak,
sk=config.volcengine.sk,
region=config.volcengine.region,
session_token=config.volcengine.session_token,
project_name=config.project_name or "default",
collection_name=config.name or "context",
index_name=config.index_name or "default",
Expand All @@ -64,14 +68,18 @@ def _config(self) -> Dict[str, Any]:
"AK": self._ak,
"SK": self._sk,
"Region": self._region,
"SessionToken": self._session_token,
}

def _new_collection_handle(self) -> VolcengineCollection:
return VolcengineCollection(
ak=self._ak,
sk=self._sk,
region=self._region,
meta_data=self._meta(),
def _new_collection_handle(self) -> Collection:
return Collection(
VolcengineCollection(
ak=self._ak,
sk=self._sk,
region=self._region,
session_token=self._session_token,
meta_data=self._meta(),
)
)

def _load_existing_collection_if_needed(self) -> None:
Expand Down
4 changes: 4 additions & 0 deletions openviking_cli/utils/config/vectordb_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class VolcengineConfig(BaseModel):

ak: Optional[str] = Field(default=None, description="Volcengine Access Key")
sk: Optional[str] = Field(default=None, description="Volcengine Secret Key")
session_token: Optional[str] = Field(
default=None,
description="Optional Volcengine STS security token for temporary credentials",
)
region: Optional[str] = Field(
default=None, description="Volcengine region (e.g., 'cn-beijing')"
)
Expand Down
Loading
Loading