import argparse

import numpy as np
import pandas as pd
import psycopg
import psycopg.sql as sql
import pyarrow as pa
import pyarrow.parquet as pq
from pgvector.psycopg import register_vector


def query_database(query, emb, k, cursor):
    """Execute the k-NN `query` with params (emb, k) and return the ids.

    `cursor` may be a psycopg Connection or Cursor — both expose execute().
    Returns a list of int ids, nearest first (order set by the query).
    """
    result = cursor.execute(query, (emb, k), prepare=True, binary=True)
    return [int(row[0]) for row in result.fetchall()]


def write_parquet_file(data, file_path):
    """Persist (id, neighbors_id) ground-truth pairs as a Parquet file."""
    df = pd.DataFrame(data, columns=["id", "neighbors_id"])
    table = pa.Table.from_pandas(df)
    pq.write_table(table, file_path, use_dictionary=False)


def main():
    """Compute exact nearest neighbors for each test vector via pgvector."""
    parser = argparse.ArgumentParser(
        description="Compute ground-truth nearest neighbors for a Parquet test set."
    )
    parser.add_argument("--test-file-path", type=str, help="Path of the parquet test file.")
    parser.add_argument("--gt-file-path", type=str, help="Parquet file path where ground truth will be saved.")
    parser.add_argument("--table-name", type=str, help="Vector table name")
    # Was type=str: the value feeds `LIMIT %s::int`, so parse it as an int
    # up front and fail fast on bad input.
    parser.add_argument("--k", type=int, help="K nearest neighbors")
    parser.add_argument("--db-name", type=str, help="Database name")
    parser.add_argument("--db-user", type=str, help="Database user")
    parser.add_argument("--db-pass", type=str, help="Database password")
    parser.add_argument("--host", type=str, help="Database host")
    parser.add_argument("--port", type=str, help="Database port")
    args = parser.parse_args()

    # Example vectordbbench invocation that loads the table this script reads:
    # vectordbbench pgvectorhnsw --drop-old --load --skip-search-serial
    #   --skip-search-concurrent --case-type PerformanceCustomDataset
    #   --user-name postgres --password admin123 --host 172.17.0.2
    #   --db-name postgres --maintenance-work-mem 4GB --max-parallel-workers 3
    #   --ef-search 100 --ef-construction 64 --m 16 --k 10
    #   --num-concurrency 1,5,10,15,20,25,30
    #   --custom-case-name "Computing GT for 500k openai"
    #   --custom-dataset-name custom-openai --custom-dataset-dir subset_500k
    #   --custom-dataset-size 500000 --custom-dataset-dim 1536
    #   --custom-dataset-file-count 10 --custom-dataset-use-shuffled

    df = pd.read_parquet(args.test_file_path)

    connection = None
    try:
        connection = psycopg.connect(
            dbname=args.db_name,
            user=args.db_user,
            password=args.db_pass,
            host=args.host,
            port=args.port,
        )
        register_vector(connection)
        print("Connection established.")

        results = []
        for count, (_, row) in enumerate(df.iterrows(), start=1):
            emb = np.asarray(row["emb"])
            query = sql.Composed(
                [
                    sql.SQL("SELECT id FROM public.{} ORDER BY embedding <=> ").format(
                        sql.Identifier(args.table_name)
                    ),
                    sql.SQL(" %s::vector LIMIT %s::int"),
                ]
            )
            results.append(
                (row["id"], np.asarray(query_database(query, emb, args.k, connection)))
            )
            if count % 10 == 0:
                print(f"GT computed for {count} rows.")
    except Exception as e:
        # Was a bare `except:` that printed "Connection failed." for *any*
        # error (including query errors) and exited 0. Report the real cause
        # and propagate a non-zero exit.
        print(f"Ground truth computation failed: {e}")
        raise
    finally:
        # Close exactly once. The original closed up to three times and
        # raised NameError when connect() itself failed.
        if connection is not None:
            connection.close()

    write_parquet_file(results, args.gt_file_path)
    print("Ground truth calculated and saved.")


if __name__ == "__main__":
    main()
"enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "diskann-no-filter", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7, + "max-neighbors": 64, + "l-value-ib": 128, + "l-value-is": [32], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 2 + }, + { + "db-label": "diskann-filter-high", + "drop_old": false, + "load": false, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K99P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7, + "max-neighbors": 64, + "l-value-ib": 128, + "l-value-is": [32], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 2 + }, + { + "db-label": "diskann-filter-low", + "drop_old": false, + "load": false, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K1P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7, + "max-neighbors": 64, + "l-value-ib": 128, + "l-value-is": [32], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 2 + } + ] + } + \ No newline at end of file diff --git a/config-hnsw-filter.json b/config-hnsw-filter.json new file mode 100644 index 000000000..1f96d21b7 --- /dev/null +++ b/config-hnsw-filter.json @@ -0,0 +1,66 @@ + +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann-filter", + "instance_type": "Standard_D8ds_v5", + "provider": "azure", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "hnsw-no-filter", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7, + "ef-search": [40], + 
import argparse
import os
import shutil


def _str_to_bool(value: str) -> bool:
    """Parse a command-line boolean string ("true"/"1"/"yes" → True).

    argparse's `type=bool` treats every non-empty string — including the
    literal "False" — as True, so the text must be interpreted explicitly.
    """
    return value.strip().lower() in ("true", "1", "yes", "y")


def get_file_name(file_name: str, file_prefix: str, files_count: int) -> str:
    """Rename a train shard for a subset containing `files_count` shards.

    e.g. "train-03-of-10.parquet" with files_count=5 -> "train-03-of-05.parquet";
    a single-shard subset collapses to "<file_prefix>.parquet".
    """
    if files_count == 1:
        return file_prefix + ".parquet"
    file_name = file_name.split("of-")[0]
    return file_name + "of-" + str(files_count).zfill(2) + ".parquet"


def create_subsets(base_dir, save_dir_path, subset_prefix, file_prefix, step_size):
    """Copy growing prefixes of the train shards into per-size subset dirs.

    The i-th subset directory gets the first i shards (renamed "...of-<i>")
    plus a copy of the shared test split; `step_size` is the assumed row
    count per shard, used only to label the directory (e.g. "openai_500k").
    """
    files = sorted(f for f in os.listdir(base_dir) if f.startswith(file_prefix))

    for i in range(1, len(files) + 1):
        subset_dir = os.path.join(save_dir_path, f"{subset_prefix}_{i * step_size // 1000}k")
        os.makedirs(subset_dir, exist_ok=True)

        for j in range(i):
            src_file = os.path.join(base_dir, files[j])
            dst_file = os.path.join(subset_dir, get_file_name(files[j], file_prefix, i))
            shutil.copy(src_file, dst_file)
        # Every subset shares the same test split.
        shutil.copy(os.path.join(base_dir, "test.parquet"),
                    os.path.join(subset_dir, "test.parquet"))


if __name__ == "__main__":
    # Description no longer claims Dask is involved — this script only copies files.
    parser = argparse.ArgumentParser(description="Create growing subsets of a sharded Parquet dataset.")
    parser.add_argument("--directory", type=str, help="Path to the directory containing Parquet files.")
    parser.add_argument("--save-dir-path", type=str, help="Directory path where data will be saved")
    parser.add_argument("--dataset-name-prefix", type=str, help="Name prefix of the folder where each subset will be saved.")
    # Was `type=bool`, which made "--is-shuffled False" evaluate to True.
    parser.add_argument("--is-shuffled", type=_str_to_bool, default=False,
                        help="Whether the files are shuffled or not (true/false).")
    args = parser.parse_args()

    file_prefix = "shuffle_train" if args.is_shuffled else "train"
    subset_prefix = args.dataset_name_prefix if args.dataset_name_prefix else "openai"
    save_dir_path = args.save_dir_path if args.save_dir_path else args.directory
    step_size = 500_000  # rows per shard (500k) — only used for directory labels

    create_subsets(args.directory, save_dir_path, subset_prefix, file_prefix, step_size)
    print(f'Finished creating subsets of Parquet files in {args.directory}.')
import argparse
import gc
import glob
import logging
import os

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def main():
    """Sample `queries_per_file` rows from each train shard into one test Parquet file.

    Sampled rows are re-id'd to a contiguous 0..N-1 range across shards and
    written incrementally so only one shard is in memory at a time.
    """
    # Previously this all ran at module import time (required-arg parsing
    # included); wrapping it in main() keeps the CLI identical but makes the
    # module importable.
    parser = argparse.ArgumentParser(description='Generate test dataset from Parquet files.')
    parser.add_argument('--folder-path', type=str, required=True, help='Path to the folder containing the train files')
    parser.add_argument('--file-pattern', type=str, required=False, help='Pattern to match the train files')
    parser.add_argument('--queries-per-file', type=int, required=False, help='Number of queries to sample per file')
    parser.add_argument('--output-file', type=str, required=False, help='Output Parquet file name')
    args = parser.parse_args()

    folder_path = args.folder_path
    file_pattern = args.file_pattern if args.file_pattern else "train-*-of-10.parquet"
    queries_per_file = args.queries_per_file if args.queries_per_file else 1000
    output_file = args.output_file if args.output_file else 'test-10000.parquet'

    logging.info(f"Folder path: {folder_path}")
    logging.info(f"File pattern: {file_pattern}")
    logging.info(f"Queries per file: {queries_per_file}")
    logging.info(f"Output file: {output_file}")

    schema = pa.schema([
        pa.field('id', pa.int64()),
        pa.field('emb', pa.list_(pa.float64()))  # emb as a list of double precision floats
    ])

    # Open Parquet writer with the specified schema.
    with pq.ParquetWriter(output_file, schema) as writer:
        # Track the next id to assign across all shards.
        current_id = 0

        # os.path.join fixes the silent zero-match when --folder-path lacks a
        # trailing separator (the original concatenated the two strings).
        for file_path in glob.glob(os.path.join(folder_path, file_pattern)):
            logging.info(f"Processing file: {file_path}")

            # Load and sample the shard.
            train_table = pq.read_table(file_path, columns=['id', 'emb'])
            train_df = train_table.to_pandas()
            sampled_df = train_df.sample(n=queries_per_file).reset_index(drop=True)

            # Reassign ids starting from the running count.
            sampled_df['id'] = np.arange(current_id, current_id + queries_per_file)
            current_id += queries_per_file

            sampled_table = pa.Table.from_pandas(sampled_df, schema=schema)
            writer.write_table(sampled_table)

            # Release the shard before loading the next one.
            del train_table, train_df, sampled_df, sampled_table
            gc.collect()

            logging.info(f"Finished processing file: {file_path}")

    logging.info(f"{output_file} has been created with {queries_per_file} randomly sampled embeddings per file.")


if __name__ == "__main__":
    main()
Name\", CASE WHEN (io_i.idx_blks_hit <> 0 OR io_i.idx_blks_read <> 0) THEN round(io_i.idx_blks_hit / (io_i.idx_blks_hit::numeric + io_i.idx_blks_read::numeric), 4) ELSE null END as \"Index Hit Ratio\" FROM pg_stat_user_tables t JOIN pg_statio_user_indexes io_i ON io_i.relid = t.relid ORDER BY \"Index Hit Ratio\" DESC;" + }, + { + "description": "Buffer Cache Usage (Top 10 Relations)", + "query": "SELECT c.relname, count(*) AS buffers FROM pg_buffercache b INNER JOIN pg_class c ON b.relfilenode = pg_relation_filenode(c.oid) AND b.reldatabase IN (0, (SELECT oid FROM pg_database WHERE datname = current_database())) GROUP BY c.relname ORDER BY 2 DESC LIMIT 10;" + }, + { + "description": "Top 10 Relations Residing in Memory", + "query": "SELECT c.relname, pg_size_pretty(count(*)*8192) AS buffer_size, pg_size_pretty(pg_relation_size(c.oid)) as relation_size, Round(100.0 * Count(*) / (SELECT setting FROM pg_settings WHERE name = 'shared_buffers') :: INTEGER, 2) AS buffers_percent, ROUND(count(*)*8192*100/ pg_relation_size(c.oid)::numeric, 2 ) AS relation_percent, CASE WHEN c.relkind = 'r' THEN 'table' WHEN c.relkind = 'i' THEN 'index' WHEN c.relkind = 'S' THEN 'sequence' WHEN c.relkind = 't' THEN 'TOAST table' WHEN c.relkind = 'v' THEN 'view' WHEN c.relkind = 'm' THEN 'materialized view' WHEN c.relkind = 'c' THEN 'composite type' WHEN c.relkind = 'f' THEN 'foreign table' WHEN c.relkind = 'p' THEN 'partitioned table' WHEN c.relkind = 'I' THEN 'partitioned index' ELSE 'Unexpected relkind' END as relation_type FROM pg_class c INNER JOIN pg_buffercache b ON b.relfilenode = c.relfilenode INNER JOIN pg_database d ON ( b.reldatabase = d.oid AND d.datname = Current_database() ) GROUP BY c.relname, c.oid ORDER BY pg_total_relation_size(c.oid) DESC LIMIT 10;" + }, + { + "description": "How many time the index is used?", + "query": "SELECT relname, 100 * idx_scan / (seq_scan + idx_scan) percent_of_times_index_used, n_live_tup rows_in_table FROM pg_stat_user_tables WHERE seq_scan + 
import json
import os
import random
import subprocess
import time
from contextlib import redirect_stdout

import psycopg
from psycopg import sql

# Ask vectordbbench itself for verbose logging.
os.environ["LOG_LEVEL"] = "DEBUG"


def load_config(json_file):
    """Load the benchmark configuration from a JSON file."""
    with open(json_file, 'r') as file:
        return json.load(file)


def setup_database(config):
    """Create the benchmark database (if missing) and required extensions."""
    try:
        conn = psycopg.connect(
            dbname='postgres',
            user=config['database']['username'],
            password=config['database']['password'],
            host=config['database']['host'],
        )
        conn.autocommit = True
        cursor = conn.cursor()
        # CREATE DATABASE has no IF NOT EXISTS, so probe pg_database first.
        cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"),
                       [config['database']['db_name']])
        if not cursor.fetchone():
            cursor.execute(sql.SQL("CREATE DATABASE {}").format(
                sql.Identifier(config['database']['db_name'])))
        conn.close()

        # Connect to the (possibly brand-new) database to create extensions.
        conn = psycopg.connect(
            dbname=config['database']['db_name'],
            user=config['database']['username'],
            password=config['database']['password'],
            host=config['database']['host'],
        )
        cursor = conn.cursor()
        cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")
        cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;")
        conn.commit()
        conn.close()
    except Exception as e:
        print(f"Setup failed: {e}")


