diff --git a/README.md b/README.md index a17220749..3d38d9444 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,10 @@ Options: --num-concurrency TEXT Comma-separated list of concurrency values to test during concurrent search [default: 1,10,20] + --concurrency-timeout INTEGER Timeout (in seconds) to wait for a + concurrency slot before failing. Set to a + negative value to wait indefinitely. + [default: 3600] --user-name TEXT Db username [required] --password TEXT Db password [required] --host TEXT Db host [required] diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py index c07fc855d..52c2094b4 100644 --- a/vectordb_bench/__init__.py +++ b/vectordb_bench/__init__.py @@ -6,7 +6,7 @@ from . import log_util env = environs.Env() -env.read_env(".env", False) +env.read_env(path=".env", recurse=False) class config: @@ -52,6 +52,8 @@ class config: CONCURRENCY_DURATION = 30 + CONCURRENCY_TIMEOUT = 3600 + RESULTS_LOCAL_DIR = env.path( "RESULTS_LOCAL_DIR", pathlib.Path(__file__).parent.joinpath("results"), diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py index 687a0ecd7..fd87d7ece 100644 --- a/vectordb_bench/backend/runner/mp_runner.py +++ b/vectordb_bench/backend/runner/mp_runner.py @@ -5,10 +5,12 @@ import time import traceback from collections.abc import Iterable +from multiprocessing.queues import Queue import numpy as np from ... import config +from ...models import ConcurrencySlotTimeoutError from ..clients import api NUM_PER_BATCH = config.NUM_PER_BATCH @@ -28,16 +30,18 @@ def __init__( self, db: api.VectorDB, test_data: list[list[float]], - k: int = 100, + k: int = config.K_DEFAULT, filters: dict | None = None, concurrencies: Iterable[int] = config.NUM_CONCURRENCY, - duration: int = 30, + duration: int = config.CONCURRENCY_DURATION, + concurrency_timeout: int = config.CONCURRENCY_TIMEOUT, ): self.db = db self.k = k self.filters = filters self.concurrencies = concurrencies self.duration = duration + self.concurrency_timeout = concurrency_timeout self.test_data = test_data log.debug(f"test dataset columns: {len(test_data)}") @@ -114,9 +118,7 @@ def _run_all_concurrencies_mem_efficient(self): log.info(f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}") future_iter = [executor.submit(self.search, self.test_data, q, cond) for i in range(conc)] # Sync all processes - while q.qsize() < conc: - sleep_t = conc if conc < 10 else 10 - time.sleep(sleep_t) + self._wait_for_queue_fill(q, size=conc) with cond: cond.notify_all() @@ -160,6 +162,15 @@ def _run_all_concurrencies_mem_efficient(self): conc_latency_avg_list, ) + def _wait_for_queue_fill(self, q: Queue, size: int): + wait_t = 0 + while q.qsize() < size: + sleep_t = size if size < 10 else 10 + wait_t += sleep_t + if wait_t > self.concurrency_timeout > 0: + raise ConcurrencySlotTimeoutError + time.sleep(sleep_t) + def run(self) -> float: """ Returns: diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py index 2a583b4f5..1da3bb4cd 100644 --- a/vectordb_bench/backend/task_runner.py +++ b/vectordb_bench/backend/task_runner.py @@ -275,6 +275,7 @@ def _init_search_runner(self): filters=self.ca.filters, concurrencies=self.config.case_config.concurrency_search_config.num_concurrency, duration=self.config.case_config.concurrency_search_config.concurrency_duration, + concurrency_timeout=self.config.case_config.concurrency_search_config.concurrency_timeout, k=self.config.case_config.k, ) diff --git a/vectordb_bench/cli/cli.py b/vectordb_bench/cli/cli.py index 3ec3c18cd..1b0eb295b 100644 --- a/vectordb_bench/cli/cli.py +++ b/vectordb_bench/cli/cli.py @@ -17,10 +17,9 @@ import click from yaml import load -from vectordb_bench.backend.clients.api import MetricType - from .. import config from ..backend.clients import DB +from ..backend.clients.api import MetricType from ..interface import benchmark_runner, global_result_future from ..models import ( CaseConfig, @@ -303,6 +302,17 @@ class CommonTypedDict(TypedDict): callback=lambda *args: list(map(int, click_arg_split(*args))), ), ] + concurrency_timeout: Annotated[ + int, + click.option( + "--concurrency-timeout", + type=int, + default=config.CONCURRENCY_TIMEOUT, + show_default=True, + help="Timeout (in seconds) to wait for a concurrency slot before failing. " + "Set to a negative value to wait indefinitely.", + ), + ] custom_case_name: Annotated[ str, click.option( @@ -490,6 +500,7 @@ def run( concurrency_search_config=ConcurrencySearchConfig( concurrency_duration=parameters["concurrency_duration"], num_concurrency=[int(s) for s in parameters["num_concurrency"]], + concurrency_timeout=parameters["concurrency_timeout"], ), custom_case=get_custom_case_config(parameters), ), diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index 997831a00..c35c21755 100644 --- a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -30,6 +30,11 @@ def __init__(self): super().__init__("Performance case optimize timeout") +class ConcurrencySlotTimeoutError(TimeoutError): + def __init__(self): + super().__init__("Timeout while waiting for a concurrency slot to become available") + + class CaseConfigParamType(Enum): """ Value will be the key of CaseConfig.params and displayed in UI @@ -113,6 +118,7 @@ class CustomizedCase(BaseModel): class ConcurrencySearchConfig(BaseModel): num_concurrency: list[int] = config.NUM_CONCURRENCY concurrency_duration: int = config.CONCURRENCY_DURATION + concurrency_timeout: int = config.CONCURRENCY_TIMEOUT class CaseConfig(BaseModel):