class CompressionLevel:
    """Compression ratios accepted in disk-based mode, with the engine used for each."""

    LEVEL_1X = "1x"
    LEVEL_2X = "2x"
    LEVEL_4X = "4x"
    LEVEL_8X = "8x"
    LEVEL_16X = "16x"
    LEVEL_32X = "32x"

    # Engine selected for each ratio: Lucene handles 1x and 4x, FAISS the rest.
    ENGINE_MAP = {
        LEVEL_1X: OSSOS_Engine.lucene,
        LEVEL_2X: OSSOS_Engine.faiss,
        LEVEL_4X: OSSOS_Engine.lucene,
        LEVEL_8X: OSSOS_Engine.faiss,
        LEVEL_16X: OSSOS_Engine.faiss,
        LEVEL_32X: OSSOS_Engine.faiss,
    }

    # Every supported ratio, smallest first (dicts keep insertion order).
    ALL = list(ENGINE_MAP)
@property
def resolved_engine(self) -> OSSOS_Engine:
    """Return the engine that is actually written into the index method.

    In-memory mode returns the configured ``engine`` unchanged. Disk mode
    ignores it and looks the engine up from ``compression_level`` in
    ``CompressionLevel.ENGINE_MAP``.

    Returns:
        ``self.engine`` when ``on_disk`` is false; otherwise the engine mapped
        to ``compression_level``, or FAISS when the level is not in the map.
    """
    if not self.on_disk:
        return self.engine

    engine = CompressionLevel.ENGINE_MAP.get(self.compression_level)
    if engine is not None:
        return engine

    # compression_level is declared as a plain str, so a misspelt value can
    # reach this point. Keep the FAISS fallback, but make it visible instead
    # of silently building the index with an engine nobody asked for.
    log.warning(
        "Unknown compression_level %r (expected one of: %s); falling back to engine %s",
        self.compression_level,
        ", ".join(CompressionLevel.ALL),
        OSSOS_Engine.faiss.value,
    )
    return OSSOS_Engine.faiss
def _build_vector_field_mapping(self) -> dict[str, Any]:
    """Assemble the ``knn_vector`` field definition used in the index mapping.

    The base mapping carries the field type, ``self.dim`` and the method block
    returned by ``case_config.index_param()``. When ``case_config.on_disk`` is
    set, the space type, a ``float`` data type, the ``on_disk`` mode and the
    configured compression level are added on top.

    Returns:
        The mapping dict for the vector field.
    """
    cfg = self.case_config
    mapping = {
        "type": "knn_vector",
        "dimension": self.dim,
        "method": cfg.index_param(),
    }

    # In-memory indexes need nothing beyond the base mapping.
    if not cfg.on_disk:
        log.info(f"Creating in-memory index with engine: {cfg.engine.value}")
        return mapping

    mapping["space_type"] = cfg.parse_metric()
    mapping["data_type"] = "float"
    mapping["mode"] = "on_disk"
    mapping["compression_level"] = cfg.compression_level
    log.info(
        f"Creating disk-based index - "
        f"compression_level: {cfg.compression_level}, "
        f"resolved_engine: {cfg.resolved_engine.value}"
    )
    return mapping
def _oss_opensearch_on_disk(config: dict) -> bool:
    """Return True when the disk-based search toggle is switched on."""
    return bool(config.get(CaseConfigParamType.on_disk, False))


def _oss_opensearch_in_memory(config: dict) -> bool:
    """Return True when the disk-based search toggle is off or not set."""
    return not _oss_opensearch_on_disk(config)


def _oss_opensearch_rescores(config: dict) -> bool:
    """Return True when the oversample factor will be used by queries.

    The OSS OpenSearch client adds a ``rescore`` clause for disk-based indexes
    and for in-memory indexes with a quantization type other than fp32, so the
    oversample factor must be editable in both cases.
    """
    quantized = config.get(CaseConfigParamType.quantizationType, "fp32") != "fp32"
    return _oss_opensearch_on_disk(config) or quantized


# Master toggle: switches the form between disk-based and in-memory options.
CaseConfigParamInput_ON_DISK_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.on_disk,
    displayLabel="Disk-based Search",
    inputHelp="Enable disk-based storage with Binary Quantization",
    inputType=InputType.Bool,
    inputConfig={
        "value": False,
    },
)

# Disk mode only: compression ratio, listed from strongest to none.
CaseConfigParamInput_COMPRESSION_LEVEL_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.compression_level,
    displayLabel="Compression Level",
    inputHelp="Binary quantization compression ratio for disk storage",
    inputType=InputType.Option,
    inputConfig={
        "options": ["32x", "16x", "8x", "4x", "2x", "1x"],
        "default": "32x",
    },
    isDisplayed=_oss_opensearch_on_disk,
)

