Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 66 additions & 13 deletions vectordb_bench/backend/clients/oss_opensearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,30 @@ class OSSOpenSearchQuantization(Enum):
fp16 = "fp16"


# Compression level constants for disk-based mode
class CompressionLevel:
    """Valid compression levels for disk-based vector search."""

    LEVEL_1X = "1x"
    LEVEL_2X = "2x"
    LEVEL_4X = "4x"
    LEVEL_8X = "8x"
    LEVEL_16X = "16x"
    LEVEL_32X = "32x"

    # Engine required for each level — Lucene handles 1x and 4x,
    # FAISS handles 2x, 8x, 16x and 32x.
    ENGINE_MAP = {
        LEVEL_1X: OSSOS_Engine.lucene,
        LEVEL_2X: OSSOS_Engine.faiss,
        LEVEL_4X: OSSOS_Engine.lucene,
        LEVEL_8X: OSSOS_Engine.faiss,
        LEVEL_16X: OSSOS_Engine.faiss,
        LEVEL_32X: OSSOS_Engine.faiss,
    }

    # All supported levels, in ascending compression order (same order as ENGINE_MAP).
    ALL = list(ENGINE_MAP)


class OSSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
metric_type: MetricType = MetricType.L2
engine: OSSOS_Engine = OSSOS_Engine.faiss
Expand All @@ -74,11 +98,13 @@ class OSSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
cb_threshold: str | None = "50%"
number_of_indexing_clients: int | None = 1
use_routing: bool = False # for label-filter cases
oversample_factor: float = 1.0
quantization_type: OSSOpenSearchQuantization = OSSOpenSearchQuantization.fp32
replication_type: str | None = "DOCUMENT"
knn_derived_source_enabled: bool = False
memory_optimized_search: bool = False
on_disk: bool = False
compression_level: str = CompressionLevel.LEVEL_32X
oversample_factor: float = 1.0

@root_validator
def validate_engine_name(cls, values: dict):
Expand Down Expand Up @@ -107,6 +133,9 @@ def __eq__(self, obj: any):
and self.replication_type == obj.replication_type
and self.knn_derived_source_enabled == obj.knn_derived_source_enabled
and self.memory_optimized_search == obj.memory_optimized_search
and self.on_disk == obj.on_disk
and self.compression_level == obj.compression_level
and self.oversample_factor == obj.oversample_factor
)

def __hash__(self) -> int:
Expand All @@ -123,6 +152,9 @@ def __hash__(self) -> int:
self.replication_type,
self.knn_derived_source_enabled,
self.memory_optimized_search,
self.on_disk,
self.compression_level,
self.oversample_factor,
)
)

Expand All @@ -140,27 +172,48 @@ def parse_metric(self) -> str:

@property
def use_quant(self) -> bool:
return self.quantization_type is not OSSOpenSearchQuantization.fp32
"""Only use in-memory quantization when NOT in disk mode"""
return not self.on_disk and self.quantization_type is not OSSOpenSearchQuantization.fp32

@property
def resolved_engine(self) -> OSSOS_Engine:
"""Return engine based on mode: auto-selected for disk, configured for in-memory."""
if self.on_disk:
return CompressionLevel.ENGINE_MAP.get(self.compression_level, OSSOS_Engine.faiss)
return self.engine

def index_param(self) -> dict:
log.info(f"Using engine: {self.engine} for index creation")
log.info(f"Using metric_type: {self.metric_type_name} for index creation")
log.info(f"Resulting space_type: {self.parse_metric()} for index creation")
resolved_engine = self.resolved_engine
space_type = self.parse_metric()

log.info(
f"Index configuration - "
f"mode: {'disk' if self.on_disk else 'in-memory'}, "
f"configured_engine: {self.engine.value}, "
f"resolved_engine: {resolved_engine.value}, "
f"metric_type: {self.metric_type_name}, "
f"space_type: {space_type}"
f"{', ' if self.on_disk else ''}"
f"{'compression_level: ' + self.compression_level if self.on_disk else ''}"
)