def teardown_database(config):
    # Optionally drop the database after the test
    pass


def get_stats(config):
    """Run every diagnostic query from queries.json and print its rows."""
    with open('queries.json', 'r') as file:
        queries = json.load(file)
    conn = None
    try:
        conn = psycopg.connect(
            dbname=config['db_name'],
            user=config['username'],
            password=config['password'],
            host=config['host'],
        )
        cur = conn.cursor()
        for item in queries:
            print(f"\nRunning query: {item['description']}")
            # One failing query must not abort the whole stats dump
            # (matches the guard run-pvs.py already uses).
            try:
                cur.execute(item['query'])
                rows = cur.fetchall()
                headers = [desc[0] for desc in cur.description]
                print(f"{' | '.join(headers)}")
                for row in rows:
                    print(f"{' | '.join(map(str, row))}")
            except Exception as e:
                print(f"Failed to run query: {e}")
    except Exception as e:
        # Was "Setup failed" — a copy-paste from setup_database().
        print(f"Stats collection failed: {e}")
    finally:
        # Close even when a query or the connection attempt failed mid-way.
        if conn is not None:
            conn.close()


def query_configurations(config):
    """Return a dict of selected PostgreSQL settings queried via SHOW."""
    setting_names = [
        "checkpoint_timeout",
        "effective_cache_size",
        "jit",
        "maintenance_work_mem",
        "max_parallel_maintenance_workers",
        "max_parallel_workers",
        "max_parallel_workers_per_gather",
        "max_wal_size",
        "max_worker_processes",
        "shared_buffers",
        "wal_compression",
        "work_mem",
    ]
    try:
        conn = psycopg.connect(
            dbname=config['db_name'],
            user=config['username'],
            password=config['password'],
            host=config['host'],
        )
        cursor = conn.cursor()
        # Drive both the queries and the result keys from one list instead of
        # maintaining 12 query strings plus 12 positional dict entries.
        config_dict = {}
        for name in setting_names:
            cursor.execute(f"SHOW {name};")
            result = cursor.fetchone()
            config_dict[name] = result[0] if result else None

        # Print the raw output to debug.
        print("Raw query results:", list(config_dict.values()))

        conn.close()
        return config_dict
    except Exception as e:
        print(f"Failed to query configurations: {e}")
        return {}


def run_benchmark(case, db_config):
    """Run `vectordbbench pgvectorhnsw` for one case, once per ef-search value."""
    base_command = [
        "vectordbbench", "pgvectorhnsw",
        "--user-name", db_config['username'],
        "--password", db_config['password'],
        "--host", db_config['host'],
        "--db-name", db_config['db_name'],
    ]

    # Map boolean case options onto the CLI's --<flag>/--skip-<flag> pairs.
    for option, flag in [
        ("drop_old", "drop-old"),
        ("load", "load"),
        ("search-serial", "search-serial"),
        ("search-concurrent", "search-concurrent"),
        ("custom-dataset-use-shuffled", "custom-dataset-use-shuffled"),
    ]:
        base_command.append(f"--{flag}" if case.get(option, True) else f"--skip-{flag}")

    base_command.extend([
        "--case-type", case["case-type"],
        "--custom-case-name", str(case["custom-case-name"]),
        "--custom-dataset-name", str(case["custom-dataset-name"]),
        "--custom-dataset-dir", str(case["custom-dataset-dir"]),
        "--custom-dataset-size", str(case["custom-dataset-size"]),
        "--custom-dataset-dim", str(case["custom-dataset-dim"]),
        "--custom-dataset-file-count", str(case["custom-dataset-file-count"]),
        "--maintenance-work-mem", case["maintenance-work-mem"],
        "--max-parallel-workers", str(case["max-parallel-workers"]),
        "--ef-construction", str(case["ef-construction"]),
        "--m", str(case["m"]),
        "--k", str(case["k"]),
        "--num-concurrency", case["num-concurrency"],
    ])

    run_count = case.get("run_count", 1)  # default to a single run

    for run in range(run_count):
        print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}")
        for i, ef_search in enumerate(case["ef-search"]):
            command = base_command + ["--ef-search", str(ef_search)]

            # Build the index only once: from the second iteration on,
            # replace the build/load flags with their --skip-* counterparts.
            if i > 0 or run > 0:
                command = [arg for arg in command if arg not in ["--drop-old", "--load"]]
                if "--skip-drop-old" not in command:
                    command.append("--skip-drop-old")
                if "--skip-load" not in command:
                    command.append("--skip-load")

            try:
                random_number = random.randint(1, 100000)
                print(f"Running command: {' '.join(command)}")
                output_dir = (
                    f"results/pgvector/hnsw/{case['db-label']}/{db_config['provider']}/"
                    f"{db_config['instance_type']}-{case['m']}-{case['ef-construction']}-"
                    f"{ef_search}-{case['case-type']}-{run}-{random_number}"
                )
                os.environ["RESULTS_LOCAL_DIR"] = output_dir
                os.makedirs(output_dir, exist_ok=True)

                with open(f"{output_dir}/log.txt", 'w') as f:
                    with redirect_stdout(f):
                        print(f"DB Instance Type: {db_config['instance_type']}")
                        print(f"DB Instance Provider: {db_config['provider']}")
                        print(f"DB enable_seqscan: {db_config['enable_seqscan']}")
                        for key, value in case.items():
                            # Log the ef-search actually used for this run
                            # instead of the whole configured list. (The
                            # original compared against "ef_search" — a key
                            # that never occurs — and lacked an else, so a
                            # match would have printed the key twice.)
                            if key == "ef-search":
                                print(f"{key}: {ef_search}")
                            else:
                                print(f"{key}: {value}")
                        print("Current PostgreSQL configurations:")
                        for key, value in query_configurations(db_config).items():
                            print(f"{key}: {value}")
                        get_stats(db_config)
                        f.flush()
                        print(f"Running command: {' '.join(command)}")
                        f.flush()

                    print("***********START***********")
                    start_time = time.time()
                    # Capture both stdout and stderr of the benchmark run.
                    subprocess.run(command, check=True, stdout=f, stderr=f)
                    print(f"total_duration={time.time() - start_time}")
                    print("***********END***********")
                    with redirect_stdout(f):
                        get_stats(db_config)
                        f.flush()
                    f.flush()
            except subprocess.CalledProcessError as e:
                print(f"Benchmark failed: {e}")
            # Let the database settle between runs.
            print("Sleeping for 1 min")
            time.sleep(60)


def main():
    """Run every configured case from config.json and report total duration."""
    config = load_config("config.json")
    start_time = time.time()
    for case in config['cases']:
        print(f"Running case: {case['db-label']}")
        setup_database(config)
        run_benchmark(case, config['database'])
    end_time = time.time()
    print(f"COMPLETED ALL EXECUTIONS. total_duration={end_time - start_time}")


if __name__ == "__main__":
    main()
import json
import os
import random
import subprocess
import time
from contextlib import redirect_stdout

import psycopg2
from psycopg2 import sql

# Ask vectordbbench itself for verbose logging.
os.environ["LOG_LEVEL"] = "DEBUG"


def load_config(json_file):
    """Load the benchmark configuration from a JSON file."""
    with open(json_file, 'r') as file:
        return json.load(file)


def setup_database(config):
    """Create the benchmark database (if missing) and required extensions."""
    try:
        conn = psycopg2.connect(
            dbname='postgres',
            user=config['database']['username'],
            password=config['database']['password'],
            host=config['database']['host'],
        )
        conn.autocommit = True
        cursor = conn.cursor()
        # CREATE DATABASE has no IF NOT EXISTS, so probe pg_database first.
        cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"),
                       [config['database']['db_name']])
        if not cursor.fetchone():
            cursor.execute(sql.SQL("CREATE DATABASE {}").format(
                sql.Identifier(config['database']['db_name'])))
        conn.close()

        # Connect to the (possibly brand-new) database to create extensions.
        conn = psycopg2.connect(
            dbname=config['database']['db_name'],
            user=config['database']['username'],
            password=config['database']['password'],
            host=config['database']['host'],
        )
        cursor = conn.cursor()
        cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")
        cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_diskann;")
        conn.commit()
        conn.close()
    except Exception as e:
        print(f"Setup failed: {e}")


def teardown_database(config):
    # Optionally drop the database after the test
    pass


def query_configurations(config):
    """Return a dict of selected PostgreSQL settings queried via SHOW."""
    setting_names = [
        "checkpoint_timeout",
        "effective_cache_size",
        "jit",
        "maintenance_work_mem",
        "max_parallel_maintenance_workers",
        "max_parallel_workers",
        "max_parallel_workers_per_gather",
        "max_wal_size",
        "max_worker_processes",
        "shared_buffers",
        "wal_compression",
        "work_mem",
    ]
    try:
        conn = psycopg2.connect(
            dbname=config['db_name'],
            user=config['username'],
            password=config['password'],
            host=config['host'],
        )
        cursor = conn.cursor()
        # Drive both the SHOW statements and the result keys from one list
        # instead of maintaining 12 query strings plus 12 positional entries.
        config_dict = {}
        for name in setting_names:
            cursor.execute(f"SHOW {name};")
            result = cursor.fetchone()
            config_dict[name] = result[0] if result else None

        # Print the raw output to debug.
        print("Raw query results:", list(config_dict.values()))

        conn.close()
        return config_dict
    except Exception as e:
        print(f"Failed to query configurations: {e}")
        return {}


def run_benchmark(case, db_config):
    """Run `vectordbbench pgdiskann` for one case, once per l-value-is value."""
    base_command = [
        "vectordbbench", "pgdiskann",
        "--user-name", db_config['username'],
        "--password", db_config['password'],
        "--host", db_config['host'],
        "--db-name", db_config['db_name'],
    ]

    # Map boolean case options onto the CLI's --<flag>/--skip-<flag> pairs.
    for option, flag in [
        ("drop_old", "drop-old"),
        ("load", "load"),
        ("search-serial", "search-serial"),
        ("search-concurrent", "search-concurrent"),
    ]:
        base_command.append(f"--{flag}" if case.get(option, True) else f"--skip-{flag}")

    base_command.extend([
        "--case-type", case["case-type"],
        "--maintenance-work-mem", case["maintenance-work-mem"],
        "--max-parallel-workers", str(case["max-parallel-workers"]),
        "--l-value-ib", str(case["l-value-ib"]),
        "--max-neighbors", str(case["max-neighbors"]),
        "--k", str(case["k"]),
        "--num-concurrency", case["num-concurrency"],
        "--concurrency-duration", str(case["concurrency-duration"]),
    ])

    run_count = case.get("run_count", 1)  # default to a single run

    for run in range(run_count):
        print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}")
        for i, l_value_is in enumerate(case["l-value-is"]):
            command = base_command + ["--l-value-is", str(l_value_is)]

            # Build the index only once: from the second iteration on,
            # replace the build/load flags with their --skip-* counterparts.
            if i > 0 or run > 0:
                command = [arg for arg in command if arg not in ["--drop-old", "--load"]]
                if "--skip-drop-old" not in command:
                    command.append("--skip-drop-old")
                if "--skip-load" not in command:
                    command.append("--skip-load")

            try:
                random_number = random.randint(1, 100000)
                print(f"Running command: {' '.join(command)}")
                output_dir = (
                    f"results/pgdiskann/diskann/{case['db-label']}/{db_config['provider']}/"
                    f"{db_config['instance_type']}-{case['max-neighbors']}-{case['l-value-ib']}-"
                    f"{l_value_is}-{case['case-type']}-{run}-{random_number}"
                )
                os.environ["RESULTS_LOCAL_DIR"] = output_dir
                os.makedirs(output_dir, exist_ok=True)

                with open(f"{output_dir}/log.txt", 'w') as f:
                    with redirect_stdout(f):
                        print(f"DB Instance Type: {db_config['instance_type']}")
                        print(f"DB Instance Provider: {db_config['provider']}")
                        print(f"DB enable_seqscan: {db_config['enable_seqscan']}")
                        for key, value in case.items():
                            # Log the l-value-is actually used for this run
                            # instead of the whole configured list. (The
                            # original referenced an undefined `ef_search`
                            # here — a latent NameError copy-pasted from the
                            # HNSW runner — and lacked an else branch.)
                            if key == "l-value-is":
                                print(f"{key}: {l_value_is}")
                            else:
                                print(f"{key}: {value}")
                        print("Current PostgreSQL configurations:")
                        for key, value in query_configurations(db_config).items():
                            print(f"{key}: {value}")
                        print(f"Running command: {' '.join(command)}")
                        f.flush()

                    print("***********START***********")
                    start_time = time.time()
                    # Capture both stdout and stderr of the benchmark run.
                    subprocess.run(command, check=True, stdout=f, stderr=f)
                    print(f"total_duration={time.time() - start_time}")
                    print("***********END***********")
                    f.flush()
            except subprocess.CalledProcessError as e:
                print(f"Benchmark failed: {e}")
            # Let the database settle between runs. (Message now matches the
            # actual sleep — the original said "30 sec" but slept 60.)
            print("Sleeping for 1 min")
            time.sleep(60)


def main():
    """Run every configured case from config.json and report total duration."""
    config = load_config("config.json")
    start_time = time.time()
    for case in config['cases']:
        print(f"Running case: {case['db-label']}")
        setup_database(config)
        run_benchmark(case, config['database'])
        teardown_database(config)
    end_time = time.time()
    print(f"COMPLETED ALL EXECUTIONS. total_duration={end_time - start_time}")


if __name__ == "__main__":
    main()