# Shown whenever queries rescore (disk mode or in-memory quantization).
# NOTE(review): the form starts at 3.0 while the backend config field defaults
# to 1.0 - confirm which default is intended.
CaseConfigParamInput_OVERSAMPLE_FACTOR_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.oversample_factor,
    displayLabel="Oversample Factor",
    inputHelp="Rescoring oversample factor for two-phase search",
    inputType=InputType.Float,
    inputConfig={
        "min": 1.0,
        "max": 10.0,
        "value": 3.0,
        "step": 0.5,
    },
    isDisplayed=_oss_opensearch_rescores,
)

# In-memory mode only: disk mode picks the engine from the compression level.
CaseConfigParamInput_ENGINE_NAME_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.engine_name,
    displayLabel="Engine",
    inputHelp="HNSW algorithm implementation to use",
    inputType=InputType.Option,
    inputConfig={
        "options": ["faiss", "lucene"],
        "default": "faiss",
    },
    isDisplayed=_oss_opensearch_in_memory,
)

# In-memory mode only: the backend skips the scalar-quantization encoder in disk mode.
CaseConfigParamInput_QUANTIZATION_TYPE_OSSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.quantizationType,
    displayLabel="Quantization Type",
    inputHelp="Scalar quantization type for in-memory vectors",
    inputType=InputType.Option,
    inputConfig={
        "options": ["fp32", "fp16"],
        "default": "fp32",
    },
    isDisplayed=_oss_opensearch_in_memory,
)
CaseConfigParamInput_REFRESH_INTERVAL_AWSOpensearch, + CaseConfigParamInput_NUMBER_OF_SHARDS_AWSOpensearch, + CaseConfigParamInput_NUMBER_OF_REPLICAS_AWSOpensearch, + CaseConfigParamInput_NUMBER_OF_INDEXING_CLIENTS_AWSOpensearch, + CaseConfigParamInput_INDEX_THREAD_QTY_AWSOpensearch, + CaseConfigParamInput_REPLICATION_TYPE_AWSOpensearch, + CaseConfigParamInput_KNN_DERIVED_SOURCE_ENABLED_AWSOpensearch, + CaseConfigParamInput_MEMORY_OPTIMIZED_SEARCH_AWSOpensearch, +] + +OSSOpenSearchPerformanceConfig = [ + CaseConfigParamInput_ON_DISK_OSSOpensearch, + CaseConfigParamInput_COMPRESSION_LEVEL_OSSOpensearch, + CaseConfigParamInput_OVERSAMPLE_FACTOR_OSSOpensearch, + CaseConfigParamInput_EF_SEARCH_AWSOpensearch, + CaseConfigParamInput_ENGINE_NAME_OSSOpensearch, + CaseConfigParamInput_METRIC_TYPE_NAME_AWSOpensearch, + CaseConfigParamInput_M_AWSOpensearch, + CaseConfigParamInput_EFConstruction_AWSOpensearch, + CaseConfigParamInput_QUANTIZATION_TYPE_OSSOpensearch, + CaseConfigParamInput_REFRESH_INTERVAL_AWSOpensearch, + CaseConfigParamInput_NUMBER_OF_SHARDS_AWSOpensearch, + CaseConfigParamInput_NUMBER_OF_REPLICAS_AWSOpensearch, + CaseConfigParamInput_NUMBER_OF_INDEXING_CLIENTS_AWSOpensearch, + CaseConfigParamInput_INDEX_THREAD_QTY_AWSOpensearch, + CaseConfigParamInput_REPLICATION_TYPE_AWSOpensearch, + CaseConfigParamInput_KNN_DERIVED_SOURCE_ENABLED_AWSOpensearch, + CaseConfigParamInput_MEMORY_OPTIMIZED_SEARCH_AWSOpensearch, +] + # Map DB to config CASE_CONFIG_MAP = { DB.Milvus: { @@ -2379,8 +2477,8 @@ class CaseConfigInput(BaseModel): CaseLabel.Performance: AWSOpenSearchPerformanceConfig, }, DB.OSSOpenSearch: { - CaseLabel.Load: AWSOpensearchLoadingConfig, - CaseLabel.Performance: AWSOpenSearchPerformanceConfig, + CaseLabel.Load: OSSOpensearchLoadingConfig, + CaseLabel.Performance: OSSOpenSearchPerformanceConfig, }, DB.PgVector: { CaseLabel.Load: PgVectorLoadingConfig, diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index cce0fa116..338be2b98 100644 --- 
a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -129,6 +129,9 @@ class CaseConfigParamType(Enum): replication_type = "replication_type" knn_derived_source_enabled = "knn_derived_source_enabled" memory_optimized_search = "memory_optimized_search" + on_disk = "on_disk" + compression_level = "compression_level" + oversample_factor = "oversample_factor" # CockroachDB parameters min_partition_size = "min_partition_size"