return {
method_config = {
"name": "hnsw",
"engine": self.engine.value,
"space_type": self.parse_metric(),
"engine": resolved_engine.value,
"space_type": space_type,
"parameters": {
"ef_construction": self.efConstruction,
"m": self.M,
**(
{"encoder": {"name": "sq", "parameters": {"type": self.quantization_type.value}}}
if self.use_quant
else {}
),
},
}

if self.use_quant:
method_config["parameters"]["encoder"] = {
"name": "sq",
"parameters": {"type": self.quantization_type.value},
}

return method_config

def search_param(self) -> dict:
return {"ef_search": self.efSearch}
42 changes: 36 additions & 6 deletions vectordb_bench/backend/clients/oss_opensearch/oss_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ def build_knn_query(
if filter_clause:
knn_config["filter"] = filter_clause

if self.case_config.use_quant:
# Handle rescoring for both in-memory quantization and disk-based modes
if self.case_config.use_quant or self.case_config.on_disk:
knn_config["rescore"] = {"oversample_factor": self.case_config.oversample_factor}

return {"size": k, "query": {"knn": {self.vector_col_name: knn_config}}}
Expand Down Expand Up @@ -275,11 +276,44 @@ def _get_version_specific_settings(self, cluster_version: Version) -> dict:
version_specific_settings[name] = value
return version_specific_settings

def _build_vector_field_mapping(self) -> dict[str, Any]:
"""Build vector field mapping configuration based on storage mode."""
vector_field = {
"type": "knn_vector",
"dimension": self.dim,
"method": self.case_config.index_param(),
}

if self.case_config.on_disk:
vector_field.update(
{
"space_type": self.case_config.parse_metric(),
"data_type": "float",
"mode": "on_disk",
"compression_level": self.case_config.compression_level,
}
)
log.info(
f"Creating disk-based index - "
f"compression_level: {self.case_config.compression_level}, "
f"resolved_engine: {self.case_config.resolved_engine.value}"
)
else:
log.info(f"Creating in-memory index with engine: {self.case_config.engine.value}")

return vector_field

def _get_bulk_manager(self, client: OpenSearch) -> BulkInsertManager:
"""Get bulk insert manager for the given client."""
return BulkInsertManager(client, self.index_name, self.case_config)

def _create_index(self, client: OpenSearch) -> None:
cluster_version = self._get_cluster_version(client)

if self.case_config.on_disk and cluster_version < Version("2.17"):
error_msg = f"Disk-based vector search requires OpenSearch 2.17+, but cluster is running {cluster_version}"
raise OpenSearchError(error_msg)

ef_search_value = self.case_config.efSearch
log.info(f"Creating index with ef_search: {ef_search_value}")
log.info(f"Creating index with number_of_replicas: {self.case_config.number_of_replicas}")
Expand Down Expand Up @@ -324,11 +358,7 @@ def _create_index(self, client: OpenSearch) -> None:
properties[self.id_col_name] = {"type": "integer", "store": True}

properties[self.label_col_name] = {"type": "keyword"}
properties[self.vector_col_name] = {
"type": "knn_vector",
"dimension": self.dim,
"method": self.case_config.index_param(),
}
properties[self.vector_col_name] = self._build_vector_field_mapping()

mappings = {
"properties": properties,
Expand Down
102 changes: 100 additions & 2 deletions vectordb_bench/frontend/config/dbCaseConfigs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1903,6 +1903,65 @@ class CaseConfigInput(BaseModel):
isDisplayed=lambda config: (config.get(CaseConfigParamType.engine_name, "").lower() == "faiss"),
)

# Toggle for OpenSearch disk-based vector search (binary quantization).
CaseConfigParamInput_ON_DISK_OSSOpensearch = CaseConfigInput(
    inputType=InputType.Bool,
    inputConfig={"value": False},
    label=CaseConfigParamType.on_disk,
    displayLabel="Disk-based Search",
    inputHelp="Enable disk-based storage with Binary Quantization",
)

# Compression level selector; only relevant when disk-based search is on.
CaseConfigParamInput_COMPRESSION_LEVEL_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.compression_level,
    displayLabel="Compression Level",
    inputHelp="Binary quantization compression ratio for disk storage",
    inputType=InputType.Option,
    inputConfig={
        "options": ["32x", "16x", "8x", "4x", "2x", "1x"],
        "default": "32x",
    },
    # PEP 8: avoid `== True`; the stored value is a bool, so truthiness suffices.
    isDisplayed=lambda config: bool(config.get(CaseConfigParamType.on_disk, False)),
)

# Rescoring oversample factor; only relevant when disk-based search is on.
CaseConfigParamInput_OVERSAMPLE_FACTOR_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.oversample_factor,
    displayLabel="Oversample Factor",
    inputHelp="Rescoring oversample factor for two-phase search",
    inputType=InputType.Float,
    inputConfig={
        "min": 1.0,
        "max": 10.0,
        "value": 3.0,
        "step": 0.5,
    },
    # PEP 8: avoid `== True`; the stored value is a bool, so truthiness suffices.
    isDisplayed=lambda config: bool(config.get(CaseConfigParamType.on_disk, False)),
)

# Engine picker for in-memory mode; hidden when disk mode is enabled
# (disk mode auto-selects the engine from the compression level).
CaseConfigParamInput_ENGINE_NAME_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.engine_name,
    displayLabel="Engine",
    inputHelp="HNSW algorithm implementation to use",
    inputType=InputType.Option,
    inputConfig={
        "options": ["faiss", "lucene"],
        "default": "faiss",
    },
    # PEP 8: avoid `== False`; use `not` on the (boolean) stored value.
    isDisplayed=lambda config: not config.get(CaseConfigParamType.on_disk, False),
)