total_duration={execution_time}") + +if __name__ == "__main__": + main() + diff --git a/run-pvs.py b/run-pvs.py new file mode 100644 index 000000000..2d415d66a --- /dev/null +++ b/run-pvs.py @@ -0,0 +1,297 @@ +import json +import time +from contextlib import redirect_stdout +import random +import subprocess +import psycopg2 +from psycopg2 import sql +import os + +os.environ["LOG_LEVEL"] = "DEBUG" + +def load_config(json_file): + with open(json_file, 'r') as file: + config = json.load(file) + return config + +def get_stats(config): + with open('queries.json', 'r') as file: + queries = json.load(file) + try: + conn = psycopg2.connect( + dbname=config['db_name'], + user=config['username'], + password=config['password'], + host=config['host'] + ) + cur = conn.cursor() + for item in queries: + query = item['query'] + description = item['description'] + print(f"\nRunning query: {description}") + try: + cur.execute(query) + rows = cur.fetchall() + headers = [desc[0] for desc in cur.description] + print(f"{' | '.join(headers)}") + for row in rows: + print(f"{' | '.join(map(str, row))}") + except Exception as e: + print(f"Failed to run query: {e}") + conn.close() + except Exception as e: + print(f"Setup failed: {e}") + finally: + conn.close() + +def pre_warm(config): + print(f"Running pre warm for database:{config['db_name']}") + try: + conn = psycopg2.connect( + dbname=config['db_name'], + user=config['username'], + password=config['password'], + host=config['host'], + ) + cursor = conn.cursor() + cursor.execute("SELECT pg_prewarm('public.pgvector_index') as block_loaded") + conn.commit() + + result = cursor.fetchone() + print(f"Pre-warm blocks loaded: {result[0]}") + conn.close() + except Exception as e: + print(f"Failed to pre-warm the database: {e}") + +def setup_database(config): + try: + conn = psycopg2.connect( + dbname='postgres', + user=config['database']['username'], + password=config['database']['password'], + host=config['database']['host'] + ) + conn.autocommit 
= True + cursor = conn.cursor() + # Create the database if it doesn't exist + cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db_name']]) + if not cursor.fetchone(): + cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db_name']))) + conn.close() + + # Connect to the new database to create the extension + conn = psycopg2.connect( + dbname=config['database']['db_name'], + user=config['database']['username'], + password=config['database']['password'], + host=config['database']['host'] + ) + cursor = conn.cursor() + cursor.execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;") + cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_buffercache;") + cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_prewarm;") + conn.commit() + conn.close() + except Exception as e: + print(f"Setup failed: {e}") + +def teardown_database(config): + # Optionally drop the database after the test + pass + +def query_configurations(config): + # List of configuration parameters to query + config_queries = [ + "SHOW checkpoint_timeout;", + "SHOW effective_cache_size;", + "SHOW jit;", + "SHOW maintenance_work_mem;", + "SHOW max_parallel_maintenance_workers;", + "SHOW max_parallel_workers;", + "SHOW max_parallel_workers_per_gather;", + "SHOW max_wal_size;", + "SHOW max_worker_processes;", + "SHOW shared_buffers;", + "SHOW wal_compression;", + "SHOW work_mem;" + ] + + try: + conn = psycopg2.connect( + dbname=config['db_name'], + user=config['username'], + password=config['password'], + host=config['host'] + ) + cursor = conn.cursor() + results = [] + + # Execute each query and collect the result + for query in config_queries: + cursor.execute(query) + result = cursor.fetchone() + results.append(result[0] if result else None) + + # Print the raw output to debug + print("Raw query results:", results) + + config_dict = { + "checkpoint_timeout": results[0], + "effective_cache_size": results[1], + "jit": results[2], + 
"maintenance_work_mem": results[3], + "max_parallel_maintenance_workers": results[4], + "max_parallel_workers": results[5], + "max_parallel_workers_per_gather": results[6], + "max_wal_size": results[7], + "max_worker_processes": results[8], + "shared_buffers": results[9], + "wal_compression": results[10], + "work_mem": results[11] + } + + conn.close() + return config_dict + except Exception as e: + print(f"Failed to query configurations: {e}") + return {} + + +def run_benchmark(case, db_config): + base_command = [ + "vectordbbench", "pgvectorscale", + "--user-name", db_config['username'], + "--password", db_config['password'], + "--host", db_config['host'], + "--db-name", db_config['db_name'] + ] + + # Handle initial flags (no skip for the first ef_search) + if case.get("drop_old", True): + base_command.append("--drop-old") + else: + base_command.append("--skip-drop-old") + + if case.get("load", True): + base_command.append("--load") + else: + base_command.append("--skip-load") + + if case.get("search_serial", True): + base_command.append("--search-serial") + else: + base_command.append("--skip-search-serial") + + if case.get("search_concurrent", True): + base_command.append("--search-concurrent") + else: + base_command.append("--skip-search-concurrent") + + base_command.extend([ + "--case-type", case["case-type"], + "--maintenance-work-mem", case["maintenance-work-mem"], + "--max-parallel-workers", str(case["max-parallel-workers"]), + "--k", str(case["k"]), + "--num-concurrency", case["num-concurrency"], + "--concurrency-duration", str(case["concurrency-duration"]) + ]) + + # Add pgvectorscale-specific parameters (excluding query-time parameters here for iteration) + base_command.extend([ + "--storage-layout", case["storage-layout"], + "--num-neighbors", str(case["num-neighbors"]), + "--search-list-size", str(case["search-list-size"]), + "--max-alpha", str(case["max-alpha"]), + "--num-dimensions", str(case["num-dimensions"]) + ]) + + run_count = 
case.get("run_count", 1) # Default to 1 if not specified + + for run in range(run_count): + print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") + for i, query_search_list_size in enumerate(case["query-search-list-size"]): + for query_rescore in case["query-rescore"]: + command = base_command + [ + "--query-search-list-size", str(query_search_list_size), + "--query-rescore", str(query_rescore) + ] + + if i > 0 or run > 0: + # Remove conflicting --drop-old and --load flags + command = [arg for arg in command if arg not in ["--drop-old", "--load"]] + # Add skip flags if they are not already in the command + if "--skip-drop-old" not in command: + command.append("--skip-drop-old") + if "--skip-load" not in command: + command.append("--skip-load") + + try: + random_number = random.randint(1, 100000) + print(f"Running command: {' '.join(command)}") + output_dir = ( + f"results/pgvectorscale/{case['db-label']}/{db_config['provider']}/" + f"{db_config['instance-type']}-" + f"{case['storage-layout']}-" + f"{case['num-neighbors']}-" + f"{case['search-list-size']}-" + f"{case['max-alpha']}-" + f"{query_search_list_size}-" + f"{query_rescore}-" + f"{case['case-type']}-{run}-{random_number}" + ) + os.environ["RESULTS_LOCAL_DIR"] = output_dir + + os.makedirs(output_dir, exist_ok=True) + + with open(f"{output_dir}/log.txt", 'w') as f: + with redirect_stdout(f): + print(f"DB Instance Type: {db_config['instance_type']}") + print(f"DB Instance Provider: {db_config['provider']}") + print(f"DB enable_seqscan: {db_config['enable_seqscan']}") + for key, value in case.items(): + if key == "ef_search": + print(f"{key}: {ef_search}") + print(f"{key}: {value}") + print("Current PostgreSQL configurations:") + current_configs = query_configurations(db_config) + for key, value in current_configs.items(): + print(f"{key}: {value}") + get_stats(db_config) + f.flush() + pre_warm(db_config) + print(f"Running command: {' '.join(command)}") + f.flush() + + 
print("***********START***********") + start_time = time.time() + # Capture both stdout and stderr and write them to the log file + subprocess.run(command, check=True, stdout=f, stderr=f) + end_time = time.time() + execution_time = end_time - start_time + print(f"total_duration={execution_time}") + print("***********END***********") + with redirect_stdout(f): + get_stats(db_config) + f.flush() + f.flush() + except subprocess.CalledProcessError as e: + print(f"Benchmark failed: {e}") + print("Sleeping for 30 sec") + time.sleep(60) + +def main(): + config = load_config("config.json") + start_time = time.time() + for case in config['cases']: + print(f"Running case: {case['db-label']}") + setup_database(config) + + run_benchmark(case, config['database']) + teardown_database(config) + end_time = time.time() + execution_time = end_time - start_time + print(f"COMPLETED ALL EXECUTIONS. total_duration={execution_time}") + +if __name__ == "__main__": + main() + diff --git a/run.py b/run.py new file mode 100644 index 000000000..535d2c589 --- /dev/null +++ b/run.py @@ -0,0 +1,221 @@ +import json +import time +from contextlib import redirect_stdout +import random +import subprocess +import psycopg2 +from psycopg2 import sql +import os + +os.environ["LOG_LEVEL"] = "DEBUG" + +def load_config(json_file): + with open(json_file, 'r') as file: + config = json.load(file) + return config + +def setup_database(config): + try: + conn = psycopg2.connect( + dbname='postgres', + user=config['database']['username'], + password=config['database']['password'], + host=config['database']['host'] + ) + conn.autocommit = True + cursor = conn.cursor() + # Create the database if it doesn't exist + cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), [config['database']['db_name']]) + if not cursor.fetchone(): + cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(config['database']['db_name']))) + conn.close() + + # Connect to the new database to create the extension 
+ conn = psycopg2.connect( + dbname=config['database']['db_name'], + user=config['database']['username'], + password=config['database']['password'], + host=config['database']['host'] + ) + cursor = conn.cursor() + cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;") + conn.commit() + conn.close() + except Exception as e: + print(f"Setup failed: {e}") + +def teardown_database(config): + # Optionally drop the database after the test + pass + +def query_configurations(config): + # List of configuration parameters to query + config_queries = [ + "SHOW checkpoint_timeout;", + "SHOW effective_cache_size;", + "SHOW jit;", + "SHOW maintenance_work_mem;", + "SHOW max_parallel_maintenance_workers;", + "SHOW max_parallel_workers;", + "SHOW max_parallel_workers_per_gather;", + "SHOW max_wal_size;", + "SHOW max_worker_processes;", + "SHOW shared_buffers;", + "SHOW wal_compression;", + "SHOW work_mem;" + ] + + try: + conn = psycopg2.connect( + dbname=config['db_name'], + user=config['username'], + password=config['password'], + host=config['host'] + ) + cursor = conn.cursor() + results = [] + + # Execute each query and collect the result + for query in config_queries: + cursor.execute(query) + result = cursor.fetchone() + results.append(result[0] if result else None) + + # Print the raw output to debug + print("Raw query results:", results) + + config_dict = { + "checkpoint_timeout": results[0], + "effective_cache_size": results[1], + "jit": results[2], + "maintenance_work_mem": results[3], + "max_parallel_maintenance_workers": results[4], + "max_parallel_workers": results[5], + "max_parallel_workers_per_gather": results[6], + "max_wal_size": results[7], + "max_worker_processes": results[8], + "shared_buffers": results[9], + "wal_compression": results[10], + "work_mem": results[11] + } + + conn.close() + return config_dict + except Exception as e: + print(f"Failed to query configurations: {e}") + return {} + + +def run_benchmark(case, db_config): + base_command = [ + 
"vectordbbench", "pgvectorhnsw", + "--user-name", db_config['username'], + "--password", db_config['password'], + "--host", db_config['host'], + "--db-name", db_config['db_name'] + ] + + # Handle initial flags (no skip for the first ef_search) + if case.get("drop_old", True): + base_command.append("--drop-old") + else: + base_command.append("--skip-drop-old") + + if case.get("load", True): + base_command.append("--load") + else: + base_command.append("--skip-load") + + if case.get("search-serial", True): + base_command.append("--search-serial") + else: + base_command.append("--skip-search-serial") + + if case.get("search-concurrent", True): + base_command.append("--search-concurrent") + else: + base_command.append("--skip-search-concurrent") + + base_command.extend([ + "--case-type", case["case-type"], + "--maintenance-work-mem", case["maintenance-work-mem"], + "--max-parallel-workers", str(case["max-parallel-workers"]), + "--ef-construction", str(case["ef-construction"]), + "--m", str(case["m"]), + "--k", str(case["k"]), + "--num-concurrency", case["num-concurrency"], + "--concurrency-duration", str(case["concurrency-duration"]) + ]) + + run_count = case.get("run_count", 1) # Default to 1 if not specified + + + for run in range(run_count): + print(f"Starting run {run + 1} of {run_count} for case: {case['db-label']}") + for i, ef_search in enumerate(case["ef-search"]): + command = base_command + ["--ef-search", str(ef_search)] + + # Build the index only once. 
+ if i > 0 or run > 0: + # Remove conflicting --drop-old and --load flags + command = [arg for arg in command if arg not in ["--drop-old", "--load"]] + # Add skip flags if they are not already in the command + if "--skip-drop-old" not in command: + command.append("--skip-drop-old") + if "--skip-load" not in command: + command.append("--skip-load") + + try: + random_number = random.randint(1, 100000) + print(f"Running command: {' '.join(command)}") + output_dir = f"results/pgvector/hnsw/{case['db-label']}/{db_config['provider']}/{db_config['instance_type']}-{str(case['m'])}-{str(case['ef-construction'])}-{ef_search}-{case['case-type']}-{run}-{random_number}" + os.environ["RESULTS_LOCAL_DIR"] = output_dir + + os.makedirs(output_dir, exist_ok=True) + + with open(f"{output_dir}/log.txt", 'w') as f: + with redirect_stdout(f): + print(f"DB Instance Type: {db_config['instance_type']}") + print(f"DB Instance Provider: {db_config['provider']}") + print(f"DB enable_seqscan: {db_config['enable_seqscan']}") + for key, value in case.items(): + if key == "ef_search": + print(f"{key}: {ef_search}") + print(f"{key}: {value}") + print("Current PostgreSQL configurations:") + current_configs = query_configurations(db_config) + for key, value in current_configs.items(): + print(f"{key}: {value}") + print(f"Running command: {' '.join(command)}") + f.flush() + + print("***********START***********") + start_time = time.time() + # Capture both stdout and stderr and write them to the log file + subprocess.run(command, check=True, stdout=f, stderr=f) + end_time = time.time() + execution_time = end_time - start_time + print(f"total_duration={execution_time}") + print("***********END***********") + f.flush() + except subprocess.CalledProcessError as e: + print(f"Benchmark failed: {e}") + print("Sleeping for 1 min") + time.sleep(60) + +def main(): + config = load_config("config.json") + start_time = time.time() + for case in config['cases']: + print(f"Running case: {case['db-label']}") + 
setup_database(config) + + run_benchmark(case, config['database']) + teardown_database(config) + end_time = time.time() + execution_time = end_time - start_time + print(f"COMPLETED ALL EXECUTIONS. total_duration={execution_time}") + +if __name__ == "__main__": + main() + diff --git a/sample-configs/config-custom-dataset-small-hnsw.json b/sample-configs/config-custom-dataset-small-hnsw.json new file mode 100644 index 000000000..8eb2b865b --- /dev/null +++ b/sample-configs/config-custom-dataset-small-hnsw.json @@ -0,0 +1,134 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann", + "instance_type": "db.m6i.large", + "provider": "aws", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "memory-comparison-run-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "PerformanceCustomDataset", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [20], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "custom-case-name": "hnsw-1536D-500K", + "custom-dataset-name": "custom-openai", + "custom-dataset-dir": "openai_medium_500k", + "custom-dataset-size": 500000, + "custom-dataset-dim": 1536, + "custom-dataset-file-count": 1, + "custom-dataset-use-shuffled": false, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "PerformanceCustomDataset", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [20], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "custom-case-name": "hnsw-1536D-1m", + "custom-dataset-name": "custom-openai", + "custom-dataset-dir": "openai_medium_1m", + "custom-dataset-size": 500000, + "custom-dataset-dim": 1536, + 
"custom-dataset-file-count": 2, + "custom-dataset-use-shuffled": false, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "PerformanceCustomDataset", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [20], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "custom-case-name": "hnsw-1536D-1_5m", + "custom-dataset-name": "custom-openai", + "custom-dataset-dir": "openai_medium_1_5m", + "custom-dataset-size": 500000, + "custom-dataset-dim": 1536, + "custom-dataset-file-count": 3, + "custom-dataset-use-shuffled": false, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "PerformanceCustomDataset", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [20], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "custom-case-name": "hnsw-1536D-2m", + "custom-dataset-name": "custom-openai", + "custom-dataset-dir": "openai_medium_2m", + "custom-dataset-size": 500000, + "custom-dataset-dim": 1536, + "custom-dataset-file-count": 4, + "custom-dataset-use-shuffled": false, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "PerformanceCustomDataset", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [20], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "custom-case-name": "hnsw-1536D-2_5m", + "custom-dataset-name": "custom-openai", + "custom-dataset-dir": "openai_medium_2_5M", + "custom-dataset-size": 500000, + "custom-dataset-dim": 1536, + 
"custom-dataset-file-count": 5, + "custom-dataset-use-shuffled": false, + "run_count": 1 + } + ] + } + \ No newline at end of file diff --git a/sample-configs/config-large-diskann-filter.json b/sample-configs/config-large-diskann-filter.json new file mode 100644 index 000000000..6d39381ab --- /dev/null +++ b/sample-configs/config-large-diskann-filter.json @@ -0,0 +1,149 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann", + "instance_type": "Standard_D8ds_v5", + "provider": "azure", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "filter-low", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K1P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 64, + "l-value-ib": 128, + "l-value-is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "filter-high", + "drop_old": false, + "load": false, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K99P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 64, + "l-value-ib": 128, + "l-value-is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "filter-low", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K1P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 32, + "l-value-ib": 64, + "l-value-is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "filter-high", + "drop_old": false, + "load": false, + "search-serial": true, + 
"search-concurrent": true, + "case-type": "Performance1536D500K99P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 32, + "l-value-ib": 64, + "l-value-is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "filter-low", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K1P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 32, + "l-value-ib": 128, + "l-value-is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "filter-high", + "drop_old": false, + "load": false, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K99P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 32, + "l-value-ib": 128, + "l-value-is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "filter-low", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K1P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 64, + "l-value-ib": 64, + "l-value-is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "filter-high", + "drop_old": false, + "load": false, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K99P", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 64, + "l-value-ib": 64, + "l-value-is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + 
"concurrency-duration": 30, + "k": 10, + "run_count": 1 + } + ] +} diff --git a/sample-configs/config-large-diskann.json b/sample-configs/config-large-diskann.json new file mode 100644 index 000000000..d19b63c9f --- /dev/null +++ b/sample-configs/config-large-diskann.json @@ -0,0 +1,81 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann", + "instance_type": "Standard_D8ds_v5", + "provider": "azure-vm", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 32, + "l_value_ib": 64, + "l_value_is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 32, + "l_value_ib": 128, + "l_value_is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "max-neighbors": 64, + "l_value_ib": 64, + "l_value_is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + 
"max-parallel-workers": 3, + "max-neighbors": 64, + "l_value_ib": 128, + "l_value_is": [32, 64, 128, 256, 512], + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + } + ] +} diff --git a/sample-configs/config-large-hnsw.json b/sample-configs/config-large-hnsw.json new file mode 100644 index 000000000..cb70d6023 --- /dev/null +++ b/sample-configs/config-large-hnsw.json @@ -0,0 +1,149 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann", + "instance_type": "Standard_D8ds_v5", + "provider": "azure-vm", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 16, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 8, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + 
"drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 32, + "m": 8, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 32, + "m": 16, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 8, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 16, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 3, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 32, + 
"num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + } + ] +} diff --git a/sample-configs/config-large-pgvectorscale.json b/sample-configs/config-large-pgvectorscale.json new file mode 100644 index 000000000..12e7e4711 --- /dev/null +++ b/sample-configs/config-large-pgvectorscale.json @@ -0,0 +1,34 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db-name": "ann", + "instance-type": "db.m6i.large", + "provider": "aws", + "enable-seqscan": "on" + }, + "cases": [ + { + "db-label": "run1-seqon", + "drop-old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "8GB", + "max-parallel-workers": 7, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "storage-layout": "memory_optimized", + "search-list-size": 100, + "max-alpha": 1.2, + "num-dimensions": 0, + "query-search-list-size": [25, 50, 100, 150], + "query-rescore": [10, 50, 100], + "run-count": 1 + } + ] +} + diff --git a/sample-configs/config-small-diskann.json b/sample-configs/config-small-diskann.json new file mode 100644 index 000000000..68b83730a --- /dev/null +++ b/sample-configs/config-small-diskann.json @@ -0,0 +1,81 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann", + "instance_type": "db.m6i.large", + "provider": "aws", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "max-neighbors": 32, + "l_value_ib": 64, + "l_value_is": [32, 64, 128, 256, 512], + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": 
"run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "max-neighbors": 32, + "l_value_ib": 128, + "l_value_is": [32, 64, 128, 256, 512], + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "max-neighbors": 64, + "l_value_ib": 64, + "l_value_is": [32, 64, 128, 256, 512], + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "max-neighbors": 64, + "l_value_ib": 128, + "l_value_is": [32, 64, 128, 256, 512], + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + } + ] +} diff --git a/sample-configs/config-small-hnsw.json b/sample-configs/config-small-hnsw.json new file mode 100644 index 000000000..7eb5feb51 --- /dev/null +++ b/sample-configs/config-small-hnsw.json @@ -0,0 +1,149 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann", + "instance_type": "db.m6i.large", + "provider": "aws", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 16, + 
"num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 8, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 32, + "m": 8, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 32, + "m": 16, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + 
"max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 8, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 16, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + } + ] +} diff --git a/sample-configs/config-small-hsnw.json b/sample-configs/config-small-hsnw.json new file mode 100644 index 000000000..7eb5feb51 --- /dev/null +++ b/sample-configs/config-small-hsnw.json @@ -0,0 +1,149 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann", + "instance_type": "db.m6i.large", + "provider": "aws", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 16, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + 
"search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 8, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 32, + "m": 8, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 32, + "m": 16, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 8, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + 
"concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 16, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "run1-seqon", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D500K", + "maintenance-work-mem": "4GB", + "max-parallel-workers": 2, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 32, + "num-concurrency": "1,5,15,20,25,30,35,40,45,50", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + } + ] +} diff --git a/sample-configs/config-xxlarge-hnsw.json b/sample-configs/config-xxlarge-hnsw.json new file mode 100644 index 000000000..d19ff3059 --- /dev/null +++ b/sample-configs/config-xxlarge-hnsw.json @@ -0,0 +1,149 @@ +{ + "database": { + "host": "localhost", + "username": "postgres", + "password": "postgres", + "db_name": "ann", + "instance_type": "m7i.2xlarge", + "provider": "aws-vm", + "enable_seqscan": "on" + }, + "cases": [ + { + "db-label": "xl-vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D5M", + "maintenance-work-mem": "32GB", + "max-parallel-workers": 15, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 16, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "xxl-vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D5M", + "maintenance-work-mem": "32GB", + 
"max-parallel-workers": 15, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 8, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "xxl-vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D5M", + "maintenance-work-mem": "32GB", + "max-parallel-workers": 15, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 64, + "m": 32, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "xxl-vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D5M", + "maintenance-work-mem": "32GB", + "max-parallel-workers": 15, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 32, + "m": 8, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "xxl-vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D5M", + "maintenance-work-mem": "32GB", + "max-parallel-workers": 15, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 32, + "m": 16, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "xxl-vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D5M", + "maintenance-work-mem": "32GB", + "max-parallel-workers": 15, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 8, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "xxl-vm-instance", + 
"drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D5M", + "maintenance-work-mem": "32GB", + "max-parallel-workers": 15, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 16, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + }, + { + "db-label": "xxl-vm-instance", + "drop_old": true, + "load": true, + "search-serial": true, + "search-concurrent": true, + "case-type": "Performance1536D5M", + "maintenance-work-mem": "32GB", + "max-parallel-workers": 15, + "ef-search": [10, 20, 40, 80, 120, 200, 400], + "ef-construction": 128, + "m": 32, + "num-concurrency": "1,10,20,30,40,50,60,70,80,90,100", + "concurrency-duration": 30, + "k": 10, + "run_count": 1 + } + ] +} diff --git a/script/db_configs.json b/script/db_configs.json new file mode 100644 index 000000000..9aed30032 --- /dev/null +++ b/script/db_configs.json @@ -0,0 +1,16 @@ +{ + "local": { + "host": "172.17.0.2", + "port": "5432", + "database": "postgres", + "user": "postgres", + "password": "admin123" + }, + "csp_db": { + "host": "aws-results.c8jxjby7azcm.us-west-2.rds.amazonaws.com", + "port": "5432", + "database": "db_results", + "user": "postgres", + "password": "Emumba123" + } +} \ No newline at end of file diff --git a/script/process.py b/script/process.py new file mode 100644 index 000000000..4adaa5ff5 --- /dev/null +++ b/script/process.py @@ -0,0 +1,61 @@ +import argparse +import os +import json +from typing import List +from utils import ( + insert_data, + process_result_file, + process_log_file, + get_default_dict, + generate_csv_file, +) + +parser = argparse.ArgumentParser(description="Process data") +parser.add_argument( + "--reset-table", action="store_true", help="Reset the records in the table" +) +parser.add_argument("--csv", action="store_true", help="Generate CSV file") +parser.add_argument("--csv-logs", action="store_true", help="Add 
logs in CSV file") +parser.add_argument( + "--csp-db", action="store_true", help="Connect to cloud service provider db" +) +args = parser.parse_args() + + +def process_directory(data_folder: str) -> List[dict]: + data_results = [] + for root, _, files in os.walk(data_folder): + for file in files: + if file.startswith("."): + continue + if file.endswith(".json"): + data_result = get_default_dict() + result_file_path = os.path.join(root, file) + log_file_path = os.path.join(root, "log.txt") + data_result = process_result_file(result_file_path, data_result) + if os.path.exists(log_file_path): + try: + process_log_file(log_file_path, data_result) + except Exception as e: + print(f"Error: {e}") + continue + data_results.append(data_result.copy()) + print(f"logs processed for run_id: {data_result['run_id']}") + return data_results + +def save_results(data_results: List[dict], output_file: str) -> None: + with open(output_file, 'w') as outfile: + json.dump(data_results, outfile, indent=4) + +if __name__ == "__main__": + data_folder = "script/data/" + output_file = "script/processed_results.json" + data_results = process_directory(data_folder) + save_results(data_results, output_file) + + #import pdb; + #pdb.set_trace() + insert_data(data_results, False, False) + if args.csv: + generate_csv_file(data_results, False) + print("Data processing complete") diff --git a/script/setup_db.py b/script/setup_db.py new file mode 100644 index 000000000..81412720e --- /dev/null +++ b/script/setup_db.py @@ -0,0 +1,93 @@ +import argparse +import json +import psycopg2 + + +parser = argparse.ArgumentParser(description="Drop existing table and create a new one") +parser.add_argument("--drop", action="store_true", help="Drop table") +parser.add_argument("--csp-db", action="store_true", help="Drop table") +args = parser.parse_args() + + +print("Connecting to the database...") +with open("script/db_configs.json") as f: + all_db_configs = json.load(f) +db_configs = all_db_configs["local"] +if 
args.csp_db: + db_configs = all_db_configs["csp_db"] + +conn = psycopg2.connect(**db_configs) +print("Connected to the database") + +try: + cur = conn.cursor() + if args.drop: + cur.execute("DROP TABLE benchmark_results") + conn.commit() + print("Table Dropped") + cur.execute( + """ + CREATE TABLE IF NOT EXISTS benchmark_results ( + id SERIAL PRIMARY KEY, + run_id VARCHAR(64) DEFAULT '', + csp VARCHAR(32) DEFAULT '', + load_duration FLOAT DEFAULT NULL, + recall FLOAT DEFAULT NULL, + qps FLOAT DEFAULT NULL, + qps_per_dollar FLOAT DEFAULT NULL, + index_build_time FLOAT DEFAULT NULL, + instance_type VARCHAR(64) DEFAULT '', + index_params JSON DEFAULT '{}', + search_params JSON DEFAULT '{}', + config_label TEXT DEFAULT '', + index_type VARCHAR(64) DEFAULT '', + db_case_config JSON DEFAULT '{}', + raw_data JSON DEFAULT '{}', + metric_type VARCHAR(64) DEFAULT '', + logs TEXT DEFAULT '', + enable_seqscan VARCHAR(32) DEFAULT '', + db_label VARCHAR(64) DEFAULT '', + drop_old BOOLEAN DEFAULT FALSE, + load BOOLEAN DEFAULT FALSE, + search_serial BOOLEAN DEFAULT FALSE, + search_concurrent BOOLEAN DEFAULT FALSE, + conc_latency_p99_list FLOAT[], + conc_num_list FLOAT[], + conc_qps_list FLOAT[], + case_type VARCHAR(64) DEFAULT '', + maintenance_work_mem VARCHAR(32) DEFAULT '', + max_parallel_workers INTEGER DEFAULT 0, + num_concurrency FLOAT[], + concurrency_duration INTEGER DEFAULT 0, + run_count INTEGER DEFAULT 0, + checkpoint_timeout VARCHAR(64) DEFAULT '', + effective_cache_size VARCHAR(64) DEFAULT '', + jit VARCHAR(255) DEFAULT '', + max_parallel_maintenance_workers INTEGER DEFAULT 0, + max_parallel_workers_per_gather INTEGER DEFAULT 0, + max_wal_size VARCHAR(32) DEFAULT '', + max_worker_processes INTEGER DEFAULT 0, + shared_buffers VARCHAR(32) DEFAULT '', + wal_compression VARCHAR(32) DEFAULT '', + work_mem VARCHAR(32) DEFAULT '', + create_index_before_load BOOLEAN DEFAULT FALSE, + create_index_after_load BOOLEAN DEFAULT FALSE, + index_size VARCHAR(32) DEFAULT '', + k 
INTEGER DEFAULT NULL, + max_load_count INTEGER DEFAULT NULL, + ndcg FLOAT DEFAULT NULL, + serial_latency_p99 FLOAT DEFAULT NULL, + table_size VARCHAR(32) DEFAULT '', + vector_extension VARCHAR(32) DEFAULT '' + ) + """ + ) + conn.commit() + print("Created the results table") +except Exception as e: + print(e) +finally: + cur.close() + conn.close() + +print("Database setup complete") diff --git a/script/utils/__init__.py b/script/utils/__init__.py new file mode 100644 index 000000000..ff0edeafb --- /dev/null +++ b/script/utils/__init__.py @@ -0,0 +1,22 @@ +from .db import insert_data +from .csv import generate_csv_file +from .preprocessing import ( + parse_log_file, + get_config_label, + get_index_params, + get_default_dict, + process_log_file, + process_result_file, +) + + +__all__ = [ + "insert_data", + "parse_log_file", + "get_config_label", + "get_index_params", + "process_result_file", + "process_log_file", + "get_default_dict", + "generate_csv_file", +] \ No newline at end of file diff --git a/script/utils/csv.py b/script/utils/csv.py new file mode 100644 index 000000000..a1a240112 --- /dev/null +++ b/script/utils/csv.py @@ -0,0 +1,15 @@ +import os +import csv + +def generate_csv_file(data, logs=False): + csv_file = "script/results.csv" + if os.path.exists(csv_file): + os.remove(csv_file) + for record in data: + if not logs: + record.pop("logs", None) # Remove 'logs' key from record dictionary + with open(csv_file, "a", newline="") as f: + writer = csv.DictWriter(f, fieldnames=record.keys()) + if f.tell() == 0: + writer.writeheader() + writer.writerow(record) \ No newline at end of file diff --git a/script/utils/db.py b/script/utils/db.py new file mode 100644 index 000000000..dcfac4578 --- /dev/null +++ b/script/utils/db.py @@ -0,0 +1,53 @@ +import json +from typing import Tuple, List +import psycopg2 +from psycopg2 import sql + + +def insert_data(data: List[dict], reset_records: bool=False, csp_db: bool=False): + print("Connecting to the database...") + 
db_cofigurations = get_db_config() + db_config = ( + db_cofigurations["csp_db"] + if csp_db + else db_cofigurations["local"] + ) + conn = psycopg2.connect(**db_config) + print("Connected to the database") + try: + cur = conn.cursor() + if reset_records: + cur.execute("DELETE FROM benchmark_results") + conn.commit() + print("benchmark_results Table Deleted") + + for count, record in enumerate(data): + print(f"Inserting record into the results table: {count+1}") + insert_fields, insert_values = build_query_params(record) + insert_query = sql.SQL( + f"""INSERT INTO benchmark_results ({",".join(insert_fields)}) VALUES ({', '.join(['%s' for _ in insert_values])})""" + ) + cur.execute(insert_query, insert_values) + conn.commit() + except Exception as e: + print(f"Error: {e}") + finally: + print("Inserting data into the results table finished") + cur.close() + conn.close() + +def get_db_config(): + with open("script/db_configs.json") as f: + all_db_configs = json.load(f) + return all_db_configs + +def build_query_params(record: dict) -> Tuple[str, str]: + insert_fields = [] + insert_values = [] + for key, val in record.items(): + insert_fields.append(key) + if isinstance(val, dict): + insert_values.append(json.dumps(val)) + else: + insert_values.append(val) + return insert_fields, insert_values diff --git a/script/utils/preprocessing.py b/script/utils/preprocessing.py new file mode 100644 index 000000000..fbc20a68c --- /dev/null +++ b/script/utils/preprocessing.py @@ -0,0 +1,203 @@ +import re +import json +from typing import List + + +def get_default_dict() -> dict: + data_result = { + "case_type": "", + "checkpoint_timeout": "", + "conc_latency_p99_list": [], + "conc_num_list": [], + "conc_qps_list": [], + "config_label": "", + "concurrency_duration": None, + "create_index_after_load": False, + "create_index_before_load": False, + "csp": "", + "enable_seqscan": False, + "db_case_config": {}, + "instance_type": "", + "db_label": "", + "drop_old": False, + 
"effective_cache_size": "", + "run_count": None, + "index_build_time": None, + "index_params": {}, + "index_size": "", + "index_type": "", + "jit": False, + "k": None, + "load": False, + "load_duration": None, + "logs": "", + "maintenance_work_mem": "", + "max_load_count": None, + "max_parallel_maintenance_workers": None, + "max_parallel_workers": None, + "max_parallel_workers_per_gather": None, + "max_wal_size": "", + "max_worker_processes": None, + "metric_type": "", + "ndcg": None, + "num_concurrency": None, + "qps": None, + "qps_per_dollar": None, + "raw_data": {}, + "recall": None, + "run_id": "", + "search_concurrent": False, + "search_params": {}, + "search_serial": False, + "serial_latency_p99": None, + "shared_buffers": "", + "table_size": "", + "vector_extension": "", + "wal_compression": False, + "work_mem": "", + "db_case_config": {}, + } + return data_result + +def process_result_file(result_file_path: str, data_result: dict) -> dict: + with open(result_file_path, 'r') as result_file: + result_data = json.load(result_file) + result = result_data["results"][0] # Assuming there is only one case result + data_result["run_id"] = result_data["run_id"] + print(f"Processing result logs of run_id: {data_result['run_id']}") + data_result["vector_extension"] = result["task_config"]["db"] + data_result["k"] = result["task_config"]["case_config"]["k"] + data_result["num_concurrency"] = ( + result["task_config"]["case_config"]["concurrency_search_config"]["num_concurrency"] + ) + data_result["concurrency_duration"] = ( + result["task_config"]["case_config"]["concurrency_search_config"]["concurrency_duration"] + ) + data_result.update(result["metrics"]) + data_result["create_index_after_load"] = result["task_config"]["db_case_config"]["create_index_after_load"] + data_result["create_index_before_load"] = result["task_config"]["db_case_config"]["create_index_before_load"] + data_result["index_params"] = get_index_params(result["task_config"]["db_case_config"]) + 
data_result["search_params"] = get_search_params(result["task_config"]["db_case_config"]) + data_result["index_type"] = result["task_config"]["db_case_config"]["index"] + data_result["metric_type"] = result["task_config"]["db_case_config"]["metric_type"] + data_result["raw_data"] = result + data_result["db_case_config"] = result["task_config"]["db_case_config"] + data_result["config_label"] = get_config_label( + result["task_config"]["db_case_config"]["index"], + data_result["search_params"], + data_result["index_params"], + ) + data_result["index_build_time"] = data_result["build_dur"] + del data_result["build_dur"] + return data_result + +def process_log_file(log_file_path: str, data_result: dict) -> None: + with open(log_file_path, 'r') as log_file: + logs = log_file.read() + parsed_logs = parse_log_file(logs) + data_result["instance_type"] = parsed_logs["db_instance_type"] + data_result["csp"] = parsed_logs["db_instance_provider"] + data_result["db_label"] = parsed_logs["db_label"] + data_result["run_count"] = parsed_logs["run_count"] + data_result["enable_seqscan"] = parsed_logs["db_enable_seqscan"] + data_result["effective_cache_size"] = parsed_logs["effective_cache_size"] + data_result["jit"] = parsed_logs["jit"] + data_result["checkpoint_timeout"] = parsed_logs["checkpoint_timeout"] + data_result["maintenance_work_mem"] = parsed_logs["maintenance_work_mem"] + data_result["drop_old"] = parsed_logs["drop_old"] + data_result["load"] = parsed_logs["load"] + data_result["search_concurrent"] = parsed_logs["search_concurrent"] + data_result["search_serial"] = parsed_logs["search_serial"] + data_result["max_parallel_maintenance_workers"] = parsed_logs["max_parallel_maintenance_workers"] + data_result["max_parallel_workers"] = parsed_logs["max_parallel_workers"] + data_result["max_parallel_workers_per_gather"] = parsed_logs["max_parallel_workers_per_gather"] + data_result["case_type"] = parsed_logs["case_type"] + data_result["max_wal_size"] = 
parsed_logs["max_wal_size"] + data_result["max_worker_processes"] = parsed_logs["max_worker_processes"] + data_result["shared_buffers"] = parsed_logs["shared_buffers"] + data_result["wal_compression"] = parsed_logs["wal_compression"] + data_result["work_mem"] = parsed_logs["work_mem"] + data_result["logs"] = logs + +def create_key(key: str) -> str: + return key.strip().lower().replace(' ', '_').replace('-', '_') + +def parse_log_file(log_content: List[str]) -> dict: + error_pattern = re.compile(r'WARNING.*failed to run, reason=', re.MULTILINE) + if error_pattern.search(log_content): + raise Exception("Error log found: The case failed to run.") + pattern = re.compile(r'^(.*?):\s*(.*)$', re.MULTILINE) + matches = pattern.findall(log_content) + log_dict = {create_key(key.strip()): value.strip() for key, value in matches} + del log_dict["current_postgresql_configurations"] + return log_dict + +def get_config_label( + index_type: str, search_params: dict, index_params: dict +) -> str: + config_label = f'{index_type}' + if index_type.lower() == "ivf_flat": + config_label = ( + config_label + + " - lists=" + + str(index_params["lists"]) + + "; probes=" + + str(search_params["probes"]) + ) + elif index_type.lower() == "hnsw": + config_label = ( + config_label + + " - m=" + + str(index_params["m"]) + + "; ef_c=" + + str(index_params["ef_construction"]) + + "; ef_s=" + str(search_params["ef_search"]) + ) + elif index_type.lower() == "streaming_diskann": + config_label = ( + config_label + + " - storage_layout=" + + str(index_params["storage_layout"]) + + "; num_neighbors=" + + str(index_params["num_neighbors"]) + + "; search_list_size=" + + str(search_params["search_list_size"]) + + "; max_alpha=" + + str(search_params["max_alpha"]) + + "; num_dimensions=" + + str(search_params["num_dimensions"]) + + "; num_bits_per_dimension=" + + str(search_params["num_bits_per_dimension"]) + + "; query_search_list_size=" + + str(search_params["query_search_list_size"]) + + "; 
query_rescore=" + + str(search_params["query_rescore"]) + ) + return config_label + +def get_index_params(db_case_config: dict) -> dict: + index_params = {} + if db_case_config["index"].lower() == "hnsw": + index_params["m"] = db_case_config["m"] + index_params["ef_construction"] = db_case_config["ef_construction"] + elif db_case_config["index"].lower() == "ivf_flat": + index_params["lists"] = db_case_config["lists"] + elif db_case_config["index"].lower() == "streaming_diskann": + index_params["storage_layout"] = db_case_config["storage_layout"] + index_params["num_neighbors"] = db_case_config["num_neighbors"] + index_params["search_list_size"] = db_case_config["search_list_size"] + index_params["max_alpha"] = db_case_config["max_alpha"] + index_params["num_dimensions"] = db_case_config["num_dimensions"] + index_params["num_bits_per_dimension"] = db_case_config["num_bits_per_dimension"] + return index_params + +def get_search_params(db_case_config: dict) -> dict: + search_params = {} + if db_case_config["index"].lower() == "hnsw": + search_params["ef_search"] = db_case_config["ef_search"] + elif db_case_config["index"].lower() == "ivf_flat": + search_params["probes"] = db_case_config["probes"] + elif db_case_config["index"].lower() == "streaming_diskann": + search_params["query_search_list_size"] = db_case_config["query_search_list_size"] + search_params["query_rescore"] = db_case_config["query_rescore"] + return search_params diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py index f7664502f..3d8419a4f 100644 --- a/vectordb_bench/__init__.py +++ b/vectordb_bench/__init__.py @@ -4,6 +4,7 @@ import environs from . 
import log_util +import os env = environs.Env() env.read_env(".env", False) @@ -16,7 +17,7 @@ class config: LOG_LEVEL = env.str("LOG_LEVEL", "INFO") DEFAULT_DATASET_URL = env.str("DEFAULT_DATASET_URL", AWS_S3_URL) - DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", "/tmp/vectordb_bench/dataset") + DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", f"/home/{os.getenv('USER')}/vectordb_bench/dataset") NUM_PER_BATCH = env.int("NUM_PER_BATCH", 5000) DROP_OLD = env.bool("DROP_OLD", True) @@ -46,14 +47,14 @@ class config: LOAD_TIMEOUT_1536D_500K = 2.5 * 3600 # 2.5h LOAD_TIMEOUT_1536D_5M = 25 * 3600 # 25h - OPTIMIZE_TIMEOUT_DEFAULT = 30 * 60 # 30min - OPTIMIZE_TIMEOUT_768D_1M = 30 * 60 # 30min + OPTIMIZE_TIMEOUT_DEFAULT = 300 * 60 # 60min + OPTIMIZE_TIMEOUT_768D_1M = 60 * 60 # 60min OPTIMIZE_TIMEOUT_768D_10M = 5 * 3600 # 5h OPTIMIZE_TIMEOUT_768D_100M = 50 * 3600 # 50h - OPTIMIZE_TIMEOUT_1536D_500K = 15 * 60 # 15min - OPTIMIZE_TIMEOUT_1536D_5M = 2.5 * 3600 # 2.5h + OPTIMIZE_TIMEOUT_1536D_500K = 300 * 60 # 300min + OPTIMIZE_TIMEOUT_1536D_5M = 5 * 3600 # 5h def display(self) -> str: tmp = [ i for i in inspect.getmembers(self) diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py index 3e87e1fbe..c26aa3d6d 100644 --- a/vectordb_bench/backend/clients/__init__.py +++ b/vectordb_bench/backend/clients/__init__.py @@ -31,6 +31,7 @@ class DB(Enum): PgVector = "PgVector" PgVectoRS = "PgVectoRS" PgVectorScale = "PgVectorScale" + PgDiskANN = "PgDiskANN" Redis = "Redis" MemoryDB = "MemoryDB" Chroma = "Chroma" @@ -77,6 +78,10 @@ def init_cls(self) -> Type[VectorDB]: from .pgvectorscale.pgvectorscale import PgVectorScale return PgVectorScale + if self == DB.PgDiskANN: + from .pgdiskann.pgdiskann import PgDiskANN + return PgDiskANN + if self == DB.Redis: from .redis.redis import Redis return Redis @@ -132,6 +137,10 @@ def config_cls(self) -> Type[DBConfig]: from .pgvectorscale.config import PgVectorScaleConfig return PgVectorScaleConfig + if 
self == DB.PgDiskANN: + from .pgdiskann.config import PgDiskANNConfig + return PgDiskANNConfig + if self == DB.Redis: from .redis.config import RedisConfig return RedisConfig @@ -185,6 +194,10 @@ def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseCon from .pgvectorscale.config import _pgvectorscale_case_config return _pgvectorscale_case_config.get(index_type) + if self == DB.PgDiskANN: + from .pgdiskann.config import _pgdiskann_case_config + return _pgdiskann_case_config.get(index_type) + # DB.Pinecone, DB.Chroma, DB.Redis return EmptyDBCaseConfig diff --git a/vectordb_bench/backend/clients/pgdiskann/cli.py b/vectordb_bench/backend/clients/pgdiskann/cli.py new file mode 100644 index 000000000..18a9ecbd5 --- /dev/null +++ b/vectordb_bench/backend/clients/pgdiskann/cli.py @@ -0,0 +1,99 @@ +import click +import os +from pydantic import SecretStr + +from ....cli.cli import ( + CommonTypedDict, + cli, + click_parameter_decorators_from_typed_dict, + run, +) +from typing import Annotated, Optional, Unpack +from vectordb_bench.backend.clients import DB + + +class PgDiskAnnTypedDict(CommonTypedDict): + user_name: Annotated[ + str, click.option("--user-name", type=str, help="Db username", required=True) + ] + password: Annotated[ + str, + click.option("--password", + type=str, + help="Postgres database password", + default=lambda: os.environ.get("POSTGRES_PASSWORD", ""), + show_default="$POSTGRES_PASSWORD", + ), + ] + + host: Annotated[ + str, click.option("--host", type=str, help="Db host", required=True) + ] + db_name: Annotated[ + str, click.option("--db-name", type=str, help="Db name", required=True) + ] + max_neighbors: Annotated[ + int, + click.option( + "--max-neighbors", type=int, help="PgDiskAnn max neighbors", + ), + ] + l_value_ib: Annotated[ + int, + click.option( + "--l-value-ib", type=int, help="PgDiskAnn l_value_ib", + ), + ] + l_value_is: Annotated[ + float, + click.option( + "--l-value-is", type=float, help="PgDiskAnn l_value_is", + 
), + ] + maintenance_work_mem: Annotated[ + Optional[str], + click.option( + "--maintenance-work-mem", + type=str, + help="Sets the maximum memory to be used for maintenance operations (index creation). " + "Can be entered as string with unit like '64GB' or as an integer number of KB." + "This will set the parameters: max_parallel_maintenance_workers," + " max_parallel_workers & table(parallel_workers)", + required=False, + ), + ] + max_parallel_workers: Annotated[ + Optional[int], + click.option( + "--max-parallel-workers", + type=int, + help="Sets the maximum number of parallel processes per maintenance operation (index creation)", + required=False, + ), + ] + +@cli.command() +@click_parameter_decorators_from_typed_dict(PgDiskAnnTypedDict) +def PgDiskAnn( + **parameters: Unpack[PgDiskAnnTypedDict], +): + from .config import PgDiskANNConfig, PgDiskANNImplConfig + + run( + db=DB.PgDiskANN, + db_config=PgDiskANNConfig( + db_label=parameters["db_label"], + user_name=SecretStr(parameters["user_name"]), + password=SecretStr(parameters["password"]), + host=parameters["host"], + db_name=parameters["db_name"], + ), + db_case_config=PgDiskANNImplConfig( + max_neighbors=parameters["max_neighbors"], + l_value_ib=parameters["l_value_ib"], + l_value_is=parameters["l_value_is"], + max_parallel_workers=parameters["max_parallel_workers"], + maintenance_work_mem=parameters["maintenance_work_mem"], + ), + **parameters, + ) \ No newline at end of file diff --git a/vectordb_bench/backend/clients/pgdiskann/config.py b/vectordb_bench/backend/clients/pgdiskann/config.py new file mode 100644 index 000000000..970720afa --- /dev/null +++ b/vectordb_bench/backend/clients/pgdiskann/config.py @@ -0,0 +1,145 @@ +from abc import abstractmethod +from typing import Any, Mapping, Optional, Sequence, TypedDict +from pydantic import BaseModel, SecretStr +from typing_extensions import LiteralString +from ..api import DBCaseConfig, DBConfig, IndexType, MetricType + +POSTGRE_URL_PLACEHOLDER = 
"postgresql://%s:%s@%s/%s" + + +class PgDiskANNConfigDict(TypedDict): + """These keys will be directly used as kwargs in psycopg connection string, + so the names must match exactly psycopg API""" + + user: str + password: str + host: str + port: int + dbname: str + + +class PgDiskANNConfig(DBConfig): + user_name: SecretStr = SecretStr("postgres") + password: SecretStr + host: str = "localhost" + port: int = 5432 + db_name: str + + def to_dict(self) -> PgDiskANNConfigDict: + user_str = self.user_name.get_secret_value() + pwd_str = self.password.get_secret_value() + return { + "host": self.host, + "port": self.port, + "dbname": self.db_name, + "user": user_str, + "password": pwd_str, + } + + +class PgDiskANNIndexConfig(BaseModel, DBCaseConfig): + metric_type: MetricType | None = None + create_index_before_load: bool = False + create_index_after_load: bool = True + maintenance_work_mem: Optional[str] + max_parallel_workers: Optional[int] + + def parse_metric(self) -> str: + if self.metric_type == MetricType.L2: + return "vector_l2_ops" + elif self.metric_type == MetricType.IP: + return "vector_ip_ops" + return "vector_cosine_ops" + + def parse_metric_fun_op(self) -> LiteralString: + if self.metric_type == MetricType.L2: + return "<->" + elif self.metric_type == MetricType.IP: + return "<#>" + return "<=>" + + def parse_metric_fun_str(self) -> str: + if self.metric_type == MetricType.L2: + return "l2_distance" + elif self.metric_type == MetricType.IP: + return "max_inner_product" + return "cosine_distance" + + @abstractmethod + def index_param(self) -> dict: + ... + + @abstractmethod + def search_param(self) -> dict: + ... + + @abstractmethod + def session_param(self) -> dict: + ... + + @staticmethod + def _optionally_build_with_options(with_options: Mapping[str, Any]) -> Sequence[dict[str, Any]]: + """Walk through mappings, creating a List of {key1 = value} pairs. 
That will be used to build a where clause""" + options = [] + for option_name, value in with_options.items(): + if value is not None: + options.append( + { + "option_name": option_name, + "val": str(value), + } + ) + return options + + @staticmethod + def _optionally_build_set_options( + set_mapping: Mapping[str, Any] + ) -> Sequence[dict[str, Any]]: + """Walk through options, creating 'SET 'key1 = "value1";' list""" + session_options = [] + for setting_name, value in set_mapping.items(): + if value: + session_options.append( + {"parameter": { + "setting_name": setting_name, + "val": str(value), + }, + } + ) + return session_options + + +class PgDiskANNImplConfig(PgDiskANNIndexConfig): + index: IndexType = IndexType.DISKANN + max_neighbors: int | None + l_value_ib: int | None + l_value_is: float | None + maintenance_work_mem: Optional[str] = None + max_parallel_workers: Optional[int] = None + + def index_param(self) -> dict: + return { + "metric": self.parse_metric(), + "index_type": self.index.value, + "options": { + "max_neighbors": self.max_neighbors, + "l_value_ib": self.l_value_ib, + }, + "maintenance_work_mem": self.maintenance_work_mem, + "max_parallel_workers": self.max_parallel_workers, + } + + def search_param(self) -> dict: + return { + "metric": self.parse_metric(), + "metric_fun_op": self.parse_metric_fun_op(), + } + + def session_param(self) -> dict: + return { + "diskann.l_value_is": self.l_value_is, + } + +_pgdiskann_case_config = { + IndexType.DISKANN: PgDiskANNImplConfig, +} diff --git a/vectordb_bench/backend/clients/pgdiskann/pgdiskann.py b/vectordb_bench/backend/clients/pgdiskann/pgdiskann.py new file mode 100644 index 000000000..1c7c55b64 --- /dev/null +++ b/vectordb_bench/backend/clients/pgdiskann/pgdiskann.py @@ -0,0 +1,385 @@ +"""Wrapper around the pg_diskann vector database over VectorDB""" + +import logging +import pprint +from contextlib import contextmanager +from typing import Any, Generator, Optional, Tuple + +import numpy as np 
import psycopg
from pgvector.psycopg import register_vector
from psycopg import Connection, Cursor, sql

