From a54fabf83271d97744a46c22b7c595acda5bc138 Mon Sep 17 00:00:00 2001 From: Akhil-Pathivada Date: Tue, 2 Dec 2025 01:27:45 +0530 Subject: [PATCH] feat: Add Custom Dataset support for Streaming Tests --- vectordb_bench/backend/cases.py | 81 +++++++++++++++++++ vectordb_bench/custom/custom_case.json | 13 +++ .../custom/displayCustomStreamingCase.py | 49 +++++++++++ .../components/custom/getCustomConfig.py | 35 +++++++- .../components/run_test/caseSelector.py | 3 +- .../frontend/config/dbCaseConfigs.py | 34 ++++++++ vectordb_bench/frontend/pages/custom.py | 74 ++++++++++++++++- 7 files changed, 286 insertions(+), 3 deletions(-) create mode 100644 vectordb_bench/frontend/components/custom/displayCustomStreamingCase.py diff --git a/vectordb_bench/backend/cases.py b/vectordb_bench/backend/cases.py index 24eca2400..edfcdea19 100644 --- a/vectordb_bench/backend/cases.py +++ b/vectordb_bench/backend/cases.py @@ -51,6 +51,7 @@ class CaseType(Enum): PerformanceCustomDataset = 101 StreamingPerformanceCase = 200 + StreamingCustomDataset = 201 LabelFilterPerformanceCase = 300 @@ -474,6 +475,85 @@ def __init__( ) +class StreamingCustomDataset(Case): + case_id: CaseType = CaseType.StreamingCustomDataset + label: CaseLabel = CaseLabel.Streaming + name: str = "Streaming Performance With Custom Dataset" + description: str = "" + dataset: DatasetManager + insert_rate: int + search_stages: list[float] + concurrencies: list[int] + optimize_after_write: bool = True + read_dur_after_write: int = 30 + + def __init__( + self, + description: str, + dataset_config: dict, + insert_rate: int = 500, + search_stages: list[float] | str = (0.5, 0.8), + concurrencies: list[int] | str = (5, 10), + optimize_after_write: bool = True, + read_dur_after_write: int = 30, + **kwargs, + ): + num_per_batch = config.NUM_PER_BATCH + if insert_rate % config.NUM_PER_BATCH != 0: + _insert_rate = max( + num_per_batch, + insert_rate // num_per_batch * num_per_batch, + ) + log.warning( + f"[streaming_case init] insert_rate(={insert_rate}) should be " + f"divisible by NUM_PER_BATCH={num_per_batch}), reset to {_insert_rate}", + ) + insert_rate = _insert_rate + + dataset_config = CustomDatasetConfig(**dataset_config) + dataset = CustomDataset( + name=dataset_config.name, + size=dataset_config.size, + dim=dataset_config.dim, + metric_type=metric_type_map(dataset_config.metric_type), + use_shuffled=dataset_config.use_shuffled, + with_gt=dataset_config.with_gt, + dir=dataset_config.dir, + file_num=dataset_config.file_count, + train_file=dataset_config.train_name, + test_file=f"{dataset_config.test_name}.parquet", + train_id_field=dataset_config.train_id_name, + train_vector_field=dataset_config.train_col_name, + test_vector_field=dataset_config.test_col_name, + gt_neighbors_field=dataset_config.gt_col_name, + scalar_labels_file=f"{dataset_config.scalar_labels_name}.parquet", + ) + name = f"Streaming-Perf - Custom - {dataset_config.name}, {insert_rate} rows/s" + description = ( + description + if description + else f"This case tests the search performance of vector database while maintaining " + f"a fixed insertion speed. (dataset: Custom - {dataset_config.name})" + ) + + if isinstance(search_stages, str): + search_stages = json.loads(search_stages) + if isinstance(concurrencies, str): + concurrencies = json.loads(concurrencies) + + super().__init__( + name=name, + description=description, + dataset=DatasetManager(data=dataset), + insert_rate=insert_rate, + search_stages=search_stages, + concurrencies=concurrencies, + optimize_after_write=optimize_after_write, + read_dur_after_write=read_dur_after_write, + **kwargs, + ) + + class NewIntFilterPerformanceCase(PerformanceCase): case_id: CaseType = CaseType.NewIntFilterPerformanceCase dataset_with_size_type: DatasetWithSizeType @@ -572,6 +652,7 @@ def filters(self) -> Filter: CaseType.Performance1536D50K: Performance1536D50K, CaseType.PerformanceCustomDataset: PerformanceCustomDataset, CaseType.StreamingPerformanceCase: StreamingPerformanceCase, + CaseType.StreamingCustomDataset: StreamingCustomDataset, CaseType.NewIntFilterPerformanceCase: NewIntFilterPerformanceCase, CaseType.LabelFilterPerformanceCase: LabelFilterPerformanceCase, } diff --git a/vectordb_bench/custom/custom_case.json b/vectordb_bench/custom/custom_case.json index 48ca8d8c4..12ca6597b 100644 --- a/vectordb_bench/custom/custom_case.json +++ b/vectordb_bench/custom/custom_case.json @@ -14,5 +14,18 @@ "use_shuffled": false, "with_gt": true } + }, + { + "case_type": "streaming", + "description": "This is a custom streaming dataset.", + "dataset_config": { + "name": "My Streaming Dataset", + "dir": "/my_dataset_path", + "size": 1000000, + "dim": 1024, + "file_count": 1, + "train_name": "shuffle_train", + "with_gt": true + } } ] \ No newline at end of file diff --git a/vectordb_bench/frontend/components/custom/displayCustomStreamingCase.py b/vectordb_bench/frontend/components/custom/displayCustomStreamingCase.py new file mode 100644 index 000000000..e861ca1a6 --- /dev/null +++ b/vectordb_bench/frontend/components/custom/displayCustomStreamingCase.py @@ -0,0 +1,49 @@ +from vectordb_bench.frontend.components.custom.getCustomConfig import CustomStreamingCaseConfig + + +def displayCustomStreamingCase(streamingCase: CustomStreamingCaseConfig, st, key): + + columns = st.columns([1, 2]) + streamingCase.dataset_config.name = columns[0].text_input( + "Name", key=f"{key}_name", value=streamingCase.dataset_config.name + ) + streamingCase.dataset_config.dir = columns[1].text_input( + "Folder Path", key=f"{key}_dir", value=streamingCase.dataset_config.dir + ) + + columns = st.columns(2) + streamingCase.dataset_config.dim = columns[0].number_input( + "dim", key=f"{key}_dim", value=streamingCase.dataset_config.dim + ) + streamingCase.dataset_config.size = columns[1].number_input( + "size", key=f"{key}_size", value=streamingCase.dataset_config.size + ) + + columns = st.columns(3) + streamingCase.dataset_config.train_name = columns[0].text_input( + "train file name", + key=f"{key}_train_name", + value=streamingCase.dataset_config.train_name, + ) + streamingCase.dataset_config.test_name = columns[1].text_input( + "test file name", key=f"{key}_test_name", value=streamingCase.dataset_config.test_name + ) + streamingCase.dataset_config.gt_name = columns[2].text_input( + "ground truth file name", key=f"{key}_gt_name", value=streamingCase.dataset_config.gt_name + ) + + columns = st.columns([1, 1, 2, 2]) + streamingCase.dataset_config.train_id_name = columns[0].text_input( + "train id name", key=f"{key}_train_id_name", value=streamingCase.dataset_config.train_id_name + ) + streamingCase.dataset_config.train_col_name = columns[1].text_input( + "train emb name", key=f"{key}_train_col_name", value=streamingCase.dataset_config.train_col_name + ) + streamingCase.dataset_config.test_col_name = columns[2].text_input( + "test emb name", key=f"{key}_test_col_name", value=streamingCase.dataset_config.test_col_name + ) + streamingCase.dataset_config.gt_col_name = columns[3].text_input( + "ground truth emb name", key=f"{key}_gt_col_name", value=streamingCase.dataset_config.gt_col_name + ) + + streamingCase.description = st.text_area("description", key=f"{key}_description", value=streamingCase.description) diff --git a/vectordb_bench/frontend/components/custom/getCustomConfig.py b/vectordb_bench/frontend/components/custom/getCustomConfig.py index ef4feacaf..a1ddfb737 100644 --- a/vectordb_bench/frontend/components/custom/getCustomConfig.py +++ b/vectordb_bench/frontend/components/custom/getCustomConfig.py @@ -34,10 +34,30 @@ class CustomCaseConfig(BaseModel): dataset_config: CustomDatasetConfig = CustomDatasetConfig() +class CustomStreamingCaseConfig(BaseModel): + case_type: str = "streaming" + description: str = "" + dataset_config: CustomDatasetConfig = CustomDatasetConfig() + + def get_custom_configs(): with open(config.CUSTOM_CONFIG_DIR, "r") as f: custom_configs = json.load(f) - return [CustomCaseConfig(**custom_config) for custom_config in custom_configs] + return [ + CustomCaseConfig(**custom_config) + for custom_config in custom_configs + if custom_config.get("case_type") != "streaming" + ] + + +def get_custom_streaming_configs(): + with open(config.CUSTOM_CONFIG_DIR, "r") as f: + custom_configs = json.load(f) + return [ + CustomStreamingCaseConfig(**custom_config) + for custom_config in custom_configs + if custom_config.get("case_type") == "streaming" + ] def save_custom_configs(custom_configs: list[CustomDatasetConfig]): @@ -45,5 +65,18 @@ def save_custom_configs(custom_configs: list[CustomDatasetConfig]): json.dump([custom_config.dict() for custom_config in custom_configs], f, indent=4) +def save_all_custom_configs( + performance_configs: list[CustomCaseConfig], streaming_configs: list[CustomStreamingCaseConfig] +): + """Save both performance and streaming configs to the same JSON file""" + all_configs = [config.dict() for config in performance_configs] + [config.dict() for config in streaming_configs] + with open(config.CUSTOM_CONFIG_DIR, "w") as f: + json.dump(all_configs, f, indent=4) + + def generate_custom_case(): return CustomCaseConfig() + + +def generate_custom_streaming_case(): + return CustomStreamingCaseConfig() diff --git a/vectordb_bench/frontend/components/run_test/caseSelector.py b/vectordb_bench/frontend/components/run_test/caseSelector.py index d9b4b8636..2e104ce54 100644 --- a/vectordb_bench/frontend/components/run_test/caseSelector.py +++ b/vectordb_bench/frontend/components/run_test/caseSelector.py @@ -7,6 +7,7 @@ UICaseItemCluster, get_case_config_inputs, get_custom_case_cluter, + get_custom_streaming_case_cluster, ) from vectordb_bench.frontend.config.styles import ( CASE_CONFIG_SETTING_COLUMNS, @@ -32,7 +33,7 @@ def caseSelector(st, activedDbList: list[DB]): activedCaseList: list[CaseConfig] = [] dbToCaseClusterConfigs = defaultdict(lambda: defaultdict(dict)) dbToCaseConfigs = defaultdict(lambda: defaultdict(dict)) - caseClusters = UI_CASE_CLUSTERS + [get_custom_case_cluter()] + caseClusters = UI_CASE_CLUSTERS + [get_custom_case_cluter(), get_custom_streaming_case_cluster()] for caseCluster in caseClusters: activedCaseList += caseClusterExpander(st, caseCluster, dbToCaseClusterConfigs, activedDbList) for db in dbToCaseClusterConfigs: diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py index 6dd8a6e19..3d3dc1d12 100644 --- a/vectordb_bench/frontend/config/dbCaseConfigs.py +++ b/vectordb_bench/frontend/config/dbCaseConfigs.py @@ -161,6 +161,34 @@ def get_custom_case_cluter() -> UICaseItemCluster: return UICaseItemCluster(label="Custom Search Performance Test", uiCaseItems=get_custom_case_items()) +def get_custom_streaming_case_items() -> list[UICaseItem]: + from vectordb_bench.frontend.components.custom.getCustomConfig import get_custom_streaming_configs + + custom_streaming_configs = get_custom_streaming_configs() + return [ + UICaseItem( + label=f"{custom_config.dataset_config.name} - Streaming", + description=f"Streaming test with custom dataset: {custom_config.dataset_config.name}", + cases=[ + CaseConfig( + case_id=CaseType.StreamingCustomDataset, + custom_case={ + "description": custom_config.description, + "dataset_config": custom_config.dataset_config.dict(), + }, + ) + ], + caseLabel=CaseLabel.Streaming, + extra_custom_case_config_inputs=custom_streaming_config_with_custom_dataset, + ) + for custom_config in custom_streaming_configs + ] + + +def get_custom_streaming_case_cluster() -> UICaseItemCluster: + return UICaseItemCluster(label="Custom Streaming Test", uiCaseItems=get_custom_streaming_case_items()) + + def generate_custom_streaming_case() -> CaseConfig: return CaseConfig( case_id=CaseType.StreamingPerformanceCase, @@ -207,6 +235,12 @@ def generate_custom_streaming_case() -> CaseConfig: ), ] +# Config for custom streaming tests (with custom dataset from JSON) +# Filter out the dataset_with_size_type from the existing config +custom_streaming_config_with_custom_dataset: list[ConfigInput] = [ + config for config in custom_streaming_config if config.label != CaseConfigParamType.dataset_with_size_type +] + def generate_label_filter_cases(dataset_with_size_type: DatasetWithSizeType) -> list[CaseConfig]: label_percentages = dataset_with_size_type.get_manager().data.scalar_label_percentages diff --git a/vectordb_bench/frontend/pages/custom.py b/vectordb_bench/frontend/pages/custom.py index 3595d71e7..6f1235fb0 100644 --- a/vectordb_bench/frontend/pages/custom.py +++ b/vectordb_bench/frontend/pages/custom.py @@ -5,12 +5,19 @@ from vectordb_bench.frontend.components.custom.displayCustomCase import ( displayCustomCase, ) +from vectordb_bench.frontend.components.custom.displayCustomStreamingCase import ( + displayCustomStreamingCase, +) from vectordb_bench.frontend.components.custom.displaypPrams import displayParams from vectordb_bench.frontend.components.custom.getCustomConfig import ( CustomCaseConfig, + CustomStreamingCaseConfig, generate_custom_case, + generate_custom_streaming_case, get_custom_configs, + get_custom_streaming_configs, save_custom_configs, + save_all_custom_configs, ) from vectordb_bench.frontend.components.custom.initStyle import initStyle from vectordb_bench.frontend.config.styles import FAVICON, PAGE_TITLE @@ -33,7 +40,33 @@ def deleteCase(self, idx: int): self.save() def save(self): - save_custom_configs(self.customCaseItems) + # Save performance configs along with existing streaming configs + streaming_configs = get_custom_streaming_configs() + save_all_custom_configs(self.customCaseItems, streaming_configs) + + +class StreamingCaseManager: + streamingCaseItems: list[CustomStreamingCaseConfig] + + def __init__(self): + self.streamingCaseItems = get_custom_streaming_configs() + + def addCase(self): + new_streaming_case = generate_custom_streaming_case() + new_streaming_case.dataset_config.name = ( + f"{new_streaming_case.dataset_config.name} {len(self.streamingCaseItems)}" + ) + self.streamingCaseItems += [new_streaming_case] + self.save() + + def deleteCase(self, idx: int): + self.streamingCaseItems.pop(idx) + self.save() + + def save(self): + # Save streaming configs along with existing performance configs + performance_configs = get_custom_configs() + save_all_custom_configs(performance_configs, self.streamingCaseItems) def main(): @@ -55,6 +88,11 @@ def main(): st.title("Custom Dataset") displayParams(st) + + # Performance Test Datasets Section + st.subheader("Performance Test Datasets") + st.markdown("These datasets are used for search performance tests.") + customCaseManager = CustomCaseManager() for idx, customCase in enumerate(customCaseManager.customCaseItems): @@ -84,6 +122,40 @@ def main(): on_click=lambda: customCaseManager.addCase(), ) + st.divider() + + # Streaming Test Datasets Section + st.subheader("Streaming Test Datasets") + st.markdown("These datasets are used for streaming performance tests (insertion + search).") + + streamingCaseManager = StreamingCaseManager() + + for idx, streamingCase in enumerate(streamingCaseManager.streamingCaseItems): + expander = st.expander(streamingCase.dataset_config.name, expanded=True) + key = f"streaming_case_{idx}" + displayCustomStreamingCase(streamingCase, expander, key=key) + + columns = expander.columns(8) + columns[0].button( + "Save", + key=f"{key}_save", + type="secondary", + on_click=lambda: streamingCaseManager.save(), + ) + columns[1].button( + ":red[Delete]", + key=f"{key}_delete", + type="secondary", + on_click=partial(lambda idx: streamingCaseManager.deleteCase(idx), idx=idx), + ) + + st.button( + "+ New Streaming Dataset", + key="add_streaming_config", + type="primary", + on_click=lambda: streamingCaseManager.addCase(), + ) + if __name__ == "__main__": main()