# Scalar quantization type for in-memory mode; hidden when disk mode is
# enabled (disk mode uses binary quantization instead).
CaseConfigParamInput_QUANTIZATION_TYPE_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.quantizationType,
    displayLabel="Quantization Type",
    inputHelp="Scalar quantization type for in-memory vectors",
    inputType=InputType.Option,
    inputConfig={
        "options": ["fp32", "fp16"],
        "default": "fp32",
    },
    # PEP 8: avoid `== False`; use `not` on the (boolean) stored value.
    isDisplayed=lambda config: not config.get(CaseConfigParamType.on_disk, False),
)
MilvusLoadConfig = [
CaseConfigParamInput_IndexType,
CaseConfigParamInput_M,
Expand Down Expand Up @@ -2356,6 +2415,45 @@ class CaseConfigInput(BaseModel):
CaseConfigParamInput_INDEX_THREAD_QTY_DURING_FORCE_MERGE_AWSOpensearch,
]


# Inputs shown for OSS OpenSearch load (ingest) cases. Disk-mode inputs come
# first; the remaining entries reuse the AWS OpenSearch inputs for parameters
# shared between the two clients.
OSSOpensearchLoadingConfig = [
    CaseConfigParamInput_ON_DISK_OSSOpensearch,
    CaseConfigParamInput_COMPRESSION_LEVEL_OSSOpensearch,
    CaseConfigParamInput_ENGINE_NAME_OSSOpensearch,
    CaseConfigParamInput_METRIC_TYPE_NAME_AWSOpensearch,
    CaseConfigParamInput_M_AWSOpensearch,
    CaseConfigParamInput_EFConstruction_AWSOpensearch,
    CaseConfigParamInput_QUANTIZATION_TYPE_OSSOpensearch,
    CaseConfigParamInput_REFRESH_INTERVAL_AWSOpensearch,
    CaseConfigParamInput_NUMBER_OF_SHARDS_AWSOpensearch,
    CaseConfigParamInput_NUMBER_OF_REPLICAS_AWSOpensearch,
    CaseConfigParamInput_NUMBER_OF_INDEXING_CLIENTS_AWSOpensearch,
    CaseConfigParamInput_INDEX_THREAD_QTY_AWSOpensearch,
    CaseConfigParamInput_REPLICATION_TYPE_AWSOpensearch,
    CaseConfigParamInput_KNN_DERIVED_SOURCE_ENABLED_AWSOpensearch,
    CaseConfigParamInput_MEMORY_OPTIMIZED_SEARCH_AWSOpensearch,
]

# Inputs shown for OSS OpenSearch performance (search) cases. Superset of the
# loading config: additionally exposes the oversample factor (disk-mode
# rescoring) and ef_search.
OSSOpenSearchPerformanceConfig = [
    CaseConfigParamInput_ON_DISK_OSSOpensearch,
    CaseConfigParamInput_COMPRESSION_LEVEL_OSSOpensearch,
    CaseConfigParamInput_OVERSAMPLE_FACTOR_OSSOpensearch,
    CaseConfigParamInput_EF_SEARCH_AWSOpensearch,
    CaseConfigParamInput_ENGINE_NAME_OSSOpensearch,
    CaseConfigParamInput_METRIC_TYPE_NAME_AWSOpensearch,
    CaseConfigParamInput_M_AWSOpensearch,
    CaseConfigParamInput_EFConstruction_AWSOpensearch,
    CaseConfigParamInput_QUANTIZATION_TYPE_OSSOpensearch,
    CaseConfigParamInput_REFRESH_INTERVAL_AWSOpensearch,
    CaseConfigParamInput_NUMBER_OF_SHARDS_AWSOpensearch,
    CaseConfigParamInput_NUMBER_OF_REPLICAS_AWSOpensearch,
    CaseConfigParamInput_NUMBER_OF_INDEXING_CLIENTS_AWSOpensearch,
    CaseConfigParamInput_INDEX_THREAD_QTY_AWSOpensearch,
    CaseConfigParamInput_REPLICATION_TYPE_AWSOpensearch,
    CaseConfigParamInput_KNN_DERIVED_SOURCE_ENABLED_AWSOpensearch,
    CaseConfigParamInput_MEMORY_OPTIMIZED_SEARCH_AWSOpensearch,
]

# Map DB to config
CASE_CONFIG_MAP = {
DB.Milvus: {
Expand All @@ -2379,8 +2477,8 @@ class CaseConfigInput(BaseModel):
CaseLabel.Performance: AWSOpenSearchPerformanceConfig,
},
DB.OSSOpenSearch: {
CaseLabel.Load: AWSOpensearchLoadingConfig,
CaseLabel.Performance: AWSOpenSearchPerformanceConfig,
CaseLabel.Load: OSSOpensearchLoadingConfig,
CaseLabel.Performance: OSSOpenSearchPerformanceConfig,
},
DB.PgVector: {
CaseLabel.Load: PgVectorLoadingConfig,
Expand Down
3 changes: 3 additions & 0 deletions vectordb_bench/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ class CaseConfigParamType(Enum):
replication_type = "replication_type"
knn_derived_source_enabled = "knn_derived_source_enabled"
memory_optimized_search = "memory_optimized_search"
on_disk = "on_disk"
compression_level = "compression_level"
oversample_factor = "oversample_factor"

# CockroachDB parameters
min_partition_size = "min_partition_size"
Expand Down