Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ Options:
--num-concurrency TEXT Comma-separated list of concurrency values
to test during concurrent search [default:
1,10,20]
--concurrency-timeout INTEGER Timeout (in seconds) to wait for a
concurrency slot before failing. Set to a
negative value to wait indefinitely.
[default: 3600]
--user-name TEXT Db username [required]
--password TEXT Db password [required]
--host TEXT Db host [required]
Expand Down
4 changes: 3 additions & 1 deletion vectordb_bench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from . import log_util

env = environs.Env()
env.read_env(".env", False)
env.read_env(path=".env", recurse=False)


class config:
Expand Down Expand Up @@ -52,6 +52,8 @@ class config:

CONCURRENCY_DURATION = 30

CONCURRENCY_TIMEOUT = 3600

RESULTS_LOCAL_DIR = env.path(
"RESULTS_LOCAL_DIR",
pathlib.Path(__file__).parent.joinpath("results"),
Expand Down
21 changes: 16 additions & 5 deletions vectordb_bench/backend/runner/mp_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import time
import traceback
from collections.abc import Iterable
from multiprocessing.queues import Queue

import numpy as np

from ... import config
from ...models import ConcurrencySlotTimeoutError
from ..clients import api

NUM_PER_BATCH = config.NUM_PER_BATCH
Expand All @@ -28,16 +30,18 @@ def __init__(
self,
db: api.VectorDB,
test_data: list[list[float]],
k: int = 100,
k: int = config.K_DEFAULT,
filters: dict | None = None,
concurrencies: Iterable[int] = config.NUM_CONCURRENCY,
duration: int = 30,
duration: int = config.CONCURRENCY_DURATION,
concurrency_timeout: int = config.CONCURRENCY_TIMEOUT,
):
self.db = db
self.k = k
self.filters = filters
self.concurrencies = concurrencies
self.duration = duration
self.concurrency_timeout = concurrency_timeout

self.test_data = test_data
log.debug(f"test dataset columns: {len(test_data)}")
Expand Down Expand Up @@ -114,9 +118,7 @@ def _run_all_concurrencies_mem_efficient(self):
log.info(f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}")
future_iter = [executor.submit(self.search, self.test_data, q, cond) for i in range(conc)]
# Sync all processes
while q.qsize() < conc:
sleep_t = conc if conc < 10 else 10
time.sleep(sleep_t)
self._wait_for_queue_fill(q, size=conc)

with cond:
cond.notify_all()
Expand Down Expand Up @@ -160,6 +162,15 @@ def _run_all_concurrencies_mem_efficient(self):
conc_latency_avg_list,
)

def _wait_for_queue_fill(self, q: Queue, size: int):
    """Block until ``q`` reports at least ``size`` items, or time out.

    The queue is polled every ``min(size, 10)`` seconds. Elapsed time is
    measured with a monotonic clock, so the timeout reflects real waiting
    time and cannot fire before any waiting has happened.

    Args:
        q: Queue whose ``qsize()`` is polled.
        size: Number of queued items to wait for.

    Raises:
        ConcurrencySlotTimeoutError: If ``self.concurrency_timeout`` is
            positive and that many seconds pass before the queue fills.
            A zero or negative timeout disables the deadline.
    """
    timeout = self.concurrency_timeout
    # A non-positive timeout means "wait indefinitely".
    deadline = time.monotonic() + timeout if timeout > 0 else None
    poll_interval = min(size, 10)

    while q.qsize() < size:
        if deadline is None:
            time.sleep(poll_interval)
            continue

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise ConcurrencySlotTimeoutError
        # Never sleep past the deadline, so the timeout is honoured closely
        # even when it is shorter than the poll interval.
        time.sleep(min(poll_interval, remaining))

def run(self) -> float:
"""
Returns:
Expand Down
1 change: 1 addition & 0 deletions vectordb_bench/backend/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def _init_search_runner(self):
filters=self.ca.filters,
concurrencies=self.config.case_config.concurrency_search_config.num_concurrency,
duration=self.config.case_config.concurrency_search_config.concurrency_duration,
concurrency_timeout=self.config.case_config.concurrency_search_config.concurrency_timeout,
k=self.config.case_config.k,
)

Expand Down
15 changes: 13 additions & 2 deletions vectordb_bench/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
import click
from yaml import load

from vectordb_bench.backend.clients.api import MetricType

from .. import config
from ..backend.clients import DB
from ..backend.clients.api import MetricType
from ..interface import benchmark_runner, global_result_future
from ..models import (
CaseConfig,
Expand Down Expand Up @@ -303,6 +302,17 @@ class CommonTypedDict(TypedDict):
callback=lambda *args: list(map(int, click_arg_split(*args))),
),
]
concurrency_timeout: Annotated[
int,
click.option(
"--concurrency-timeout",
type=int,
default=config.CONCURRENCY_TIMEOUT,
show_default=True,
help="Timeout (in seconds) to wait for a concurrency slot before failing. "
"Set to a negative value to wait indefinitely.",
),
]
custom_case_name: Annotated[
str,
click.option(
Expand Down Expand Up @@ -490,6 +500,7 @@ def run(
concurrency_search_config=ConcurrencySearchConfig(
concurrency_duration=parameters["concurrency_duration"],
num_concurrency=[int(s) for s in parameters["num_concurrency"]],
concurrency_timeout=parameters["concurrency_timeout"],
),
custom_case=get_custom_case_config(parameters),
),
Expand Down
6 changes: 6 additions & 0 deletions vectordb_bench/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ def __init__(self):
super().__init__("Performance case optimize timeout")


class ConcurrencySlotTimeoutError(TimeoutError):
    """Timeout error for a wait on a concurrency slot that took too long."""

    def __init__(self):
        message = "Timeout while waiting for a concurrency slot to become available"
        super().__init__(message)


class CaseConfigParamType(Enum):
"""
Value will be the key of CaseConfig.params and displayed in UI
Expand Down Expand Up @@ -113,6 +118,7 @@ class CustomizedCase(BaseModel):
# Settings for the concurrent-search stage; each default comes from `config`.
class ConcurrencySearchConfig(BaseModel):
    # Concurrency values to test during concurrent search.
    num_concurrency: list[int] = config.NUM_CONCURRENCY
    # How long, in seconds, the search runs at each concurrency value.
    concurrency_duration: int = config.CONCURRENCY_DURATION
    # Seconds to wait for a concurrency slot before failing; the runner only
    # enforces the limit when this value is positive.
    concurrency_timeout: int = config.CONCURRENCY_TIMEOUT


class CaseConfig(BaseModel):
Expand Down