Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,47 @@ Options:
with-gt]
--help Show this message and exit.
```

### Run awsopensearch from command line

```shell
vectordbbench awsopensearch --db-label awsopensearch \
--m 16 --ef-construction 256 \
--host <your-opensearch-domain-endpoint> --port 443 \
--user vector --password '<password>' \
--case-type Performance1536D5M --num-insert-workers 10 \
--skip-load --num-concurrency 75
```

To list the options for awsopensearch, execute `vectordbbench awsopensearch --help`

```text
$ vectordbbench awsopensearch --help
Usage: vectordbbench awsopensearch [OPTIONS]

Options:
# Sharding and Replication
--number-of-shards INTEGER Number of primary shards for the index
--number-of-replicas INTEGER Number of replica copies for each primary
shard
# Indexing Performance
--index-thread-qty INTEGER Thread count for native engine indexing
--index-thread-qty-during-force-merge INTEGER
Thread count during force merge operations
--number-of-indexing-clients INTEGER
Number of concurrent indexing clients
# Index Management
--number-of-segments INTEGER Target number of segments after merging
--refresh-interval TEXT How often to make new data available for
search
--force-merge-enabled BOOLEAN Whether to perform force merge operation
--flush-threshold-size TEXT Size threshold for flushing the transaction
log
# Memory Management
--cb-threshold TEXT k-NN Memory circuit breaker threshold

  --help Show this message and exit.
