From 4690a6b8c1c9f279f92a1d57025ab4bfbeebf71b Mon Sep 17 00:00:00 2001 From: norrishuang <12380647@qq.com> Date: Wed, 3 Dec 2025 05:34:29 +0000 Subject: [PATCH 1/2] 1. Optimized logs output of load data 2. Modified create index dsl of on disk mode. --- .../clients/aws_opensearch/aws_opensearch.py | 35 +++++++++++-------- .../backend/clients/aws_opensearch/config.py | 4 --- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py index 03443b255..995b5814b 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py +++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py @@ -111,13 +111,14 @@ def _build_vector_field_config(self) -> dict: if self.case_config.on_disk: space_type = self.case_config.parse_metric() vector_field_config = { - "type": "knn_vector", - "dimension": self.dim, - "space_type": space_type, - "data_type": "float", - "mode": "on_disk", - "compression_level": "32x", - } + "type": "knn_vector", + "dimension": self.dim, + "space_type": space_type, + "data_type": "float", + "mode": "on_disk", + "compression_level": "32x", + "method": method_config + } log.info("Using on-disk vector configuration with compression_level: 32x") else: vector_field_config = { @@ -283,13 +284,19 @@ def insert_chunk(client_idx: int, chunk_idx: int): other_data[self.label_col_name] = chunk_labels_data[i] insert_data.append(other_data) - try: - resp = client.bulk(body=insert_data) - log.info(f"Client {client_idx} added {len(resp['items'])} documents") - return len(chunk_embeddings), None - except Exception as e: - log.warning(f"Client {client_idx} failed to insert data: {e!s}") - return 0, e + max_retries = 10 + for attempt in range(max_retries): + try: + resp = client.bulk(body=insert_data) + return len(chunk_embeddings), None + except Exception as e: + if "429" in str(e) and attempt < max_retries - 1: + log.warning(f"Client {client_idx} got 429 error, retry {attempt + 1}/{max_retries} after 10s") + time.sleep(10) + else: + log.warning(f"Client {client_idx} failed to insert data: {e!s}") + return 0, e + return 0, Exception("Max retries exceeded") results = [] with ThreadPoolExecutor(max_workers=len(clients)) as executor: diff --git a/vectordb_bench/backend/clients/aws_opensearch/config.py b/vectordb_bench/backend/clients/aws_opensearch/config.py index 327ba83e5..ff87f66bf 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/config.py +++ b/vectordb_bench/backend/clients/aws_opensearch/config.py @@ -129,10 +129,6 @@ def index_param(self) -> dict: if self.engine == AWSOS_Engine.s3vector: return {"engine": "s3vector"} - # For on-disk mode, return empty dict as no method config is needed - if self.on_disk: - return {} - parameters = {"ef_construction": self.efConstruction, "m": self.M} # Add encoder configuration based on quantization type From 0d4ad43971612707c4d9c4d7351ca4c3ec8e9868 Mon Sep 17 00:00:00 2001 From: norrishuang <12380647@qq.com> Date: Wed, 3 Dec 2025 08:11:55 +0000 Subject: [PATCH 2/2] refine format --- .../clients/aws_opensearch/aws_opensearch.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py index 995b5814b..8d77d263b 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py +++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py @@ -111,14 +111,14 @@ def _build_vector_field_config(self) -> dict: if self.case_config.on_disk: space_type = self.case_config.parse_metric() vector_field_config = { - "type": "knn_vector", - "dimension": self.dim, - "space_type": space_type, - "data_type": "float", - "mode": "on_disk", - "compression_level": "32x", - "method": method_config - } + "type": "knn_vector", + "dimension": self.dim, + "space_type": space_type, + "data_type": "float", + "mode": "on_disk", + "compression_level": "32x", + "method": method_config, + } log.info("Using on-disk vector configuration with compression_level: 32x") else: vector_field_config = { @@ -287,7 +287,7 @@ def insert_chunk(client_idx: int, chunk_idx: int): max_retries = 10 for attempt in range(max_retries): try: - resp = client.bulk(body=insert_data) + client.bulk(body=insert_data) return len(chunk_embeddings), None except Exception as e: if "429" in str(e) and attempt < max_retries - 1: