diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py index 03443b255..8d77d263b 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py +++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py @@ -117,6 +117,7 @@ def _build_vector_field_config(self) -> dict: "data_type": "float", "mode": "on_disk", "compression_level": "32x", + "method": method_config, } log.info("Using on-disk vector configuration with compression_level: 32x") else: @@ -283,13 +284,19 @@ def insert_chunk(client_idx: int, chunk_idx: int): other_data[self.label_col_name] = chunk_labels_data[i] insert_data.append(other_data) - try: - resp = client.bulk(body=insert_data) - log.info(f"Client {client_idx} added {len(resp['items'])} documents") - return len(chunk_embeddings), None - except Exception as e: - log.warning(f"Client {client_idx} failed to insert data: {e!s}") - return 0, e + max_retries = 10 + for attempt in range(max_retries): + try: + client.bulk(body=insert_data) + return len(chunk_embeddings), None + except Exception as e: + if "429" in str(e) and attempt < max_retries - 1: + log.warning(f"Client {client_idx} got 429 error, retry {attempt + 1}/{max_retries} after 10s") + time.sleep(10) + else: + log.warning(f"Client {client_idx} failed to insert data: {e!s}") + return 0, e + return 0, Exception("Max retries exceeded") results = [] with ThreadPoolExecutor(max_workers=len(clients)) as executor: diff --git a/vectordb_bench/backend/clients/aws_opensearch/config.py b/vectordb_bench/backend/clients/aws_opensearch/config.py index 327ba83e5..ff87f66bf 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/config.py +++ b/vectordb_bench/backend/clients/aws_opensearch/config.py @@ -129,10 +129,6 @@ def index_param(self) -> dict: if self.engine == AWSOS_Engine.s3vector: return {"engine": "s3vector"} - # For on-disk mode, return empty dict as no method config is needed - if self.on_disk: - return {} - parameters = {"ef_construction": self.efConstruction, "m": self.M} # Add encoder configuration based on quantization type