```

#### Using a configuration file.

The vectordbbench command can optionally read some or all the options from a yaml formatted configuration file.
Expand Down
57 changes: 53 additions & 4 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

WAITING_FOR_REFRESH_SEC = 30
WAITING_FOR_FORCE_MERGE_SEC = 30
SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC = 30


class AWSOpenSearch(VectorDB):
Expand Down Expand Up @@ -52,10 +53,27 @@ def case_config_cls(cls, index_type: IndexType | None = None) -> AWSOpenSearchIn
return AWSOpenSearchIndexConfig

def _create_index(self, client: OpenSearch):
cluster_settings_body = {
"persistent": {
"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty,
"knn.memory.circuit_breaker.limit": self.case_config.cb_threshold,
}
}
client.cluster.put_settings(cluster_settings_body)
settings = {
"index": {
"knn": True,
"number_of_shards": self.case_config.number_of_shards,
"number_of_replicas": 0,
"translog.flush_threshold_size": self.case_config.flush_threshold_size,
# Setting trans log threshold to 5GB
**(
{"knn.algo_param.ef_search": self.case_config.ef_search}
if self.case_config.engine == AWSOS_Engine.nmslib
else {}
),
},
"refresh_interval": self.case_config.refresh_interval,
}
mappings = {
"properties": {
Expand Down Expand Up @@ -145,9 +163,9 @@ def search_embedding(
docvalue_fields=[self.id_col_name],
stored_fields="_none_",
)
log.info(f"Search took: {resp['took']}")
log.info(f"Search shards: {resp['_shards']}")
log.info(f"Search hits total: {resp['hits']['total']}")
log.debug(f"Search took: {resp['took']}")
log.debug(f"Search shards: {resp['_shards']}")
log.debug(f"Search hits total: {resp['hits']['total']}")
return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]]
except Exception as e:
log.warning(f"Failed to search: {self.index_name} error: {e!s}")
Expand All @@ -157,12 +175,37 @@ def optimize(self, data_size: int | None = None):
"""optimize will be called between insertion and search in performance cases."""
# Call refresh first to ensure that all segments are created
self._refresh_index()
self._do_force_merge()
if self.case_config.force_merge_enabled:
self._do_force_merge()
self._refresh_index()
self._update_replicas()
# Call refresh again to ensure that the index is ready after force merge.
self._refresh_index()
# ensure that all graphs are loaded in memory and ready for search
self._load_graphs_to_memory()

def _update_replicas(self):
    """Bump the index replica count to the configured value, then wait for green."""
    current_settings = self.client.indices.get_settings(index=self.index_name)
    replicas_now = int(current_settings[self.index_name]["settings"]["index"]["number_of_replicas"])
    target_replicas = self.case_config.number_of_replicas
    log.info(
        f"Current Number of replicas are {replicas_now}"
        f" and changing the replicas to {target_replicas}"
    )
    # Replicas were created as 0 during load; raise them only now, after
    # indexing, via a dynamic index-settings update.
    self.client.indices.put_settings(
        index=self.index_name,
        body={"index": {"number_of_replicas": target_replicas}},
    )
    self._wait_till_green()

def _wait_till_green(self):
    """Block until the index reports 'green' cluster health.

    Newly added replicas (see _update_replicas) are only fully allocated
    once the index turns green; poll the _cat/indices API and sleep between
    checks.
    """
    log.info("Wait for index to become green..")
    while True:
        res = self.client.cat.indices(index=self.index_name, h="health", format="json")
        health = res[0]["health"]
        # BUGFIX: the loop must exit when the index IS green. The previous
        # `if health != "green": break` returned immediately on yellow/red
        # and spun forever once the index actually became green.
        if health == "green":
            break
        log.info(f"The index {self.index_name} has health : {health} and is not green. Retrying")
        time.sleep(SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC)
    log.info(f"Index {self.index_name} is green..")

def _refresh_index(self):
log.debug(f"Starting refresh for index {self.index_name}")
while True:
Expand All @@ -179,6 +222,12 @@ def _refresh_index(self):
log.debug(f"Completed refresh for index {self.index_name}")

def _do_force_merge(self):
log.info(f"Updating the Index thread qty to {self.case_config.index_thread_qty_during_force_merge}.")

cluster_settings_body = {
"persistent": {"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty_during_force_merge}
}
self.client.cluster.put_settings(cluster_settings_body)
log.debug(f"Starting force merge for index {self.index_name}")
force_merge_endpoint = f"/{self.index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false"
force_merge_task_id = self.client.transport.perform_request("POST", force_merge_endpoint)["task"]
Expand Down
86 changes: 85 additions & 1 deletion vectordb_bench/backend/clients/aws_opensearch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,79 @@ class AWSOpenSearchTypedDict(TypedDict):
port: Annotated[int, click.option("--port", type=int, default=443, help="Db Port")]
user: Annotated[str, click.option("--user", type=str, default="admin", help="Db User")]
password: Annotated[str, click.option("--password", type=str, help="Db password")]
# Index topology: primary shard count and per-shard replica count.
number_of_shards: Annotated[
    int,
    click.option("--number-of-shards", type=int, help="Number of primary shards for the index", default=1),
]
number_of_replicas: Annotated[
    int,
    click.option(
        "--number-of-replicas", type=int, help="Number of replica copies for each primary shard", default=1
    ),
]
# Applied to the knn.algo_param.index_thread_qty cluster setting at index creation.
index_thread_qty: Annotated[
    int,
    click.option(
        "--index-thread-qty",
        type=int,
        help="Thread count for native engine indexing",
        default=4,
    ),
]

# Thread count applied to knn.algo_param.index_thread_qty just before force merge.
index_thread_qty_during_force_merge: Annotated[
    int,
    click.option(
        "--index-thread-qty-during-force-merge",
        type=int,
        help="Thread count during force merge operations",
        default=4,
    ),
]

number_of_indexing_clients: Annotated[
    int,
    click.option(
        "--number-of-indexing-clients",
        type=int,
        help="Number of concurrent indexing clients",
        default=1,
    ),
]

# Target segment count after the (optional) force-merge step.
number_of_segments: Annotated[
    int,
    click.option("--number-of-segments", type=int, help="Target number of segments after merging", default=1),
]

# Annotation is str (not int): values are interval strings such as "60s",
# matching the click option's type=str.
refresh_interval: Annotated[
    str,
    click.option(
        "--refresh-interval", type=str, help="How often to make new data available for search", default="60s"
    ),
]

# Annotation is bool (not int), matching the click option's type=bool.
force_merge_enabled: Annotated[
    bool,
    click.option("--force-merge-enabled", type=bool, help="Whether to perform force merge operation", default=True),
]

# Annotation is str (not int): values are size strings such as "5120mb".
flush_threshold_size: Annotated[
    str,
    click.option(
        "--flush-threshold-size", type=str, help="Size threshold for flushing the transaction log", default="5120mb"
    ),
]

# Annotation is str (not int): values are percentage strings such as "50%".
cb_threshold: Annotated[
    str,
    click.option(
        "--cb-threshold",
        type=str,
        help="k-NN Memory circuit breaker threshold",
        default="50%",
    ),
]


class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFlavor2): ...
Expand All @@ -36,6 +109,17 @@ def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]):
user=parameters["user"],
password=SecretStr(parameters["password"]),
),
db_case_config=AWSOpenSearchIndexConfig(),
db_case_config=AWSOpenSearchIndexConfig(
number_of_shards=parameters["number_of_shards"],
number_of_replicas=parameters["number_of_replicas"],
index_thread_qty=parameters["index_thread_qty"],
number_of_segments=parameters["number_of_segments"],
refresh_interval=parameters["refresh_interval"],
force_merge_enabled=parameters["force_merge_enabled"],
flush_threshold_size=parameters["flush_threshold_size"],
number_of_indexing_clients=parameters["number_of_indexing_clients"],
index_thread_qty_during_force_merge=parameters["index_thread_qty_during_force_merge"],
cb_threshold=parameters["cb_threshold"],
),
**parameters,
)
10 changes: 10 additions & 0 deletions vectordb_bench/backend/clients/aws_opensearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
efConstruction: int = 256
efSearch: int = 256
M: int = 16
index_thread_qty: int | None = 4
number_of_shards: int | None = 1
number_of_replicas: int | None = 0
number_of_segments: int | None = 1
refresh_interval: str | None = "60s"
force_merge_enabled: bool | None = True
flush_threshold_size: str | None = "5120mb"
number_of_indexing_clients: int | None = 1
index_thread_qty_during_force_merge: int
cb_threshold: str | None = "50%"

def parse_metric(self) -> str:
if self.metric_type == MetricType.IP:
Expand Down
Loading