from ..api import VectorDB
from .config import PgDiskANNConfigDict, PgDiskANNIndexConfig

log = logging.getLogger(__name__)


class PgDiskANN(VectorDB):
    """pg_diskann client implementing the VectorDB interface via psycopg."""

    conn: psycopg.Connection[Any] | None = None
    # Fixed typo: was "coursor"; the instance code only ever uses "cursor".
    cursor: psycopg.Cursor[Any] | None = None

    _filtered_search: sql.Composed
    _unfiltered_search: sql.Composed

    def __init__(
        self,
        dim: int,
        db_config: PgDiskANNConfigDict,
        db_case_config: PgDiskANNIndexConfig,
        collection_name: str = "pg_diskann_collection",
        drop_old: bool = False,
        **kwargs,
    ):
        """Open a connection, optionally (re)create table/index, then close.

        The connection is re-opened per benchmark session via :meth:`init`.
        """
        self.name = "PgDiskANN"
        self.db_config = db_config
        self.case_config = db_case_config
        self.table_name = collection_name
        self.dim = dim

        self._index_name = "pgdiskann_index"
        self._primary_field = "id"
        self._vector_field = "embedding"

        self.conn, self.cursor = self._create_connection(**self.db_config)

        log.info(f"{self.name} config values: {self.db_config}\n{self.case_config}")
        if not any(
            (
                self.case_config.create_index_before_load,
                self.case_config.create_index_after_load,
            )
        ):
            err = f"{self.name} config must create an index using create_index_before_load or create_index_after_load"
            log.error(err)
            raise RuntimeError(
                f"{err}\n{pprint.pformat(self.db_config)}\n{pprint.pformat(self.case_config)}"
            )

        if drop_old:
            self._drop_index()
            self._drop_table()
            self._create_table(dim)
            if self.case_config.create_index_before_load:
                self._create_index()

        self.cursor.close()
        self.conn.close()
        self.cursor = None
        self.conn = None

    @staticmethod
    def _create_connection(**kwargs) -> Tuple[Connection, Cursor]:
        """Connect, ensure the pg_diskann extension exists, and register the
        pgvector type adapters. Autocommit stays off; callers commit."""
        conn = psycopg.connect(**kwargs)
        conn.cursor().execute("CREATE EXTENSION IF NOT EXISTS pg_diskann CASCADE")
        conn.commit()
        register_vector(conn)
        conn.autocommit = False
        cursor = conn.cursor()

        assert conn is not None, "Connection is not initialized"
        assert cursor is not None, "Cursor is not initialized"

        return conn, cursor

    @contextmanager
    def init(self) -> Generator[None, None, None]:
        """Per-session context manager: opens a connection, applies session
        GUCs, prepares the search statements, and closes on exit."""
        self.conn, self.cursor = self._create_connection(**self.db_config)

        # Index configuration may define GUCs to set for each client session
        # (e.g. diskann.l_value_is).
        session_options: dict[str, Any] = self.case_config.session_param()

        if len(session_options) > 0:
            for setting_name, setting_val in session_options.items():
                # NOTE(review): both name and value are rendered as quoted
                # identifiers ('SET "name" = "val"'), mirroring the sibling
                # pgvector client; Postgres accepts this form for SET.
                command = sql.SQL("SET {setting_name} " + "= {setting_val};").format(
                    setting_name=sql.Identifier(setting_name),
                    setting_val=sql.Identifier(str(setting_val)),
                )
                log.debug(command.as_string(self.cursor))
                self.cursor.execute(command)
            self.conn.commit()

        self._filtered_search = sql.Composed(
            [
                sql.SQL(
                    "SELECT id FROM public.{table_name} WHERE id >= %s ORDER BY embedding "
                ).format(table_name=sql.Identifier(self.table_name)),
                sql.SQL(self.case_config.search_param()["metric_fun_op"]),
                sql.SQL(" %s::vector LIMIT %s::int"),
            ]
        )

        self._unfiltered_search = sql.Composed(
            [
                sql.SQL("SELECT id FROM public.{} ORDER BY embedding ").format(
                    sql.Identifier(self.table_name)
                ),
                sql.SQL(self.case_config.search_param()["metric_fun_op"]),
                sql.SQL(" %s::vector LIMIT %s::int"),
            ]
        )

        try:
            yield
        finally:
            self.cursor.close()
            self.conn.close()
            self.cursor = None
            self.conn = None

    def _drop_table(self):
        """Drop the benchmark table if it exists."""
        assert self.conn is not None, "Connection is not initialized"
        assert self.cursor is not None, "Cursor is not initialized"
        log.info(f"{self.name} client drop table : {self.table_name}")

        self.cursor.execute(
            sql.SQL("DROP TABLE IF EXISTS public.{table_name}").format(
                table_name=sql.Identifier(self.table_name)
            )
        )
        self.conn.commit()

    def get_size_info(self):
        """Return (table_size, index_size) as pg_size_pretty strings, or
        (0, 0) when the query fails or returns nothing."""
        try:
            assert self.conn is not None, "Connection is not initialized"
            assert self.cursor is not None, "Cursor is not initialized"
            log.info(f"{self.name} client get size info.")

            # Pass the relation names as text literals; pg_table_size casts
            # text to regclass internally. (The previous version wrapped an
            # Identifier in single quotes, yielding '"name"'.)
            size_sql = sql.SQL(
                "SELECT pg_size_pretty(pg_table_size({table_name})) as table_size, "
                "pg_size_pretty(pg_table_size({index_name})) as index_size;"
            ).format(
                table_name=sql.Literal(self.table_name),
                index_name=sql.Literal(self._index_name),
            )
            log.debug(size_sql.as_string(self.cursor))
            self.cursor.execute(size_sql)
            self.conn.commit()
            result = self.cursor.fetchone()

            if result:
                table_size = result[0]  # First column value
                index_size = result[1]
                log.info(f"Table Size: {table_size}, Index Size: {index_size}")
                return (table_size, index_size)
            log.error("No results returned from the query.")
            return (0, 0)
        except Exception as e:
            # Log the actual error instead of silently discarding it.
            log.warning(f"Failed to fetch table and index information, error: {e}")
            return (0, 0)

    def ready_to_load(self):
        pass

    def optimize(self):
        self._post_insert()

    def _post_insert(self):
        """Rebuild the index after load when configured to do so."""
        log.info(f"{self.name} post insert before optimize")
        if self.case_config.create_index_after_load:
            self._drop_index()
            self._create_index()

    def _drop_index(self):
        """Drop the DiskANN index if it exists."""
        assert self.conn is not None, "Connection is not initialized"
        assert self.cursor is not None, "Cursor is not initialized"
        log.info(f"{self.name} client drop index : {self._index_name}")

        drop_index_sql = sql.SQL("DROP INDEX IF EXISTS {index_name}").format(
            index_name=sql.Identifier(self._index_name)
        )
        log.debug(drop_index_sql.as_string(self.cursor))
        self.cursor.execute(drop_index_sql)
        self.conn.commit()

    def _set_parallel_index_build_param(self):
        """Apply maintenance_work_mem / parallel-worker settings before index
        build, both for this session and persistently for the user/table.

        Values are wrapped in sql.Literal/sql.Identifier: psycopg3's
        SQL.format() only accepts Composable arguments, so the previous
        plain-str/int arguments would raise TypeError.
        """
        assert self.conn is not None, "Connection is not initialized"
        assert self.cursor is not None, "Cursor is not initialized"

        index_param = self.case_config.index_param()

        if index_param["maintenance_work_mem"] is not None:
            self.cursor.execute(
                sql.SQL("SET maintenance_work_mem TO {};").format(
                    sql.Literal(index_param["maintenance_work_mem"])
                )
            )
            self.cursor.execute(
                sql.SQL("ALTER USER {} SET maintenance_work_mem TO {};").format(
                    sql.Identifier(self.db_config["user"]),
                    sql.Literal(index_param["maintenance_work_mem"]),
                )
            )
            self.conn.commit()

        if index_param["max_parallel_workers"] is not None:
            workers = sql.Literal(index_param["max_parallel_workers"])
            self.cursor.execute(
                sql.SQL("SET max_parallel_maintenance_workers TO {};").format(workers)
            )
            self.cursor.execute(
                sql.SQL(
                    "ALTER USER {} SET max_parallel_maintenance_workers TO {};"
                ).format(sql.Identifier(self.db_config["user"]), workers)
            )
            self.cursor.execute(
                sql.SQL("SET max_parallel_workers TO {};").format(workers)
            )
            self.cursor.execute(
                sql.SQL("ALTER USER {} SET max_parallel_workers TO {};").format(
                    sql.Identifier(self.db_config["user"]), workers
                )
            )
            self.cursor.execute(
                sql.SQL("ALTER TABLE {} SET (parallel_workers = {});").format(
                    sql.Identifier(self.table_name), workers
                )
            )
            self.conn.commit()

        results = self.cursor.execute(
            sql.SQL("SHOW max_parallel_maintenance_workers;")
        ).fetchall()
        results.extend(
            self.cursor.execute(sql.SQL("SHOW max_parallel_workers;")).fetchall()
        )
        results.extend(
            self.cursor.execute(sql.SQL("SHOW maintenance_work_mem;")).fetchall()
        )
        log.info(f"{self.name} parallel index creation parameters: {results}")

    def _create_index(self):
        """Create the DiskANN index with the configured WITH options."""
        assert self.conn is not None, "Connection is not initialized"
        assert self.cursor is not None, "Cursor is not initialized"
        log.info(f"{self.name} client create index : {self._index_name}")

        index_param: dict[str, Any] = self.case_config.index_param()
        self._set_parallel_index_build_param()

        options = []
        for option_name, option_val in index_param["options"].items():
            if option_val is not None:
                # NOTE(review): Identifier renders values double-quoted
                # (max_neighbors = "32"); Postgres accepts this for reloption
                # values, mirroring the sibling pgvectorscale client.
                options.append(
                    sql.SQL("{option_name} = {val}").format(
                        option_name=sql.Identifier(option_name),
                        val=sql.Identifier(str(option_val)),
                    )
                )

        if any(options):
            with_clause = sql.SQL("WITH ({});").format(sql.SQL(", ").join(options))
        else:
            with_clause = sql.Composed(())

        index_create_sql = sql.SQL(
            """
            CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name}
            USING {index_type} (embedding {embedding_metric})
            """
        ).format(
            index_name=sql.Identifier(self._index_name),
            table_name=sql.Identifier(self.table_name),
            index_type=sql.Identifier(index_param["index_type"].lower()),
            embedding_metric=sql.Identifier(index_param["metric"]),
        )
        # Composed.join(" ") interposes a space between statement and options.
        index_create_sql_with_with_clause = (index_create_sql + with_clause).join(" ")
        log.debug(index_create_sql_with_with_clause.as_string(self.cursor))
        self.cursor.execute(index_create_sql_with_with_clause)
        self.conn.commit()

    def _create_table(self, dim: int):
        """Create the benchmark table with a plain-storage vector column."""
        assert self.conn is not None, "Connection is not initialized"
        assert self.cursor is not None, "Cursor is not initialized"

        try:
            log.info(f"{self.name} client create table : {self.table_name}")

            self.cursor.execute(
                sql.SQL(
                    "CREATE TABLE IF NOT EXISTS public.{table_name} (id BIGINT PRIMARY KEY, embedding vector({dim}));"
                ).format(table_name=sql.Identifier(self.table_name), dim=dim)
            )
            # PLAIN storage keeps vectors uncompressed and un-TOASTed.
            self.cursor.execute(
                sql.SQL(
                    "ALTER TABLE public.{table_name} ALTER COLUMN embedding SET STORAGE PLAIN;"
                ).format(table_name=sql.Identifier(self.table_name))
            )
            self.conn.commit()
        except Exception as e:
            log.warning(
                f"Failed to create pgdiskann table: {self.table_name} error: {e}"
            )
            raise e from None

    def insert_embeddings(
        self,
        embeddings: list[list[float]],
        metadata: list[int],
        **kwargs: Any,
    ) -> Tuple[int, Optional[Exception]]:
        """Bulk-load (id, embedding) rows via binary COPY.

        Returns (inserted_count, None) on success, (0, exception) on failure.
        """
        assert self.conn is not None, "Connection is not initialized"
        assert self.cursor is not None, "Cursor is not initialized"

        try:
            metadata_arr = np.array(metadata)
            embeddings_arr = np.array(embeddings)

            with self.cursor.copy(
                sql.SQL("COPY public.{table_name} FROM STDIN (FORMAT BINARY)").format(
                    table_name=sql.Identifier(self.table_name)
                )
            ) as copy:
                copy.set_types(["bigint", "vector"])
                for i, row in enumerate(metadata_arr):
                    copy.write_row((row, embeddings_arr[i]))
            self.conn.commit()

            if kwargs.get("last_batch"):
                self._post_insert()

            return len(metadata), None
        except Exception as e:
            log.warning(
                f"Failed to insert data into table ({self.table_name}), error: {e}"
            )
            return 0, e

    def search_embedding(
        self,
        query: list[float],
        k: int = 100,
        filters: dict | None = None,
        timeout: int | None = None,
    ) -> list[int]:
        """Return the ids of the k nearest neighbors of *query*; when
        *filters* contains "id", restrict to rows with id >= that value."""
        assert self.conn is not None, "Connection is not initialized"
        assert self.cursor is not None, "Cursor is not initialized"

        q = np.asarray(query)
        if filters:
            gt = filters.get("id")
            result = self.cursor.execute(
                self._filtered_search, (gt, q, k), prepare=True, binary=True
            )
        else:
            result = self.cursor.execute(
                self._unfiltered_search, (q, k), prepare=True, binary=True
            )

        return [int(i[0]) for i in result.fetchall()]
