From 8273b2aebc6567ce9a43b19bf8553c6ae0b5dbb7 Mon Sep 17 00:00:00 2001 From: xavrathi Date: Mon, 27 Jan 2025 13:16:32 -0800 Subject: [PATCH 1/2] Added the configuration parameters to create Opensearch dynamically with right replicas, shards and other opensearch related configurations. Added the feature to create OS index with 0 replica and once the data is loaded update the replicas according to the parameter. --- README.md | 36 +++++++++ .../clients/aws_opensearch/aws_opensearch.py | 57 ++++++++++++- .../backend/clients/aws_opensearch/cli.py | 80 ++++++++++++++++++- .../backend/clients/aws_opensearch/config.py | 10 +++ 4 files changed, 178 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 56ae88753..b6400248e 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,42 @@ Options: with-gt] --help Show this message and exit. ``` + +### Run awsopensearch from command line + +```shell +vectordbbench awsopensearch --db-label awsopensearch \ +--m 16 --ef-construction 256 \ +--host search-vector-db-prod-h4f6m4of6x7yp2rz7gdmots7w4.us-west-2.es.amazonaws.com --port 443 \ +--user vector --password '' \ +--case-type Performance1536D5M --num-insert-workers 10 \ +--skip-load --num-concurrency 75 +``` + +To list the options for awsopensearch, execute `vectordbbench awsopensearch --help` + +```text +$ vectordbbench awsopensearch --help +Usage: vectordbbench awsopensearch [OPTIONS] + +Options: + --number-of-shards INTEGER Number of shards + --number-of-replicas INTEGER Number of replica + --index-thread-qty INTEGER Thread count for native engine indexing + --index-thread-qty-during-force-merge INTEGER + Thread count for native engine indexing used + during force merge + --number-of-segments INTEGER Number of segments + --refresh-interval TEXT refresh-interval for the index + --force-merge-enabled BOOLEAN If we need to do force merge or not + --flush-threshold-size TEXT Threshold for flushing translog + --number-of-indexing-clients INTEGER + Number of indexing 
clients that should be + used for indexing the data + --cb-threshold TEXT k-NN Memory circuit breaker threshold + --help Show this message and exit. +``` + #### Using a configuration file. The vectordbbench command can optionally read some or all the options from a yaml formatted configuration file. diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py index 234014f19..adb766300 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py +++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py @@ -12,6 +12,7 @@ WAITING_FOR_REFRESH_SEC = 30 WAITING_FOR_FORCE_MERGE_SEC = 30 +SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC = 30 class AWSOpenSearch(VectorDB): @@ -52,10 +53,27 @@ def case_config_cls(cls, index_type: IndexType | None = None) -> AWSOpenSearchIn return AWSOpenSearchIndexConfig def _create_index(self, client: OpenSearch): + cluster_settings_body = { + "persistent": { + "knn.algo_param.index_thread_qty": self.case_config.index_thread_qty, + "knn.memory.circuit_breaker.limit": self.case_config.cb_threshold, + } + } + client.cluster.put_settings(cluster_settings_body) settings = { "index": { "knn": True, + "number_of_shards": self.case_config.number_of_shards, + "number_of_replicas": 0, + # Translog flush threshold is configurable (default "5120mb") + "translog.flush_threshold_size": self.case_config.flush_threshold_size, + **( + {"knn.algo_param.ef_search": self.case_config.efSearch} + if self.case_config.engine == AWSOS_Engine.nmslib + else {} + ), }, + "refresh_interval": self.case_config.refresh_interval, } mappings = { "properties": { @@ -145,9 +163,9 @@ def search_embedding( docvalue_fields=[self.id_col_name], stored_fields="_none_", ) - log.info(f"Search took: {resp['took']}") - log.info(f"Search shards: {resp['_shards']}") - log.info(f"Search hits total: {resp['hits']['total']}") + log.debug(f"Search took: {resp['took']}") + log.debug(f"Search shards: 
{resp['_shards']}") + log.debug(f"Search hits total: {resp['hits']['total']}") return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]] except Exception as e: log.warning(f"Failed to search: {self.index_name} error: {e!s}") @@ -157,12 +175,37 @@ def optimize(self, data_size: int | None = None): """optimize will be called between insertion and search in performance cases.""" # Call refresh first to ensure that all segments are created self._refresh_index() - self._do_force_merge() + if self.case_config.force_merge_enabled: + self._do_force_merge() + self._refresh_index() + self._update_replicas() # Call refresh again to ensure that the index is ready after force merge. self._refresh_index() # ensure that all graphs are loaded in memory and ready for search self._load_graphs_to_memory() + def _update_replicas(self): + index_settings = self.client.indices.get_settings(index=self.index_name) + current_number_of_replicas = int(index_settings[self.index_name]["settings"]["index"]["number_of_replicas"]) + log.info( + f"Current Number of replicas are {current_number_of_replicas}" + f" and changing the replicas to {self.case_config.number_of_replicas}" + ) + settings_body = {"index": {"number_of_replicas": self.case_config.number_of_replicas}} + self.client.indices.put_settings(index=self.index_name, body=settings_body) + self._wait_till_green() + + def _wait_till_green(self): + log.info("Wait for index to become green..") + while True: + res = self.client.cat.indices(index=self.index_name, h="health", format="json") + health = res[0]["health"] + if health == "green": + break + log.info(f"The index {self.index_name} has health : {health} and is not green. 
Retrying") + time.sleep(SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC) + log.info(f"Index {self.index_name} is green..") + def _refresh_index(self): log.debug(f"Starting refresh for index {self.index_name}") while True: @@ -179,6 +222,12 @@ def _refresh_index(self): log.debug(f"Completed refresh for index {self.index_name}") def _do_force_merge(self): + log.info(f"Updating the Index thread qty to {self.case_config.index_thread_qty_during_force_merge}.") + + cluster_settings_body = { + "persistent": {"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty_during_force_merge} + } + self.client.cluster.put_settings(cluster_settings_body) log.debug(f"Starting force merge for index {self.index_name}") force_merge_endpoint = f"/{self.index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false" force_merge_task_id = self.client.transport.perform_request("POST", force_merge_endpoint)["task"] diff --git a/vectordb_bench/backend/clients/aws_opensearch/cli.py b/vectordb_bench/backend/clients/aws_opensearch/cli.py index bb0c2450d..826e465c5 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/cli.py +++ b/vectordb_bench/backend/clients/aws_opensearch/cli.py @@ -18,6 +18,73 @@ class AWSOpenSearchTypedDict(TypedDict): port: Annotated[int, click.option("--port", type=int, default=443, help="Db Port")] user: Annotated[str, click.option("--user", type=str, default="admin", help="Db User")] password: Annotated[str, click.option("--password", type=str, help="Db password")] + number_of_shards: Annotated[ + int, + click.option("--number-of-shards", type=int, help="Number of shards", default=1), + ] + number_of_replicas: Annotated[ + int, + click.option("--number-of-replicas", type=int, help="Number of replica", default=1), + ] + index_thread_qty: Annotated[ + int, + click.option( + "--index-thread-qty", + type=int, + help="Thread count for native engine indexing", + default=4, + ), + ] + + index_thread_qty_during_force_merge: Annotated[ + int, + 
click.option( + "--index-thread-qty-during-force-merge", + type=int, + help="Thread count for native engine indexing used during force merge", + default=4, + ), + ] + + number_of_segments: Annotated[ + int, + click.option("--number-of-segments", type=int, help="Number of segments", default=1), + ] + + refresh_interval: Annotated[ + int, + click.option("--refresh-interval", type=str, help="refresh-interval", default="60s"), + ] + + force_merge_enabled: Annotated[ + int, + click.option("--force-merge-enabled", type=bool, help="If we need to do force merge or not", default=True), + ] + + flush_threshold_size: Annotated[ + int, + click.option("--flush-threshold-size", type=str, help="Threshold for flushing translog", default="5120mb"), + ] + + number_of_indexing_clients: Annotated[ + int, + click.option( + "--number-of-indexing-clients", + type=int, + help="Number of indexing clients that should be used for indexing the data", + default=1, + ), + ] + + cb_threshold: Annotated[ + int, + click.option( + "--cb-threshold", + type=str, + help="k-NN Memory circuit breaker threshold", + default="50%", + ), + ] class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFlavor2): ... 
@@ -36,6 +103,17 @@ def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]): user=parameters["user"], password=SecretStr(parameters["password"]), ), - db_case_config=AWSOpenSearchIndexConfig(), + db_case_config=AWSOpenSearchIndexConfig( + number_of_shards=parameters["number_of_shards"], + number_of_replicas=parameters["number_of_replicas"], + index_thread_qty=parameters["index_thread_qty"], + number_of_segments=parameters["number_of_segments"], + refresh_interval=parameters["refresh_interval"], + force_merge_enabled=parameters["force_merge_enabled"], + flush_threshold_size=parameters["flush_threshold_size"], + number_of_indexing_clients=parameters["number_of_indexing_clients"], + index_thread_qty_during_force_merge=parameters["index_thread_qty_during_force_merge"], + cb_threshold=parameters["cb_threshold"], + ), **parameters, ) diff --git a/vectordb_bench/backend/clients/aws_opensearch/config.py b/vectordb_bench/backend/clients/aws_opensearch/config.py index e9ccc7277..dd51b266d 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/config.py +++ b/vectordb_bench/backend/clients/aws_opensearch/config.py @@ -39,6 +39,16 @@ class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig): efConstruction: int = 256 efSearch: int = 256 M: int = 16 + index_thread_qty: int | None = 4 + number_of_shards: int | None = 1 + number_of_replicas: int | None = 0 + number_of_segments: int | None = 1 + refresh_interval: str | None = "60s" + force_merge_enabled: bool | None = True + flush_threshold_size: str | None = "5120mb" + number_of_indexing_clients: int | None = 1 + index_thread_qty_during_force_merge: int | None = 4 + cb_threshold: str | None = "50%" def parse_metric(self) -> str: if self.metric_type == MetricType.IP: From 5a0b640f4c1a7fe1d71d8756bad8e261e47eb720 Mon Sep 17 00:00:00 2001 From: xavrathi Date: Mon, 27 Jan 2025 13:16:32 -0800 Subject: [PATCH 2/2] Updated the readme for config parameters --- README.md | 29 ++++++++------- .../backend/clients/aws_opensearch/cli.py | 
36 +++++++++++-------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index b6400248e..e8be99cec 100644 --- a/README.md +++ b/README.md @@ -172,22 +172,27 @@ $ vectordbbench awsopensearch --help Usage: vectordbbench awsopensearch [OPTIONS] Options: - --number-of-shards INTEGER Number of shards - --number-of-replicas INTEGER Number of replica + # Sharding and Replication + --number-of-shards INTEGER Number of primary shards for the index + --number-of-replicas INTEGER Number of replica copies for each primary + shard + # Indexing Performance --index-thread-qty INTEGER Thread count for native engine indexing --index-thread-qty-during-force-merge INTEGER - Thread count for native engine indexing used - during force merge - --number-of-segments INTEGER Number of segments - --refresh-interval TEXT refresh-interval for the index - --force-merge-enabled BOOLEAN If we need to do force merge or not - --flush-threshold-size TEXT Threshold for flushing translog + Thread count during force merge operations --number-of-indexing-clients INTEGER - Number of indexing clients that should be - used for indexing the data + Number of concurrent indexing clients + # Index Management + --number-of-segments INTEGER Target number of segments after merging + --refresh-interval TEXT How often to make new data available for + search + --force-merge-enabled BOOLEAN Whether to perform force merge operation + --flush-threshold-size TEXT Size threshold for flushing the transaction + log + # Memory Management --cb-threshold TEXT k-NN Memory circuit breaker threshold --help Show this message and exit. ``` #### Using a configuration file. 
diff --git a/vectordb_bench/backend/clients/aws_opensearch/cli.py b/vectordb_bench/backend/clients/aws_opensearch/cli.py index 826e465c5..fa457154d 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/cli.py +++ b/vectordb_bench/backend/clients/aws_opensearch/cli.py @@ -20,11 +20,13 @@ class AWSOpenSearchTypedDict(TypedDict): password: Annotated[str, click.option("--password", type=str, help="Db password")] number_of_shards: Annotated[ int, - click.option("--number-of-shards", type=int, help="Number of shards", default=1), + click.option("--number-of-shards", type=int, help="Number of primary shards for the index", default=1), ] number_of_replicas: Annotated[ int, - click.option("--number-of-replicas", type=int, help="Number of replica", default=1), + click.option( + "--number-of-replicas", type=int, help="Number of replica copies for each primary shard", default=1 + ), ] index_thread_qty: Annotated[ int, @@ -41,38 +43,42 @@ class AWSOpenSearchTypedDict(TypedDict): click.option( "--index-thread-qty-during-force-merge", type=int, - help="Thread count for native engine indexing used during force merge", + help="Thread count during force merge operations", default=4, ), ] + number_of_indexing_clients: Annotated[ + int, + click.option( + "--number-of-indexing-clients", + type=int, + help="Number of concurrent indexing clients", + default=1, + ), + ] + number_of_segments: Annotated[ int, - click.option("--number-of-segments", type=int, help="Number of segments", default=1), + click.option("--number-of-segments", type=int, help="Target number of segments after merging", default=1), ] refresh_interval: Annotated[ int, - click.option("--refresh-interval", type=str, help="refresh-interval", default="60s"), + click.option( + "--refresh-interval", type=str, help="How often to make new data available for search", default="60s" + ), ] force_merge_enabled: Annotated[ int, - click.option("--force-merge-enabled", type=bool, help="If we need to do force merge or not", 
default=True), + click.option("--force-merge-enabled", type=bool, help="Whether to perform force merge operation", default=True), ] flush_threshold_size: Annotated[ - int, - click.option("--flush-threshold-size", type=str, help="Threshold for flushing translog", default="5120mb"), - ] - - number_of_indexing_clients: Annotated[ int, click.option( - "--number-of-indexing-clients", - type=int, - help="Number of indexing clients that should be used for indexing the data", - default=1, + "--flush-threshold-size", type=str, help="Size threshold for flushing the transaction log", default="5120mb" ), ]