index_size = result[1] + log.info(f"Table Size: {table_size}, Index Size: {index_size}") + return (table_size, index_size) + else: + log.error("No results returned from the query.") + return (0, 0) + except Exception as e: + log.warning( + f"Failed to fetch table and index information" + ) + return (0, 0) + + + @staticmethod def _create_connection(**kwargs) -> Tuple[Connection, Cursor]: conn = psycopg.connect(**kwargs) @@ -306,7 +338,7 @@ def _create_index(self): if index_param["quantization_type"] != None: index_create_sql = sql.SQL( """ - CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name} + CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name} USING {index_type} ((embedding::{quantization_type}({dim})) {embedding_metric}) """ ).format( @@ -321,7 +353,7 @@ def _create_index(self): else: index_create_sql = sql.SQL( """ - CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name} + CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name} USING {index_type} (embedding {embedding_metric}) """ ).format( diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py index a6d94f186..54a9cb2da 100644 --- a/vectordb_bench/backend/task_runner.py +++ b/vectordb_bench/backend/task_runner.py @@ -162,15 +162,17 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric: if TaskStage.LOAD in self.config.stages: # self._load_train_data() _, load_dur = self._load_train_data() - build_dur = self._optimize() - m.load_duration = round(load_dur + build_dur, 4) + m.build_dur = self._optimize() + m.load_duration = round(load_dur + m.build_dur, 4) log.info( f"Finish loading the entire dataset into VectorDB," - f" insert_duration={load_dur}, optimize_duration={build_dur}" + f" insert_duration={load_dur}, optimize_duration={m.build_dur}" f" load_duration(insert + optimize) = {m.load_duration}" ) else: log.info("Data loading skipped") + with self.db.init(): + m.table_size, m.index_size = self.db.get_size_info() if ( 
TaskStage.SEARCH_SERIAL in self.config.stages or TaskStage.SEARCH_CONCURRENT in self.config.stages diff --git a/vectordb_bench/cli/vectordbbench.py b/vectordb_bench/cli/vectordbbench.py index e62c25a3d..4d23ed952 100644 --- a/vectordb_bench/cli/vectordbbench.py +++ b/vectordb_bench/cli/vectordbbench.py @@ -1,6 +1,7 @@ from ..backend.clients.pgvector.cli import PgVectorHNSW from ..backend.clients.pgvecto_rs.cli import PgVectoRSHNSW, PgVectoRSIVFFlat from ..backend.clients.pgvectorscale.cli import PgVectorScaleDiskAnn +from ..backend.clients.pgdiskann.cli import PgDiskAnn from ..backend.clients.redis.cli import Redis from ..backend.clients.memorydb.cli import MemoryDB from ..backend.clients.test.cli import Test @@ -22,6 +23,7 @@ cli.add_command(MilvusAutoIndex) cli.add_command(AWSOpenSearch) cli.add_command(PgVectorScaleDiskAnn) +cli.add_command(PgDiskAnn) if __name__ == "__main__": diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py index 68bf83f19..314ccb5ff 100644 --- a/vectordb_bench/frontend/config/dbCaseConfigs.py +++ b/vectordb_bench/frontend/config/dbCaseConfigs.py @@ -180,6 +180,16 @@ class CaseConfigInput(BaseModel): }, ) +CaseConfigParamInput_IndexType_PgDiskANN = CaseConfigInput( + label=CaseConfigParamType.IndexType, + inputHelp="Select Index Type", + inputType=InputType.Option, + inputConfig={ + "options": [ + IndexType.DISKANN.value, + ], + }, +) CaseConfigParamInput_IndexType_PgVectorScale = CaseConfigInput( label=CaseConfigParamType.IndexType, @@ -205,6 +215,42 @@ class CaseConfigInput(BaseModel): }, ) +CaseConfigParamInput_max_neighbors = CaseConfigInput( + label=CaseConfigParamType.max_neighbors, + inputType=InputType.Number, + inputConfig={ + "min": 10, + "max": 300, + "value": 32, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.DISKANN.value, +) + +CaseConfigParamInput_l_value_ib = CaseConfigInput( + label=CaseConfigParamType.l_value_ib, + 
inputType=InputType.Number, + inputConfig={ + "min": 10, + "max": 300, + "value": 50, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.DISKANN.value, +) + +CaseConfigParamInput_l_value_is = CaseConfigInput( + label=CaseConfigParamType.l_value_is, + inputType=InputType.Number, + inputConfig={ + "min": 10, + "max": 300, + "value": 40, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.DISKANN.value, +) + CaseConfigParamInput_num_neighbors = CaseConfigInput( label=CaseConfigParamType.num_neighbors, inputType=InputType.Number, @@ -942,6 +988,13 @@ class CaseConfigInput(BaseModel): CaseConfigParamInput_query_search_list_size, ] +PgDiskANNPerformanceConfig = [ + CaseConfigParamInput_IndexType_PgDiskANN, + CaseConfigParamInput_max_neighbors, + CaseConfigParamInput_l_value_ib, + CaseConfigParamInput_l_value_is, +] + CASE_CONFIG_MAP = { DB.Milvus: { CaseLabel.Load: MilvusLoadConfig, @@ -974,4 +1027,7 @@ class CaseConfigInput(BaseModel): CaseLabel.Load: PgVectorScaleLoadingConfig, CaseLabel.Performance: PgVectorScalePerformanceConfig, }, + DB.PgDiskANN: { + CaseLabel.Performance: PgDiskANNPerformanceConfig, + }, } diff --git a/vectordb_bench/log_util.py b/vectordb_bench/log_util.py index b923bdcd2..3bd305881 100644 --- a/vectordb_bench/log_util.py +++ b/vectordb_bench/log_util.py @@ -17,7 +17,8 @@ def init(log_level): 'handlers': { 'console': { 'class': 'logging.StreamHandler', - 'formatter': 'colorful_console', + #'formatter': 'colorful_console', + 'formatter': 'default', }, 'no_color_console': { 'class': 'logging.StreamHandler', diff --git a/vectordb_bench/metric.py b/vectordb_bench/metric.py index 5c23072e3..25780aafd 100644 --- a/vectordb_bench/metric.py +++ b/vectordb_bench/metric.py @@ -16,6 +16,9 @@ class Metric: # for performance cases load_duration: float = 0.0 # duration to load all dataset into DB + build_dur: float = 0.0 # duration to build the index + table_size: int = 
0 + index_size: int = 0 qps: float = 0.0 serial_latency_p99: float = 0.0 recall: float = 0.0 diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index 7968e3e26..89b1af641 100644 --- a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -64,6 +64,9 @@ class CaseConfigParamType(Enum): max_parallel_workers = "max_parallel_workers" storage_layout = "storage_layout" num_neighbors = "num_neighbors" + max_neighbors = "max_neighbors" + l_value_ib = "l_value_ib" + l_value_is = "l_value_is" search_list_size = "search_list_size" max_alpha = "max_alpha" num_dimensions = "num_dimensions" @@ -171,7 +174,8 @@ def flush(self): result_root = config.RESULTS_LOCAL_DIR for db, result in db2case.items(): self.write_db_file( - result_dir=result_root.joinpath(db.value), + result_dir=result_root, + #result_dir=result_root.joinpath(db.value), partial=TestResult( run_id=self.run_id, task_label=self.task_label,