diff --git a/include/Discovery_Schema.h b/include/Discovery_Schema.h index d1ca81eac2..a8d9400df4 100644 --- a/include/Discovery_Schema.h +++ b/include/Discovery_Schema.h @@ -5,6 +5,131 @@ #include #include #include +#include +#include +#include "json.hpp" + +/** + * @brief MCP query rule structure + * + * Action is inferred from rule properties: + * - if error_msg != NULL → block + * - if replace_pattern != NULL → rewrite + * - if timeout_ms > 0 → timeout + * - otherwise → allow + * + * Note: 'hits' is only for in-memory tracking, not persisted to the table. + */ +struct MCP_Query_Rule { + int rule_id; + bool active; + char *username; + char *schemaname; + char *tool_name; + char *match_pattern; + bool negate_match_pattern; + int re_modifiers; // bitmask: 1=CASELESS + int flagIN; + int flagOUT; + char *replace_pattern; + int timeout_ms; + char *error_msg; + char *ok_msg; + bool log; + bool apply; + char *comment; + uint64_t hits; // in-memory only, not persisted to table + void* regex_engine; // compiled regex (RE2) + + MCP_Query_Rule() : rule_id(0), active(false), username(NULL), schemaname(NULL), + tool_name(NULL), match_pattern(NULL), negate_match_pattern(false), + re_modifiers(1), flagIN(0), flagOUT(0), replace_pattern(NULL), + timeout_ms(0), error_msg(NULL), ok_msg(NULL), log(false), apply(true), + comment(NULL), hits(0), regex_engine(NULL) {} +}; + +/** + * @brief MCP query digest statistics + */ +struct MCP_Query_Digest_Stats { + std::string tool_name; + int run_id; + uint64_t digest; + std::string digest_text; + unsigned int count_star; + time_t first_seen; + time_t last_seen; + unsigned long long sum_time; + unsigned long long min_time; + unsigned long long max_time; + + MCP_Query_Digest_Stats() : run_id(-1), digest(0), count_star(0), + first_seen(0), last_seen(0), + sum_time(0), min_time(0), max_time(0) {} + + void add_timing(unsigned long long duration_us, time_t timestamp) { + count_star++; + sum_time += duration_us; + if (duration_us < min_time || min_time == 0) min_time = duration_us; + if (duration_us > max_time) max_time = duration_us; + if (first_seen == 0) first_seen = timestamp; + last_seen = timestamp; + } +}; + +/** + * @brief MCP query processor output + * + * This structure collects all possible actions from matching MCP query rules. + * A single rule can perform multiple actions simultaneously (rewrite + timeout + block). + * Actions are inferred from rule properties: + * - if error_msg != NULL → block + * - if replace_pattern != NULL → rewrite + * - if timeout_ms > 0 → timeout + * - if OK_msg != NULL → return OK message + * + * The calling code checks these fields and performs the appropriate actions. 
+ */
+struct MCP_Query_Processor_Output {
+	std::string *new_query;    // Rewritten query (caller must delete)
+	int timeout_ms;            // Query timeout in milliseconds (-1 = not set)
+	char *error_msg;           // Error message to return (NULL = not set)
+	char *OK_msg;              // OK message to return (NULL = not set)
+	int log;                   // Whether to log this query (-1 = not set, 0 = no, 1 = yes)
+	int next_query_flagIN;     // Flag for next query (-1 = not set)
+
+	void init() {
+		new_query = NULL;
+		timeout_ms = -1;
+		error_msg = NULL;
+		OK_msg = NULL;
+		log = -1;
+		next_query_flagIN = -1;
+	}
+
+	void destroy() {
+		if (new_query) {
+			delete new_query;
+			new_query = NULL;
+		}
+		if (error_msg) {
+			free(error_msg);
+			error_msg = NULL;
+		}
+		if (OK_msg) {
+			free(OK_msg);
+			OK_msg = NULL;
+		}
+	}
+
+	MCP_Query_Processor_Output() {
+		init();
+	}
+
+	~MCP_Query_Processor_Output() {
+		destroy();
+	}
+};
 
 /**
  * @brief Two-Phase Discovery Catalog Schema Manager
@@ -21,6 +146,15 @@ class Discovery_Schema {
 	SQLite3DB* db;
 	std::string db_path;
 
+	// MCP query rules management
+	std::vector<MCP_Query_Rule*> mcp_query_rules;
+	pthread_rwlock_t mcp_rules_lock;
+	volatile unsigned int mcp_rules_version;
+
+	// MCP query digest statistics
+	std::unordered_map<std::string, std::unordered_map<uint64_t, void*>> mcp_digest_umap;
+	pthread_rwlock_t mcp_digest_rwlock;
+
 	/**
 	 * @brief Initialize catalog schema with all tables
 	 * @return 0 on success, -1 on error
@@ -679,6 +813,72 @@ class Discovery_Schema {
 	 * @return Database file path
 	 */
 	std::string get_db_path() const { return db_path; }
+
+	// ============================================================
+	// MCP QUERY RULES
+	// ============================================================
+
+	/**
+	 * @brief Load MCP query rules from SQLite
+	 */
+	void load_mcp_query_rules(SQLite3_result* resultset);
+
+	/**
+	 * @brief Evaluate MCP query rules for a tool invocation
+	 * @return MCP_Query_Processor_Output object populated with actions from matching rules.
+	 *         Caller is responsible for destroying the returned object.
+	 */
+	MCP_Query_Processor_Output* evaluate_mcp_query_rules(
+		const std::string& tool_name,
+		const std::string& schemaname,
+		const nlohmann::json& arguments,
+		const std::string& original_query
+	);
+
+	/**
+	 * @brief Get current MCP query rules as resultset
+	 */
+	SQLite3_result* get_mcp_query_rules();
+
+	/**
+	 * @brief Get stats for MCP query rules (hits per rule)
+	 */
+	SQLite3_result* get_stats_mcp_query_rules();
+
+	// ============================================================
+	// MCP QUERY DIGEST
+	// ============================================================
+
+	/**
+	 * @brief Update MCP query digest statistics
+	 */
+	void update_mcp_query_digest(
+		const std::string& tool_name,
+		int run_id,
+		uint64_t digest,
+		const std::string& digest_text,
+		unsigned long long duration_us,
+		time_t timestamp
+	);
+
+	/**
+	 * @brief Get MCP query digest statistics
+	 * @param reset If true, reset stats after retrieval
+	 */
+	SQLite3_result* get_mcp_query_digest(bool reset = false);
+
+	/**
+	 * @brief Compute MCP query digest hash using SpookyHash
+	 */
+	static uint64_t compute_mcp_digest(
+		const std::string& tool_name,
+		const nlohmann::json& arguments
+	);
+
+	/**
+	 * @brief Fingerprint MCP query arguments (replace literals with ?)
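+	 *
+	 * Example (illustrative): {"sql": "SELECT * FROM t WHERE id = 5", "limit": 10}
+	 * fingerprints to {"sql":"?","limit":?}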
+ */ + static std::string fingerprint_mcp_args(const nlohmann::json& arguments); }; #endif /* CLASS_DISCOVERY_SCHEMA_H */ diff --git a/include/ProxySQL_Admin_Tables_Definitions.h b/include/ProxySQL_Admin_Tables_Definitions.h index bd4d99bc38..451e4b614b 100644 --- a/include/ProxySQL_Admin_Tables_Definitions.h +++ b/include/ProxySQL_Admin_Tables_Definitions.h @@ -325,6 +325,95 @@ #define STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS "CREATE TABLE stats_mcp_query_tools_counters (tool VARCHAR NOT NULL , schema VARCHAR NOT NULL , count INT NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , PRIMARY KEY (tool, schema))" #define STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS_RESET "CREATE TABLE stats_mcp_query_tools_counters_reset (tool VARCHAR NOT NULL , schema VARCHAR NOT NULL , count INT NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , PRIMARY KEY (tool, schema))" +// MCP query rules table - for firewall and query rewriting +// Action is inferred from rule properties: +// - if error_msg is not NULL → block +// - if replace_pattern is not NULL → rewrite +// - if timeout_ms > 0 → timeout +// - otherwise → allow +#define ADMIN_SQLITE_TABLE_MCP_QUERY_RULES "CREATE TABLE mcp_query_rules (" \ + " rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL ," \ + " active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 ," \ + " username VARCHAR ," \ + " schemaname VARCHAR ," \ + " tool_name VARCHAR ," \ + " match_pattern VARCHAR ," \ + " negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 ," \ + " re_modifiers VARCHAR DEFAULT 'CASELESS' ," \ + " flagIN INT NOT NULL DEFAULT 0 ," \ + " flagOUT INT CHECK (flagOUT >= 0) ," \ + " replace_pattern VARCHAR ," \ + " timeout_ms INT CHECK (timeout_ms >= 0) ," \ + " error_msg VARCHAR ," \ + " OK_msg VARCHAR ," \ + " log INT CHECK (log IN (0,1)) ," \ + " apply INT CHECK (apply IN (0,1)) NOT NULL DEFAULT 1 ," \ + " comment VARCHAR" \ + ")" + +// MCP query rules runtime table - shows in-memory state of active rules +// This table has the same schema as mcp_query_rules (no hits column). +// The hits counter is only available in stats_mcp_query_rules table. +// When this table is queried, it is automatically refreshed from the in-memory rules. 
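+// Example (illustrative admin session):
+//   LOAD MCP QUERY RULES TO RUNTIME;
+//   SELECT * FROM runtime_mcp_query_rules;  -- refreshed from the in-memory rules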
+#define ADMIN_SQLITE_TABLE_RUNTIME_MCP_QUERY_RULES "CREATE TABLE runtime_mcp_query_rules (" \ + " rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL ," \ + " active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 ," \ + " username VARCHAR ," \ + " schemaname VARCHAR ," \ + " tool_name VARCHAR ," \ + " match_pattern VARCHAR ," \ + " negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 ," \ + " re_modifiers VARCHAR DEFAULT 'CASELESS' ," \ + " flagIN INT NOT NULL DEFAULT 0 ," \ + " flagOUT INT CHECK (flagOUT >= 0) ," \ + " replace_pattern VARCHAR ," \ + " timeout_ms INT CHECK (timeout_ms >= 0) ," \ + " error_msg VARCHAR ," \ + " OK_msg VARCHAR ," \ + " log INT CHECK (log IN (0,1)) ," \ + " apply INT CHECK (apply IN (0,1)) NOT NULL DEFAULT 1 ," \ + " comment VARCHAR" \ + ")" + +// MCP query digest statistics table +#define STATS_SQLITE_TABLE_MCP_QUERY_DIGEST "CREATE TABLE stats_mcp_query_digest (" \ + " tool_name VARCHAR NOT NULL ," \ + " run_id INT ," \ + " digest VARCHAR NOT NULL ," \ + " digest_text VARCHAR NOT NULL ," \ + " count_star INTEGER NOT NULL ," \ + " first_seen INTEGER NOT NULL ," \ + " last_seen INTEGER NOT NULL ," \ + " sum_time INTEGER NOT NULL ," \ + " min_time INTEGER NOT NULL ," \ + " max_time INTEGER NOT NULL ," \ + " PRIMARY KEY(tool_name, run_id, digest)" \ + ")" + +// MCP query digest reset table +#define STATS_SQLITE_TABLE_MCP_QUERY_DIGEST_RESET "CREATE TABLE stats_mcp_query_digest_reset (" \ + " tool_name VARCHAR NOT NULL ," \ + " run_id INT ," \ + " digest VARCHAR NOT NULL ," \ + " digest_text VARCHAR NOT NULL ," \ + " count_star INTEGER NOT NULL ," \ + " first_seen INTEGER NOT NULL ," \ + " last_seen INTEGER NOT NULL ," \ + " sum_time INTEGER NOT NULL ," \ + " min_time INTEGER NOT NULL ," \ + " max_time INTEGER NOT NULL ," \ + " PRIMARY KEY(tool_name, run_id, digest)" \ + ")" + +// MCP query rules statistics table - shows hit counters for each rule +// This table contains only rule_id and hits count. +// It is automatically populated when stats_mcp_query_rules is queried. +// The hits counter increments each time a rule matches during query processing. 
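+// Example (illustrative): if rule 3 has matched 42 times since being loaded,
+//   SELECT * FROM stats_mcp_query_rules;  -- returns the row (rule_id=3, hits=42)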
+#define STATS_SQLITE_TABLE_MCP_QUERY_RULES "CREATE TABLE stats_mcp_query_rules (" \ + " rule_id INTEGER PRIMARY KEY NOT NULL ," \ + " hits INTEGER NOT NULL" \ + ")" + //#define STATS_SQLITE_TABLE_MEMORY_METRICS "CREATE TABLE stats_memory_metrics (Variable_Name VARCHAR NOT NULL PRIMARY KEY , Variable_Value VARCHAR NOT NULL)" diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index 56c2838fe5..92776c4c47 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -643,6 +643,10 @@ class ProxySQL_Admin { void save_mysql_firewall_whitelist_rules_from_runtime(bool, SQLite3_result *); void save_mysql_firewall_whitelist_sqli_fingerprints_from_runtime(bool, SQLite3_result *); + // MCP query rules + char* load_mcp_query_rules_to_runtime(); + void save_mcp_query_rules_from_runtime(bool _runtime = false); + char* load_pgsql_firewall_to_runtime(); void load_scheduler_to_runtime(); @@ -700,6 +704,8 @@ class ProxySQL_Admin { void stats___mysql_gtid_executed(); void stats___mysql_client_host_cache(bool reset); void stats___mcp_query_tools_counters(bool reset); + void stats___mcp_query_digest(bool reset); + void stats___mcp_query_rules(); // Update prometheus metrics void p_stats___memory_metrics(); diff --git a/lib/Admin_Bootstrap.cpp b/lib/Admin_Bootstrap.cpp index 60f9458c24..497e4c4de1 100644 --- a/lib/Admin_Bootstrap.cpp +++ b/lib/Admin_Bootstrap.cpp @@ -810,6 +810,12 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { insert_into_tables_defs(tables_defs_admin, "pgsql_firewall_whitelist_sqli_fingerprints", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_SQLI_FINGERPRINTS); insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_firewall_whitelist_sqli_fingerprints", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_FIREWALL_WHITELIST_SQLI_FINGERPRINTS); + // MCP query rules + insert_into_tables_defs(tables_defs_admin, "mcp_query_rules", ADMIN_SQLITE_TABLE_MCP_QUERY_RULES); + insert_into_tables_defs(tables_defs_admin, "runtime_mcp_query_rules", ADMIN_SQLITE_TABLE_RUNTIME_MCP_QUERY_RULES); + + insert_into_tables_defs(tables_defs_config, "mcp_query_rules", ADMIN_SQLITE_TABLE_MCP_QUERY_RULES); + insert_into_tables_defs(tables_defs_config, "pgsql_servers", ADMIN_SQLITE_TABLE_PGSQL_SERVERS); insert_into_tables_defs(tables_defs_config, "pgsql_users", ADMIN_SQLITE_TABLE_PGSQL_USERS); insert_into_tables_defs(tables_defs_config, "pgsql_ldap_mapping", ADMIN_SQLITE_TABLE_PGSQL_LDAP_MAPPING); @@ -902,6 +908,11 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_tools_counters", STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS); insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_tools_counters_reset", STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS_RESET); + // MCP query digest stats + insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_digest", STATS_SQLITE_TABLE_MCP_QUERY_DIGEST); + insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_digest_reset", STATS_SQLITE_TABLE_MCP_QUERY_DIGEST_RESET); + insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_rules", STATS_SQLITE_TABLE_MCP_QUERY_RULES); // Reuse same schema for stats + // init ldap here init_ldap(); diff --git a/lib/Admin_Handler.cpp b/lib/Admin_Handler.cpp index c46cd797be..2ec9881e20 100644 --- a/lib/Admin_Handler.cpp +++ b/lib/Admin_Handler.cpp @@ -2345,6 +2345,92 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query } } + // ============================================================ + // MCP QUERY 
RULES COMMAND HANDLERS
+	// ============================================================
+	// Supported commands:
+	//   LOAD MCP QUERY RULES FROM DISK    - Copy from disk to memory
+	//   LOAD MCP QUERY RULES TO MEMORY    - Copy from disk to memory (alias)
+	//   LOAD MCP QUERY RULES TO RUNTIME   - Load from memory to in-memory cache
+	//   LOAD MCP QUERY RULES FROM MEMORY  - Load from memory to in-memory cache (alias)
+	//   SAVE MCP QUERY RULES TO DISK      - Copy from memory to disk
+	//   SAVE MCP QUERY RULES TO MEMORY    - Save from in-memory cache to memory
+	//   SAVE MCP QUERY RULES FROM RUNTIME - Save from in-memory cache to memory (alias)
+	// ============================================================
+	if ((query_no_space_length>20) && ( (!strncasecmp("SAVE MCP QUERY RULES ", query_no_space, 21)) || (!strncasecmp("LOAD MCP QUERY RULES ", query_no_space, 21)) ) ) {
+
+		// LOAD MCP QUERY RULES FROM DISK / TO MEMORY
+		// Copies rules from persistent storage (disk.mcp_query_rules) to working memory (main.mcp_query_rules)
+		if (
+			(query_no_space_length == strlen("LOAD MCP QUERY RULES FROM DISK") && !strncasecmp("LOAD MCP QUERY RULES FROM DISK", query_no_space, query_no_space_length))
+			||
+			(query_no_space_length == strlen("LOAD MCP QUERY RULES TO MEMORY") && !strncasecmp("LOAD MCP QUERY RULES TO MEMORY", query_no_space, query_no_space_length))
+		) {
+			l_free(*ql,*q);
+			// First clear the target table, then insert, so that rows deleted from the source are also removed
+			*q=l_strdup("DELETE FROM main.mcp_query_rules; INSERT OR REPLACE INTO main.mcp_query_rules SELECT * FROM disk.mcp_query_rules");
+			*ql=strlen(*q)+1;
+			return true;
+		}
+
+		// SAVE MCP QUERY RULES TO DISK
+		// Copies rules from working memory (main.mcp_query_rules) to persistent storage (disk.mcp_query_rules)
+		if (
+			(query_no_space_length == strlen("SAVE MCP QUERY RULES TO DISK") && !strncasecmp("SAVE MCP QUERY RULES TO DISK", query_no_space, query_no_space_length))
+		) {
+			l_free(*ql,*q);
+			// First clear the target table, then insert, so that rows deleted from the source are also removed
+			*q=l_strdup("DELETE FROM disk.mcp_query_rules; INSERT OR REPLACE INTO disk.mcp_query_rules SELECT * FROM main.mcp_query_rules");
+			*ql=strlen(*q)+1;
+			return true;
+		}
+
+		// SAVE MCP QUERY RULES FROM RUNTIME / TO MEMORY
+		// Saves rules from the in-memory cache to working memory (main.mcp_query_rules).
+		// Note: the hits counter is in-memory only and is NOT persisted.
+		if (
+			(query_no_space_length == strlen("SAVE MCP QUERY RULES TO MEMORY") && !strncasecmp("SAVE MCP QUERY RULES TO MEMORY", query_no_space, query_no_space_length))
+			||
+			(query_no_space_length == strlen("SAVE MCP QUERY RULES TO MEM") && !strncasecmp("SAVE MCP QUERY RULES TO MEM", query_no_space, query_no_space_length))
+			||
+			(query_no_space_length == strlen("SAVE MCP QUERY RULES FROM RUNTIME") && !strncasecmp("SAVE MCP QUERY RULES FROM RUNTIME", query_no_space, query_no_space_length))
+			||
+			(query_no_space_length == strlen("SAVE MCP QUERY RULES FROM RUN") && !strncasecmp("SAVE MCP QUERY RULES FROM RUN", query_no_space, query_no_space_length))
+		) {
+			proxy_info("Received %s command\n", query_no_space);
+			ProxySQL_Admin* SPA = (ProxySQL_Admin*)pa;
+			SPA->save_mcp_query_rules_from_runtime(false);
+			proxy_debug(PROXY_DEBUG_ADMIN, 4, "Saved mcp query rules from RUNTIME\n");
+			SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
+			return false;
+		}
+
+		// LOAD MCP QUERY RULES TO RUNTIME / FROM MEMORY
+		// Loads rules from working memory (main.mcp_query_rules) to in-memory cache
+		// This makes
the rules active for query processing + if ( + (query_no_space_length == strlen("LOAD MCP QUERY RULES TO RUNTIME") && !strncasecmp("LOAD MCP QUERY RULES TO RUNTIME", query_no_space, query_no_space_length)) + || + (query_no_space_length == strlen("LOAD MCP QUERY RULES TO RUN") && !strncasecmp("LOAD MCP QUERY RULES TO RUN", query_no_space, query_no_space_length)) + || + (query_no_space_length == strlen("LOAD MCP QUERY RULES FROM MEMORY") && !strncasecmp("LOAD MCP QUERY RULES FROM MEMORY", query_no_space, query_no_space_length)) + || + (query_no_space_length == strlen("LOAD MCP QUERY RULES FROM MEM") && !strncasecmp("LOAD MCP QUERY RULES FROM MEM", query_no_space, query_no_space_length)) + ) { + proxy_info("Received %s command\n", query_no_space); + ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa; + char* err = SPA->load_mcp_query_rules_to_runtime(); + + if (err==NULL) { + proxy_debug(PROXY_DEBUG_ADMIN, 4, "Loaded mcp query rules to RUNTIME\n"); + SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space); + } else { + SPA->send_error_msg_to_client(sess, err); + } + return false; + } + } + if ((query_no_space_length>21) && ( (!strncasecmp("SAVE ADMIN VARIABLES ", query_no_space, 21)) || (!strncasecmp("LOAD ADMIN VARIABLES ", query_no_space, 21))) ) { if ( is_admin_command_or_alias(LOAD_ADMIN_VARIABLES_TO_MEMORY, query_no_space, query_no_space_length) ) { diff --git a/lib/Discovery_Schema.cpp b/lib/Discovery_Schema.cpp index 140458d4cc..a50f4cab5b 100644 --- a/lib/Discovery_Schema.cpp +++ b/lib/Discovery_Schema.cpp @@ -1,10 +1,12 @@ #include "Discovery_Schema.h" #include "cpp.h" #include "proxysql.h" +#include "re2/re2.h" #include #include #include #include +#include #include "../deps/json/json.hpp" using json = nlohmann::json; @@ -19,12 +21,42 @@ static std::string now_iso() { } Discovery_Schema::Discovery_Schema(const std::string& path) - : db(NULL), db_path(path) + : db(NULL), db_path(path), mcp_rules_version(0) { + pthread_rwlock_init(&mcp_rules_lock, NULL); + pthread_rwlock_init(&mcp_digest_rwlock, NULL); } Discovery_Schema::~Discovery_Schema() { close(); + + // Clean up MCP query rules + for (auto rule : mcp_query_rules) { + if (rule->regex_engine) { + delete (re2::RE2*)rule->regex_engine; + } + free(rule->username); + free(rule->schemaname); + free(rule->tool_name); + free(rule->match_pattern); + free(rule->replace_pattern); + free(rule->error_msg); + free(rule->ok_msg); + free(rule->comment); + delete rule; + } + mcp_query_rules.clear(); + + // Clean up MCP digest statistics + for (auto const& [key1, inner_map] : mcp_digest_umap) { + for (auto const& [key2, stats] : inner_map) { + delete (MCP_Query_Digest_Stats*)stats; + } + } + mcp_digest_umap.clear(); + + pthread_rwlock_destroy(&mcp_rules_lock); + pthread_rwlock_destroy(&mcp_digest_rwlock); } int Discovery_Schema::init() { @@ -311,6 +343,68 @@ int Discovery_Schema::create_deterministic_tables() { "('table:llm_domains', 'Domain Clusters', 'Semantic domain groupings (billing, sales, auth , etc)');" ); + // ============================================================ + // MCP QUERY RULES AND DIGEST TABLES + // ============================================================ + + // MCP query rules table + db->execute( + "CREATE TABLE IF NOT EXISTS mcp_query_rules (" + " rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL ," + " active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 ," + " tool_name VARCHAR ," + " run_id INT ," + " match_pattern VARCHAR ," + " negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 ," + " 
re_modifiers VARCHAR DEFAULT 'CASELESS' ," + " flagIN INT NOT NULL DEFAULT 0 ," + " flagOUT INT CHECK (flagOUT >= 0) ," + " action VARCHAR CHECK (action IN ('allow','block','rewrite','timeout')) NOT NULL DEFAULT 'allow' ," + " replace_pattern VARCHAR ," + " timeout_ms INT CHECK (timeout_ms >= 0) ," + " error_msg VARCHAR ," + " OK_msg VARCHAR ," + " log INT CHECK (log IN (0,1)) ," + " apply INT CHECK (apply IN (0,1)) NOT NULL DEFAULT 1 ," + " comment VARCHAR ," + " hits INTEGER NOT NULL DEFAULT 0" + ");" + ); + + // MCP query digest statistics table + db->execute( + "CREATE TABLE IF NOT EXISTS stats_mcp_query_digest (" + " tool_name VARCHAR NOT NULL ," + " run_id INT ," + " digest VARCHAR NOT NULL ," + " digest_text VARCHAR NOT NULL ," + " count_star INTEGER NOT NULL ," + " first_seen INTEGER NOT NULL ," + " last_seen INTEGER NOT NULL ," + " sum_time INTEGER NOT NULL ," + " min_time INTEGER NOT NULL ," + " max_time INTEGER NOT NULL ," + " PRIMARY KEY(tool_name, run_id, digest)" + ");" + ); + + // MCP query digest reset table + db->execute( + "CREATE TABLE IF NOT EXISTS stats_mcp_query_digest_reset (" + " tool_name VARCHAR NOT NULL ," + " run_id INT ," + " digest VARCHAR NOT NULL ," + " digest_text VARCHAR NOT NULL ," + " count_star INTEGER NOT NULL ," + " first_seen INTEGER NOT NULL ," + " last_seen INTEGER NOT NULL ," + " sum_time INTEGER NOT NULL ," + " min_time INTEGER NOT NULL ," + " max_time INTEGER NOT NULL ," + " PRIMARY KEY(tool_name, run_id, digest)" + ");" + ); + return 0; } @@ -2241,3 +2335,705 @@ int Discovery_Schema::log_query_tool_call( return 0; } + +// ============================================================ +// MCP QUERY RULES +// ============================================================ +// Load MCP query rules from database into memory +// +// This function replaces all in-memory MCP query rules with the rules +// from the provided resultset. It compiles regex patterns for each rule +// and initializes all rule properties. 
+// +// Args: +// resultset: SQLite result set containing rule definitions from the database +// Must contain 17 columns in the correct order: +// rule_id, active, username, schemaname, tool_name, match_pattern, +// negate_match_pattern, re_modifiers, flagIN, flagOUT, replace_pattern, +// timeout_ms, error_msg, OK_msg, log, apply, comment +// +// Thread Safety: +// Uses write lock on mcp_rules_lock during update +// +// Side Effects: +// - Increments mcp_rules_version (triggers runtime cache invalidation) +// - Clears and rebuilds mcp_query_rules vector +// - Compiles regex engines for all match_pattern fields +// ============================================================ + +void Discovery_Schema::load_mcp_query_rules(SQLite3_result* resultset) { + if (!resultset || resultset->rows_count == 0) { + proxy_info("No MCP query rules to load\n"); + return; + } + + pthread_rwlock_wrlock(&mcp_rules_lock); + + // Clear existing rules + for (auto rule : mcp_query_rules) { + if (rule->regex_engine) { + delete (re2::RE2*)rule->regex_engine; + } + free(rule->username); + free(rule->schemaname); + free(rule->tool_name); + free(rule->match_pattern); + free(rule->replace_pattern); + free(rule->error_msg); + free(rule->ok_msg); + free(rule->comment); + delete rule; + } + mcp_query_rules.clear(); + + // Load new rules from resultset + // Column order: rule_id, active, username, schemaname, tool_name, match_pattern, + // negate_match_pattern, re_modifiers, flagIN, flagOUT, replace_pattern, + // timeout_ms, error_msg, OK_msg, log, apply, comment + // Expected: 17 columns (fields[0] through fields[16]) + for (unsigned int i = 0; i < resultset->rows_count; i++) { + SQLite3_row* row = resultset->rows[i]; + + // Validate column count before accessing fields + if (row->cnt < 17) { + proxy_error("Invalid row format in mcp_query_rules: expected 17 columns, got %d. Skipping row %u.\n", + row->cnt, i); + continue; + } + + MCP_Query_Rule* rule = new MCP_Query_Rule(); + + rule->rule_id = atoi(row->fields[0]); // rule_id + rule->active = atoi(row->fields[1]) != 0; // active + rule->username = row->fields[2] ? strdup(row->fields[2]) : NULL; // username + rule->schemaname = row->fields[3] ? strdup(row->fields[3]) : NULL; // schemaname + rule->tool_name = row->fields[4] ? strdup(row->fields[4]) : NULL; // tool_name + rule->match_pattern = row->fields[5] ? strdup(row->fields[5]) : NULL; // match_pattern + rule->negate_match_pattern = row->fields[6] ? atoi(row->fields[6]) != 0 : false; // negate_match_pattern + // re_modifiers: Parse VARCHAR value - "CASELESS" maps to 1, otherwise parse as int + if (row->fields[7]) { + std::string mod = row->fields[7]; + if (mod == "CASELESS") { + rule->re_modifiers = 1; + } else if (mod == "0") { + rule->re_modifiers = 0; + } else { + rule->re_modifiers = atoi(mod.c_str()); + } + } else { + rule->re_modifiers = 1; // default CASELESS + } + rule->flagIN = row->fields[8] ? atoi(row->fields[8]) : 0; // flagIN + rule->flagOUT = row->fields[9] ? atoi(row->fields[9]) : 0; // flagOUT + rule->replace_pattern = row->fields[10] ? strdup(row->fields[10]) : NULL; // replace_pattern + rule->timeout_ms = row->fields[11] ? atoi(row->fields[11]) : 0; // timeout_ms + rule->error_msg = row->fields[12] ? strdup(row->fields[12]) : NULL; // error_msg + rule->ok_msg = row->fields[13] ? strdup(row->fields[13]) : NULL; // OK_msg + rule->log = row->fields[14] ? atoi(row->fields[14]) != 0 : false; // log + rule->apply = row->fields[15] ? 
atoi(row->fields[15]) != 0 : true; // apply + rule->comment = row->fields[16] ? strdup(row->fields[16]) : NULL; // comment + // Note: hits is in-memory only, not loaded from table + + // Compile regex if match_pattern exists + if (rule->match_pattern) { + re2::RE2::Options opts; + opts.set_log_errors(false); + if (rule->re_modifiers & 1) { + opts.set_case_sensitive(false); + } + rule->regex_engine = new re2::RE2(rule->match_pattern, opts); + if (!((re2::RE2*)rule->regex_engine)->ok()) { + proxy_warning("Failed to compile regex for MCP rule %d: %s\n", + rule->rule_id, rule->match_pattern); + delete (re2::RE2*)rule->regex_engine; + rule->regex_engine = NULL; + } + } + + mcp_query_rules.push_back(rule); + } + + mcp_rules_version++; + pthread_rwlock_unlock(&mcp_rules_lock); + + proxy_info("Loaded %zu MCP query rules\n", mcp_query_rules.size()); +} + +// Evaluate MCP query rules against an incoming query +// +// This function processes the query through all active MCP query rules in order, +// applying matching rules and collecting their actions. Multiple actions from +// different rules can be combined. +// +// Rule Actions (not mutually exclusive): +// - error_msg: Block the query with the specified error message +// - replace_pattern: Rewrite the query using regex substitution +// - timeout_ms: Set a timeout for query execution +// - OK_msg: Return success immediately with the specified message +// - log: Enable logging for this query +// +// Rule Processing Flow: +// 1. Skip inactive rules +// 2. Check flagIN match +// 3. Check username match (currently skipped as username not available in MCP context) +// 4. Check schemaname match +// 5. Check tool_name match +// 6. Check match_pattern against the query (regex) +// 7. If match: increment hits, apply actions, set flagOUT, and stop if apply=true +// +// Args: +// tool_name: The name of the MCP tool being called +// schemaname: The schema/database context for the query +// arguments: The JSON arguments passed to the tool +// original_query: The original SQL query string +// +// Returns: +// MCP_Query_Processor_Output*: Output object containing all actions to apply +// - error_msg: If set, query should be blocked +// - OK_msg: If set, return success immediately +// - new_query: Rewritten query if replace_pattern was applied +// - timeout_ms: Timeout in milliseconds if set +// - log: Whether to log this query +// - next_query_flagIN: The flagOUT value for chaining rules +// +// Thread Safety: +// Uses read lock on mcp_rules_lock during evaluation +// +// Memory Ownership: +// Returns a newly allocated MCP_Query_Processor_Output object. +// The caller assumes ownership and MUST delete the returned pointer +// when done to avoid memory leaks. 
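+//
+// Example (hypothetical caller; 'catalog' is assumed to be a Discovery_Schema*):
+//   MCP_Query_Processor_Output* qpo = catalog->evaluate_mcp_query_rules(
+//       "run_sql_readonly", "sales_db", args, "SELECT * FROM orders");
+//   if (qpo->error_msg) { /* block: return the error to the client */ }
+//   else if (qpo->new_query) { /* execute *qpo->new_query instead */ }
+//   delete qpo;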
+// +MCP_Query_Processor_Output* Discovery_Schema::evaluate_mcp_query_rules( + const std::string& tool_name, + const std::string& schemaname, + const nlohmann::json& arguments, + const std::string& original_query +) { + MCP_Query_Processor_Output* qpo = new MCP_Query_Processor_Output(); + qpo->init(); + + std::string current_query = original_query; + int current_flag = 0; + + pthread_rwlock_rdlock(&mcp_rules_lock); + + for (auto rule : mcp_query_rules) { + // Skip inactive rules + if (!rule->active) continue; + + // Check flagIN + if (rule->flagIN != current_flag) continue; + + // Check username match + if (rule->username) { + // For now, we don't have username in MCP context, skip if set + // TODO: Add username matching when available + continue; + } + + // Check schemaname match + if (rule->schemaname) { + if (!schemaname.empty() && strcmp(rule->schemaname, schemaname.c_str()) != 0) { + continue; + } + } + + // Check tool_name match + if (rule->tool_name) { + if (strcmp(rule->tool_name, tool_name.c_str()) != 0) continue; + } + + // Check match_pattern against the query + bool matches = false; + if (rule->regex_engine && rule->match_pattern) { + re2::RE2* regex = (re2::RE2*)rule->regex_engine; + re2::StringPiece piece(current_query); + matches = re2::RE2::PartialMatch(piece, *regex); + if (rule->negate_match_pattern) { + matches = !matches; + } + } else { + // No pattern means match all + matches = true; + } + + if (matches) { + // Increment hit counter + __sync_add_and_fetch((unsigned long long*)&rule->hits, 1); + + // Collect rule actions in output object + if (!rule->apply) { + // Log-only rule, continue processing + if (rule->log) { + proxy_info("MCP query rule %d logged: tool=%s schema=%s\n", + rule->rule_id, tool_name.c_str(), schemaname.c_str()); + } + if (qpo->log == -1) { + qpo->log = rule->log ? 1 : 0; + } + continue; + } + + // Set flagOUT for next rules + if (rule->flagOUT >= 0) { + current_flag = rule->flagOUT; + } + + // Collect all actions from this rule in the output object + // Actions are NOT mutually exclusive - a single rule can: + // rewrite + timeout + block all at once + + // 1. Rewrite action (if replace_pattern is set) + if (rule->replace_pattern && rule->regex_engine) { + std::string rewritten = current_query; + if (re2::RE2::Replace(&rewritten, *(re2::RE2*)rule->regex_engine, rule->replace_pattern)) { + // Update current_query for subsequent rule matching + current_query = rewritten; + // Store in output object + if (qpo->new_query) { + delete qpo->new_query; + } + qpo->new_query = new std::string(rewritten); + } + } + + // 2. Timeout action (if timeout_ms > 0) + if (rule->timeout_ms > 0) { + qpo->timeout_ms = rule->timeout_ms; + } + + // 3. Error message (block action) + if (rule->error_msg) { + if (qpo->error_msg) { + free(qpo->error_msg); + } + qpo->error_msg = strdup(rule->error_msg); + } + + // 4. OK message (allow with response) + if (rule->ok_msg) { + if (qpo->OK_msg) { + free(qpo->OK_msg); + } + qpo->OK_msg = strdup(rule->ok_msg); + } + + // 5. Log flag + if (rule->log && qpo->log == -1) { + qpo->log = 1; + } + + // 6. next_query_flagIN + if (rule->flagOUT >= 0) { + qpo->next_query_flagIN = rule->flagOUT; + } + + // If apply is true and not a log-only rule, stop processing further rules + if (rule->apply) { + break; + } + } + } + + pthread_rwlock_unlock(&mcp_rules_lock); + return qpo; +} + +// Get all MCP query rules from memory +// +// Returns all MCP query rules currently loaded in memory. 
+// This is used to populate both mcp_query_rules and runtime_mcp_query_rules tables. +// Note: The hits counter is NOT included (use get_stats_mcp_query_rules() for that). +// +// Returns: +// SQLite3_result*: Result set with 17 columns (no hits column) +// +// Thread Safety: +// Uses read lock on mcp_rules_lock +// +SQLite3_result* Discovery_Schema::get_mcp_query_rules() { + SQLite3_result* result = new SQLite3_result(); + + // Define columns (17 columns - same for mcp_query_rules and runtime_mcp_query_rules) + result->add_column_definition(SQLITE_TEXT, "rule_id"); + result->add_column_definition(SQLITE_TEXT, "active"); + result->add_column_definition(SQLITE_TEXT, "username"); + result->add_column_definition(SQLITE_TEXT, "schemaname"); + result->add_column_definition(SQLITE_TEXT, "tool_name"); + result->add_column_definition(SQLITE_TEXT, "match_pattern"); + result->add_column_definition(SQLITE_TEXT, "negate_match_pattern"); + result->add_column_definition(SQLITE_TEXT, "re_modifiers"); + result->add_column_definition(SQLITE_TEXT, "flagIN"); + result->add_column_definition(SQLITE_TEXT, "flagOUT"); + result->add_column_definition(SQLITE_TEXT, "replace_pattern"); + result->add_column_definition(SQLITE_TEXT, "timeout_ms"); + result->add_column_definition(SQLITE_TEXT, "error_msg"); + result->add_column_definition(SQLITE_TEXT, "OK_msg"); + result->add_column_definition(SQLITE_TEXT, "log"); + result->add_column_definition(SQLITE_TEXT, "apply"); + result->add_column_definition(SQLITE_TEXT, "comment"); + + pthread_rwlock_rdlock(&mcp_rules_lock); + + for (size_t i = 0; i < mcp_query_rules.size(); i++) { + MCP_Query_Rule* rule = mcp_query_rules[i]; + char** pta = (char**)malloc(sizeof(char*) * 17); + + pta[0] = strdup(std::to_string(rule->rule_id).c_str()); // rule_id + pta[1] = strdup(std::to_string(rule->active ? 1 : 0).c_str()); // active + pta[2] = rule->username ? strdup(rule->username) : NULL; // username + pta[3] = rule->schemaname ? strdup(rule->schemaname) : NULL; // schemaname + pta[4] = rule->tool_name ? strdup(rule->tool_name) : NULL; // tool_name + pta[5] = rule->match_pattern ? strdup(rule->match_pattern) : NULL; // match_pattern + pta[6] = strdup(std::to_string(rule->negate_match_pattern ? 1 : 0).c_str()); // negate_match_pattern + pta[7] = strdup(std::to_string(rule->re_modifiers).c_str()); // re_modifiers + pta[8] = strdup(std::to_string(rule->flagIN).c_str()); // flagIN + pta[9] = strdup(std::to_string(rule->flagOUT).c_str()); // flagOUT + pta[10] = rule->replace_pattern ? strdup(rule->replace_pattern) : NULL; // replace_pattern + pta[11] = strdup(std::to_string(rule->timeout_ms).c_str()); // timeout_ms + pta[12] = rule->error_msg ? strdup(rule->error_msg) : NULL; // error_msg + pta[13] = rule->ok_msg ? strdup(rule->ok_msg) : NULL; // OK_msg + pta[14] = strdup(std::to_string(rule->log ? 1 : 0).c_str()); // log + pta[15] = strdup(std::to_string(rule->apply ? 1 : 0).c_str()); // apply + pta[16] = rule->comment ? strdup(rule->comment) : NULL; // comment + + result->add_row(pta); + + // Free the row data + for (int j = 0; j < 17; j++) { + if (pta[j]) { + free(pta[j]); + } + } + free(pta); + } + + pthread_rwlock_unlock(&mcp_rules_lock); + return result; +} + +// Get MCP query rules statistics (hit counters) +// +// Returns the hit counter for each MCP query rule. +// The hit counter increments each time a rule matches during query processing. +// This is used to populate the stats_mcp_query_rules table. 
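+//
+// Example (illustrative): a rule with rule_id 3 that has matched 42 times
+// yields the row (rule_id='3', hits='42').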
+// +// Returns: +// SQLite3_result*: Result set with 2 columns (rule_id, hits) +// +// Thread Safety: +// Uses read lock on mcp_rules_lock +// +SQLite3_result* Discovery_Schema::get_stats_mcp_query_rules() { + SQLite3_result* result = new SQLite3_result(); + + // Define columns + result->add_column_definition(SQLITE_TEXT, "rule_id"); + result->add_column_definition(SQLITE_TEXT, "hits"); + + pthread_rwlock_rdlock(&mcp_rules_lock); + + for (size_t i = 0; i < mcp_query_rules.size(); i++) { + MCP_Query_Rule* rule = mcp_query_rules[i]; + char** pta = (char**)malloc(sizeof(char*) * 2); + + pta[0] = strdup(std::to_string(rule->rule_id).c_str()); + pta[1] = strdup(std::to_string(rule->hits).c_str()); + + result->add_row(pta); + + // Free the row data + for (int j = 0; j < 2; j++) { + if (pta[j]) { + free(pta[j]); + } + } + free(pta); + } + + pthread_rwlock_unlock(&mcp_rules_lock); + return result; +} + +// ============================================================ +// MCP QUERY DIGEST +// ============================================================ + +// Update MCP query digest statistics after a tool call completes. +// +// This function is called after each successful MCP tool execution to +// record performance and frequency statistics. Similar to MySQL's query +// digest tracking, this aggregates statistics for "similar" queries +// (queries with the same fingerprinted structure). +// +// Parameters: +// tool_name - Name of the MCP tool that was called (e.g., "run_sql_readonly") +// run_id - Discovery run identifier (0 if no schema context) +// digest - Computed digest hash (lower 64 bits of SpookyHash) +// digest_text - Fingerprinted JSON arguments with literals replaced by '?' +// duration_us - Query execution time in microseconds +// timestamp - Unix timestamp of when the query completed +// +// Statistics Updated: +// - count_star: Incremented for each execution +// - sum_time: Accumulates total execution time +// - min_time: Tracks minimum execution time +// - max_time: Tracks maximum execution time +// - first_seen: Set once on first occurrence (not updated) +// - last_seen: Updated to current timestamp on each execution +// +// Thread Safety: +// Acquires write lock on mcp_digest_rwlock for the entire operation. +// Nested map structure: mcp_digest_umap["tool_name|run_id"][digest] +// +// Note: Digest statistics are currently kept in memory only. Persistence +// to SQLite is planned (TODO at line 2775). 
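+//
+// Example (hypothetical caller, after a tool call completed in duration_us):
+//   uint64_t d = Discovery_Schema::compute_mcp_digest(tool_name, args);
+//   catalog->update_mcp_query_digest(tool_name, run_id, d,
+//       Discovery_Schema::fingerprint_mcp_args(args), duration_us, time(NULL));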
+void Discovery_Schema::update_mcp_query_digest( + const std::string& tool_name, + int run_id, + uint64_t digest, + const std::string& digest_text, + unsigned long long duration_us, + time_t timestamp +) { + // Create composite key: tool_name + run_id + std::string key = tool_name + "|" + std::to_string(run_id); + + pthread_rwlock_wrlock(&mcp_digest_rwlock); + + // Find or create digest stats entry + auto& tool_map = mcp_digest_umap[key]; + auto it = tool_map.find(digest); + + MCP_Query_Digest_Stats* stats = NULL; + if (it != tool_map.end()) { + stats = (MCP_Query_Digest_Stats*)it->second; + } else { + stats = new MCP_Query_Digest_Stats(); + stats->tool_name = tool_name; + stats->run_id = run_id; + stats->digest = digest; + stats->digest_text = digest_text; + tool_map[digest] = stats; + } + + // Update statistics + stats->add_timing(duration_us, timestamp); + + pthread_rwlock_unlock(&mcp_digest_rwlock); + + // Periodically persist to SQLite (every 100 updates or so) + static thread_local unsigned int update_count = 0; + if (++update_count % 100 == 0) { + // TODO: Implement batch persistence + } +} + +// Get MCP query digest statistics from the in-memory digest map. +// +// Returns all accumulated digest statistics for MCP tool calls that have been +// processed. This includes execution counts, timing information, and the +// fingerprinted query text. +// +// Parameters: +// reset - If true, clears all in-memory digest statistics after returning them. +// This is used for the stats_mcp_query_digest_reset table. +// If false, statistics remain in memory (stats_mcp_query_digest table). +// +// Returns: +// SQLite3_result* - Result set containing digest statistics with columns: +// - tool_name: Name of the MCP tool that was called +// - run_id: Discovery run identifier +// - digest: 128-bit hash (lower 64 bits) identifying the query fingerprint +// - digest_text: Fingerprinted JSON with literals replaced by '?' +// - count_star: Number of times this digest was seen +// - first_seen: Unix timestamp of first occurrence +// - last_seen: Unix timestamp of most recent occurrence +// - sum_time: Total execution time in microseconds +// - min_time: Minimum execution time in microseconds +// - max_time: Maximum execution time in microseconds +// +// Thread Safety: +// Uses read-write lock (mcp_digest_rwlock) for concurrent access. +// Reset operation acquires write lock to clear the digest map. +// +// Note: The caller is responsible for freeing the returned SQLite3_result. 
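+//
+// Example (hypothetical caller):
+//   SQLite3_result* rs = catalog->get_mcp_query_digest(false); // peek without clearing
+//   ... consume rs ...
+//   delete rs;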
+SQLite3_result* Discovery_Schema::get_mcp_query_digest(bool reset) { + SQLite3_result* result = new SQLite3_result(); + + // Define columns for MCP query digest statistics + result->add_column_definition(SQLITE_TEXT, "tool_name"); + result->add_column_definition(SQLITE_TEXT, "run_id"); + result->add_column_definition(SQLITE_TEXT, "digest"); + result->add_column_definition(SQLITE_TEXT, "digest_text"); + result->add_column_definition(SQLITE_TEXT, "count_star"); + result->add_column_definition(SQLITE_TEXT, "first_seen"); + result->add_column_definition(SQLITE_TEXT, "last_seen"); + result->add_column_definition(SQLITE_TEXT, "sum_time"); + result->add_column_definition(SQLITE_TEXT, "min_time"); + result->add_column_definition(SQLITE_TEXT, "max_time"); + + // Use appropriate lock based on reset flag to prevent TOCTOU race condition + // If reset is true, we need a write lock from the start to prevent new data + // from being added between the read and write lock operations + if (reset) { + pthread_rwlock_wrlock(&mcp_digest_rwlock); + } else { + pthread_rwlock_rdlock(&mcp_digest_rwlock); + } + + for (auto const& [key1, inner_map] : mcp_digest_umap) { + for (auto const& [digest, stats_ptr] : inner_map) { + MCP_Query_Digest_Stats* stats = (MCP_Query_Digest_Stats*)stats_ptr; + char** pta = (char**)malloc(sizeof(char*) * 10); + + pta[0] = strdup(stats->tool_name.c_str()); // tool_name + pta[1] = strdup(std::to_string(stats->run_id).c_str()); // run_id + pta[2] = strdup(std::to_string(stats->digest).c_str()); // digest + pta[3] = strdup(stats->digest_text.c_str()); // digest_text + pta[4] = strdup(std::to_string(stats->count_star).c_str()); // count_star + pta[5] = strdup(std::to_string(stats->first_seen).c_str()); // first_seen + pta[6] = strdup(std::to_string(stats->last_seen).c_str()); // last_seen + pta[7] = strdup(std::to_string(stats->sum_time).c_str()); // sum_time + pta[8] = strdup(std::to_string(stats->min_time).c_str()); // min_time + pta[9] = strdup(std::to_string(stats->max_time).c_str()); // max_time + + result->add_row(pta); + + // Free the row data + for (int j = 0; j < 10; j++) { + if (pta[j]) { + free(pta[j]); + } + } + free(pta); + } + } + + if (reset) { + // Clear all digest stats (we already have write lock) + for (auto const& [key1, inner_map] : mcp_digest_umap) { + for (auto const& [key2, stats] : inner_map) { + delete (MCP_Query_Digest_Stats*)stats; + } + } + mcp_digest_umap.clear(); + } + + pthread_rwlock_unlock(&mcp_digest_rwlock); + + return result; +} + +// Compute a unique digest hash for an MCP tool call. +// +// Creates a deterministic hash value that identifies similar MCP queries +// by normalizing the arguments (fingerprinting) and hashing the result. +// Queries with the same tool name and argument structure (but different +// literal values) will produce the same digest. +// +// This is analogous to MySQL query digest computation, which fingerprints +// SQL queries by replacing literal values with placeholders. +// +// Parameters: +// tool_name - Name of the MCP tool being called (e.g., "run_sql_readonly") +// arguments - JSON object containing the tool's arguments +// +// Returns: +// uint64_t - Lower 64 bits of the 128-bit SpookyHash digest value +// +// Digest Computation: +// 1. Arguments are fingerprinted (literals replaced with '?' placeholders) +// 2. Tool name and fingerprint are combined: "tool_name:{fingerprint}" +// 3. SpookyHash 128-bit hash is computed on the combined string +// 4. 
Lower 64 bits (hash1) are returned as the digest +// +// Example: +// Input: tool_name="run_sql_readonly", arguments={"sql": "SELECT * FROM users WHERE id = 123"} +// Fingerprint: {"sql":"?"} +// Combined: "run_sql_readonly:{"sql":"?"}" +// Digest: (uint64_t hash value) +// +// Note: Uses SpookyHash for fast, non-cryptographic hashing with good +// distribution properties. The same algorithm is used for MySQL query digests. +uint64_t Discovery_Schema::compute_mcp_digest( + const std::string& tool_name, + const nlohmann::json& arguments +) { + std::string fingerprint = fingerprint_mcp_args(arguments); + + // Combine tool_name and fingerprint for hashing + std::string combined = tool_name + ":" + fingerprint; + + // Use SpookyHash to compute digest + uint64_t hash1, hash2; + SpookyHash::Hash128(combined.data(), combined.length(), &hash1, &hash2); + + return hash1; +} + +// Generate a fingerprint of MCP tool arguments by replacing literals with placeholders. +// +// Converts a JSON arguments structure into a normalized form where all +// literal values (strings, numbers, booleans) are replaced with '?' placeholders. +// This allows similar queries to be grouped together for statistics and analysis. +// +// Parameters: +// arguments - JSON object/array containing the tool's arguments +// +// Returns: +// std::string - Fingerprinted JSON string with literals replaced by '?' +// +// Fingerprinting Rules: +// - String values: replaced with "?" +// - Number values: replaced with "?" +// - Boolean values: replaced with "?" +// - Objects: recursively fingerprinted (keys preserved, values replaced) +// - Arrays: replaced with "[?]" (entire array is a placeholder) +// - Null values: preserved as "null" +// +// Example: +// Input: {"sql": "SELECT * FROM users WHERE id = 123", "timeout": 5000} +// Output: {"sql":"?","timeout":"?"} +// +// Input: {"filters": {"status": "active", "age": 25}} +// Output: {"filters":{"?":"?","?":"?"}} +// +// Note: Object keys (field names) are preserved as-is, only values are replaced. +// This ensures that queries with different parameter structures produce different +// fingerprints, while queries with the same structure but different values produce +// the same fingerprint. +std::string Discovery_Schema::fingerprint_mcp_args(const nlohmann::json& arguments) { + // Serialize JSON with literals replaced by placeholders + std::string result; + + if (arguments.is_object()) { + result += "{"; + bool first = true; + for (auto it = arguments.begin(); it != arguments.end(); ++it) { + if (!first) result += ","; + first = false; + result += "\"" + it.key() + "\":"; + + if (it.value().is_string()) { + result += "\"?\""; + } else if (it.value().is_number() || it.value().is_boolean()) { + result += "?"; + } else if (it.value().is_object()) { + result += fingerprint_mcp_args(it.value()); + } else if (it.value().is_array()) { + result += "[?]"; + } else { + result += "null"; + } + } + result += "}"; + } else if (arguments.is_array()) { + result += "[?]"; + } else { + result += "?"; + } + + return result; +} \ No newline at end of file diff --git a/lib/MySQL_Catalog.cpp b/lib/MySQL_Catalog.cpp index e11d21fc43..206c9623f5 100644 --- a/lib/MySQL_Catalog.cpp +++ b/lib/MySQL_Catalog.cpp @@ -1,3 +1,24 @@ +// ============================================================ +// MySQL Catalog Implementation +// +// The MySQL Catalog provides a SQLite-based key-value store for +// MCP tool results, with schema isolation for multi-tenancy. 
+// +// Schema Isolation: +// All catalog entries are now scoped to a specific schema (database). +// The catalog table has a composite unique constraint on (schema, kind, key) +// to ensure entries from different schemas don't conflict. +// +// Functions accept a schema parameter to scope operations: +// - upsert(schema, kind, key, document, tags, links) +// - get(schema, kind, key, document) +// - search(schema, query, kind, tags, limit, offset) +// - list(schema, kind, limit, offset) +// - remove(schema, kind, key) +// +// Use empty schema "" for global/shared entries. +// ============================================================ + #include "MySQL_Catalog.h" #include "cpp.h" #include "proxysql.h" @@ -5,6 +26,10 @@ #include #include "../deps/json/json.hpp" +// ============================================================ +// Constructor / Destructor +// ============================================================ + MySQL_Catalog::MySQL_Catalog(const std::string& path) : db(NULL), db_path(path) { @@ -14,6 +39,17 @@ MySQL_Catalog::~MySQL_Catalog() { close(); } +// ============================================================ +// Database Initialization +// ============================================================ + +// Initialize the catalog database connection and schema. +// +// Opens (or creates) the SQLite database at db_path and initializes +// the catalog table with schema isolation support. +// +// Returns: +// 0 on success, -1 on error int MySQL_Catalog::init() { // Initialize database connection db = new SQLite3DB(); @@ -29,6 +65,7 @@ int MySQL_Catalog::init() { return init_schema(); } +// Close the catalog database connection. void MySQL_Catalog::close() { if (db) { delete db; @@ -112,6 +149,26 @@ int MySQL_Catalog::create_tables() { return 0; } +// ============================================================ +// Catalog CRUD Operations +// ============================================================ + +// Insert or update a catalog entry with schema isolation. +// +// Uses INSERT OR REPLACE (UPSERT) semantics with schema scoping. +// The unique constraint is (schema, kind, key), so entries from +// different schemas won't conflict even if they have the same kind/key. +// +// Parameters: +// schema - Schema name for isolation (use "" for global entries) +// kind - Entry kind (table, view, domain, metric, note, etc.) +// key - Unique key within the schema/kind +// document - JSON document content +// tags - Comma-separated tags +// links - Comma-separated related keys +// +// Returns: +// 0 on success, -1 on error int MySQL_Catalog::upsert( const std::string& schema, const std::string& kind, @@ -151,6 +208,16 @@ int MySQL_Catalog::upsert( return 0; } +// Retrieve a catalog entry by schema, kind, and key. +// +// Parameters: +// schema - Schema name for isolation +// kind - Entry kind +// key - Unique key +// document - Output: JSON document content +// +// Returns: +// 0 on success (entry found), -1 on error or not found int MySQL_Catalog::get( const std::string& schema, const std::string& kind, @@ -188,6 +255,18 @@ int MySQL_Catalog::get( return -1; } +// Search catalog entries with optional filters. 
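+//
+// Example (illustrative values): search("sales_db", "revenue", "table", "", 20, 0)
+// returns a JSON array of up to 20 table entries in "sales_db" matching "revenue".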
+// +// Parameters: +// schema - Schema filter (empty string for all schemas) +// query - Full-text search query (matches key, document, tags) +// kind - Kind filter (empty string for all kinds) +// tags - Tag filter (partial match) +// limit - Maximum results to return +// offset - Results offset for pagination +// +// Returns: +// JSON array of matching entries with schema, kind, key, document, tags, links std::string MySQL_Catalog::search( const std::string& schema, const std::string& query, @@ -270,6 +349,17 @@ std::string MySQL_Catalog::search( return results.dump(); } +// List catalog entries with optional filters and pagination. +// +// Parameters: +// schema - Schema filter (empty string for all schemas) +// kind - Kind filter (empty string for all kinds) +// limit - Maximum results to return +// offset - Results offset for pagination +// +// Returns: +// JSON object with "total" count and "results" array containing +// entries with schema, kind, key, document, tags, links std::string MySQL_Catalog::list( const std::string& schema, const std::string& kind, @@ -352,6 +442,20 @@ std::string MySQL_Catalog::list( return result.dump(); } +// Merge multiple catalog entries into a single target entry. +// +// Fetches documents for the source keys and creates a merged document +// with source_keys and instructions fields. Uses empty schema for +// merged domain entries (backward compatibility). +// +// Parameters: +// keys - Vector of source keys to merge +// target_key - Key for the merged entry +// kind - Kind for the merged entry (e.g., "domain") +// instructions - Optional instructions for the merge +// +// Returns: +// 0 on success, -1 on error int MySQL_Catalog::merge( const std::vector& keys, const std::string& target_key, @@ -384,6 +488,15 @@ int MySQL_Catalog::merge( return upsert("", kind, target_key, merged_doc , "" , ""); } +// Delete a catalog entry by schema, kind, and key. 
+// +// Parameters: +// schema - Schema filter (empty string for all schemas) +// kind - Entry kind +// key - Unique key +// +// Returns: +// 0 on success, -1 on error int MySQL_Catalog::remove( const std::string& schema, const std::string& kind, diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 15cc4fddc8..2de36105ce 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -20,6 +20,8 @@ using json = nlohmann::json; #include "PgSQL_HostGroups_Manager.h" #include "mysql.h" #include "proxysql_admin.h" +#include "Discovery_Schema.h" +#include "Query_Tool_Handler.h" #include "re2/re2.h" #include "re2/regexp.h" #include "proxysql.h" @@ -1155,6 +1157,9 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign bool stats_pgsql_commands_counters = false; bool stats_mcp_query_tools_counters = false; bool stats_mcp_query_tools_counters_reset = false; + bool stats_mcp_query_digest = false; + bool stats_mcp_query_digest_reset = false; + bool stats_mcp_query_rules = false; bool stats_mysql_query_rules=false; bool stats_pgsql_query_rules = false; bool stats_mysql_users=false; @@ -1182,6 +1187,8 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign bool runtime_pgsql_query_rules = false; bool runtime_pgsql_query_rules_fast_routing = false; + bool runtime_mcp_query_rules = false; + bool stats_pgsql_global = false; bool stats_pgsql_connection_pool = false; bool stats_pgsql_connection_pool_reset = false; @@ -1348,6 +1355,12 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign { stats_mcp_query_tools_counters=true; refresh=true; } if (strstr(query_no_space,"stats_mcp_query_tools_counters_reset")) { stats_mcp_query_tools_counters_reset=true; refresh=true; } + if (strstr(query_no_space,"stats_mcp_query_digest_reset")) + { stats_mcp_query_digest_reset=true; refresh=true; } + else if (strstr(query_no_space,"stats_mcp_query_digest")) + { stats_mcp_query_digest=true; refresh=true; } + if (strstr(query_no_space,"stats_mcp_query_rules")) + { stats_mcp_query_rules=true; refresh=true; } // temporary disabled because not implemented /* @@ -1434,6 +1447,9 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign if (strstr(query_no_space, "runtime_pgsql_query_rules_fast_routing")) { runtime_pgsql_query_rules_fast_routing = true; refresh = true; } + if (strstr(query_no_space, "runtime_mcp_query_rules")) { + runtime_mcp_query_rules = true; refresh = true; + } if (strstr(query_no_space,"runtime_scheduler")) { runtime_scheduler=true; refresh=true; } @@ -1584,6 +1600,16 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign if (stats_mcp_query_tools_counters_reset) { stats___mcp_query_tools_counters(true); } + if (stats_mcp_query_digest_reset) { + stats___mcp_query_digest(true); + } else { + if (stats_mcp_query_digest) { + stats___mcp_query_digest(false); + } + } + if (stats_mcp_query_rules) { + stats___mcp_query_rules(); + } if (admin) { if (dump_global_variables) { @@ -1658,6 +1684,9 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign if (runtime_pgsql_query_rules_fast_routing) { save_pgsql_query_rules_fast_routing_from_runtime(true); } + if (runtime_mcp_query_rules) { + save_mcp_query_rules_from_runtime(true); + } if (runtime_scheduler) { save_scheduler_runtime_to_database(true); } @@ -2622,6 +2651,7 @@ ProxySQL_Admin::ProxySQL_Admin() : generate_load_save_disk_commands("pgsql_users", "PGSQL USERS"); 
generate_load_save_disk_commands("pgsql_servers", "PGSQL SERVERS");
 	generate_load_save_disk_commands("pgsql_variables", "PGSQL VARIABLES");
+	generate_load_save_disk_commands("mcp_query_rules", "MCP QUERY RULES");
 	generate_load_save_disk_commands("mcp_variables", "MCP VARIABLES");
 	generate_load_save_disk_commands("genai_variables", "GENAI VARIABLES");
 	generate_load_save_disk_commands("scheduler", "SCHEDULER");
@@ -7717,6 +7747,158 @@ char* ProxySQL_Admin::load_pgsql_firewall_to_runtime() {
 	return NULL;
 }
 
+// Load MCP query rules from memory (main database) to runtime
+//
+// This command loads MCP query rules from the admin database (main.mcp_query_rules)
+// into the Discovery Schema's in-memory rule cache. After loading, rules become
+// active for query processing.
+//
+// The command follows the ProxySQL pattern:
+//   1. Read rules from the main.mcp_query_rules table
+//   2. Load them into the Discovery Schema's in-memory cache
+//   3. Compile regex patterns for matching
+//
+// Returns:
+//   NULL on success, error message string on failure (statically allocated; do not free)
+//
+char* ProxySQL_Admin::load_mcp_query_rules_to_runtime() {
+	unsigned long long curtime1 = monotonic_time();
+	char* error = NULL;
+	int cols = 0;
+	int affected_rows = 0;
+	bool success = false;
+
+	if (!GloMCPH) return (char*)"MCP Handler not started: command impossible to run";
+	Query_Tool_Handler* qth = GloMCPH->query_tool_handler;
+	if (!qth) return (char*)"Query Tool Handler not initialized";
+
+	// Get the discovery schema catalog
+	Discovery_Schema* catalog = qth->get_catalog();
+	if (!catalog) return (char*)"Discovery Schema catalog not initialized";
+
+	char* query = (char*)"SELECT rule_id, active, username, schemaname, tool_name, match_pattern, negate_match_pattern, re_modifiers, flagIN, flagOUT, replace_pattern, timeout_ms, error_msg, OK_msg, log, apply, comment FROM main.mcp_query_rules ORDER BY rule_id";
+	SQLite3_result* resultset = NULL;
+	admindb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
+
+	if (error) {
+		proxy_error("Error on %s : %s\n", query, error);
+		free(error);
+	} else {
+		success = true;
+		catalog->load_mcp_query_rules(resultset);
+	}
+
+	// load_mcp_query_rules() copies the rules, so the resultset is released
+	// here on both the success and the error path
+	if (resultset) {
+		delete resultset;
+	}
+
+	unsigned long long curtime2 = monotonic_time();
+	curtime1 = curtime1 / 1000;
+	curtime2 = curtime2 / 1000;
+	if (curtime2 - curtime1 > 1000) {
+		proxy_info("Locked for %llums\n", curtime2 - curtime1);
+	}
+
+	if (success == false) {
+		return (char*)"Unable to load MCP query rules from main.mcp_query_rules";
+	}
+	return NULL;
+}
+
+// Save MCP query rules from runtime to database
+//
+// Saves the current in-memory MCP query rules to a database table.
+// This is used to persist rules that have been loaded and are active in runtime.
+//
+// Args:
+//   _runtime: If true, save to runtime_mcp_query_rules (same schema, no hits)
+//             If false, save to mcp_query_rules (no hits)
+//   Note: The hits counter is in-memory only and is NOT persisted.
+//
+// The function copies all rules from the Discovery Schema's in-memory cache
+// to the specified admin database table.
+// This is typically called after:
+// - Querying runtime_mcp_query_rules (to refresh the view with current data)
+// - Manual runtime-to-memory save operation
+//
+void ProxySQL_Admin::save_mcp_query_rules_from_runtime(bool _runtime) {
+	if (!GloMCPH) return;
+	Query_Tool_Handler* qth = GloMCPH->query_tool_handler;
+	if (!qth) return;
+	Discovery_Schema* catalog = qth->get_catalog();
+	if (!catalog) return;
+
+	if (_runtime) {
+		admindb->execute("DELETE FROM runtime_mcp_query_rules");
+	} else {
+		admindb->execute("DELETE FROM mcp_query_rules");
+	}
+
+	// Get current rules from Discovery_Schema (same 17 columns for both tables)
+	SQLite3_result* resultset = catalog->get_mcp_query_rules();
+	if (resultset) {
+		char *a = NULL;
+		if (_runtime) {
+			a = (char *)"INSERT INTO runtime_mcp_query_rules (rule_id, active, username, schemaname, tool_name, match_pattern, negate_match_pattern, re_modifiers, flagIN, flagOUT, replace_pattern, timeout_ms, error_msg, OK_msg, log, apply, comment) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)";
+		} else {
+			a = (char *)"INSERT INTO mcp_query_rules (rule_id, active, username, schemaname, tool_name, match_pattern, negate_match_pattern, re_modifiers, flagIN, flagOUT, replace_pattern, timeout_ms, error_msg, OK_msg, log, apply, comment) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)";
+		}
+		int num_fields = 17; // same for both tables
+
+		for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
+			SQLite3_row* r = *it;
+
+			// Build query with escaped values
+			int arg_len = 0;
+			char* buffs[17];
+			for (int i = 0; i < num_fields; i++) {
+				if (r->fields[i]) {
+					char* o = escape_string_single_quotes(r->fields[i], false);
+					int l = strlen(o) + 4;
+					arg_len += l;
+					buffs[i] = (char*)malloc(l);
+					sprintf(buffs[i], "'%s'", o);
+					if (o != r->fields[i]) { // there was a copy
+						free(o);
+					}
+				} else {
+					int l = 5;
+					arg_len += l;
+					buffs[i] = (char*)malloc(l);
+					sprintf(buffs[i], "NULL");
+				}
+			}
+
+			char* query = (char*)malloc(strlen(a) + arg_len + 32);
+
+			sprintf(query, a,
+				buffs[0],  // rule_id
+				buffs[1],  // active
+				buffs[2],  // username
+				buffs[3],  // schemaname
+				buffs[4],  // tool_name
+				buffs[5],  // match_pattern
+				buffs[6],  // negate_match_pattern
+				buffs[7],  // re_modifiers
+				buffs[8],  // flagIN
+				buffs[9],  // flagOUT
+				buffs[10], // replace_pattern
+				buffs[11], // timeout_ms
+				buffs[12], // error_msg
+				buffs[13], // OK_msg
+				buffs[14], // log
+				buffs[15], // apply
+				buffs[16]  // comment
+			);
+
+			admindb->execute(query);
+
+			for (int i = 0; i < num_fields; i++) {
+				free(buffs[i]);
+			}
+			free(query);
+		}
+		delete resultset;
+	}
+}
+
 char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_query_rules_resultset, SQLite3_result* SQLite3_query_rules_fast_routing_resultset, const std::string& checksum, const time_t epoch) {
 	// About the queries used here, see notes about CLUSTER_QUERY_MYSQL_QUERY_RULES and
 	// CLUSTER_QUERY_MYSQL_QUERY_RULES_FAST_ROUTING in ProxySQL_Cluster.hpp
diff --git a/lib/ProxySQL_Admin_Stats.cpp b/lib/ProxySQL_Admin_Stats.cpp
index 3a1c433ca8..b0ca536a26 100644
--- a/lib/ProxySQL_Admin_Stats.cpp
+++ b/lib/ProxySQL_Admin_Stats.cpp
@@ -2544,3 +2544,133 @@ int ProxySQL_Admin::stats___save_pgsql_query_digest_to_sqlite(
 	return row_idx;
 }
+
+// ============================================================
+// MCP QUERY DIGEST STATS
+// ============================================================
+
+// Collect MCP query digest statistics and populate stats tables.
+//
+// Populates the stats_mcp_query_digest or stats_mcp_query_digest_reset
+// table with current digest statistics from all MCP queries processed.
+// This is called automatically when the stats_mcp_query_digest table is queried.
+//
+// The function:
+// 1. Deletes all existing rows from stats_mcp_query_digest (or stats_mcp_query_digest_reset)
+// 2. Reads digest statistics from Discovery Schema's in-memory digest map
+// 3. Inserts fresh data into the corresponding stats table
+//
+// Parameters:
+//   reset - If true, populates stats_mcp_query_digest_reset and clears in-memory stats.
+//           If false, populates stats_mcp_query_digest (non-reset view).
+//
+// Note: This is currently a simplified implementation. The digest statistics
+// are stored in memory in the Discovery_Schema and accessed via get_mcp_query_digest().
+//
+// Stats columns returned:
+// - tool_name: Name of the MCP tool that was called
+// - run_id: Discovery run identifier
+// - digest: 128-bit hash (lower 64 bits) identifying the query fingerprint
+// - digest_text: Fingerprinted JSON with literals replaced by '?'
+// - count_star: Number of times this digest was seen
+// - first_seen: Unix timestamp of first occurrence
+// - last_seen: Unix timestamp of most recent occurrence
+// - sum_time: Total execution time in microseconds
+// - min_time: Minimum execution time in microseconds
+// - max_time: Maximum execution time in microseconds
+void ProxySQL_Admin::stats___mcp_query_digest(bool reset) {
+	if (!GloMCPH) return;
+	Query_Tool_Handler* qth = GloMCPH->query_tool_handler;
+	if (!qth) return;
+
+	// Get the discovery schema catalog
+	Discovery_Schema* catalog = qth->get_catalog();
+	if (!catalog) return;
+
+	// Get the stats from the catalog (includes reset logic)
+	SQLite3_result* resultset = catalog->get_mcp_query_digest(reset);
+	if (!resultset) return;
+
+	statsdb->execute("BEGIN");
+
+	if (reset) {
+		statsdb->execute("DELETE FROM stats_mcp_query_digest_reset");
+	} else {
+		statsdb->execute("DELETE FROM stats_mcp_query_digest");
+	}
+
+	// Insert digest statistics into the table matching the requested view.
+	// The target must follow the reset flag: unconditionally inserting into
+	// stats_mcp_query_digest would leave the _reset table empty right after
+	// it was cleared above.
+	// Columns: tool_name, run_id, digest, digest_text, count_star,
+	//          first_seen, last_seen, sum_time, min_time, max_time
+	char* a = NULL;
+	if (reset) {
+		a = (char*)"INSERT INTO stats_mcp_query_digest_reset VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")";
+	} else {
+		a = (char*)"INSERT INTO stats_mcp_query_digest VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")";
+	}
+	for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
+		SQLite3_row* r = *it;
+		int arg_len = 0;
+		for (int i = 0; i < 10; i++) {
+			arg_len += strlen(r->fields[i]);
+		}
+		char* query = (char*)malloc(strlen(a) + arg_len + 32);
+		sprintf(query, a,
+			r->fields[0], // tool_name
+			r->fields[1], // run_id
+			r->fields[2], // digest
+			r->fields[3], // digest_text
+			r->fields[4], // count_star
+			r->fields[5], // first_seen
+			r->fields[6], // last_seen
+			r->fields[7], // sum_time
+			r->fields[8], // min_time
+			r->fields[9]  // max_time
+		);
+		statsdb->execute(query);
+		free(query);
+	}
+	statsdb->execute("COMMIT");
+	delete resultset;
+}
+
+// Collect MCP query rules statistics
+//
+// Populates the stats_mcp_query_rules table with current hit counters
+// from all MCP query rules in memory. This is called automatically
+// when the stats_mcp_query_rules table is queried.
+//
+// The function:
+// 1. Deletes all existing rows from stats_mcp_query_rules
+// 2. Reads rule_id and hits from Discovery Schema's in-memory rules
+// 3. Inserts fresh data into stats_mcp_query_rules table
+//
+// Note: Unlike digest stats, query rules stats do not support reset-on-read.
+// The stats table is simply refreshed with current hit counts.
+//
+void ProxySQL_Admin::stats___mcp_query_rules() {
+	if (!GloMCPH) return;
+	Query_Tool_Handler* qth = GloMCPH->query_tool_handler;
+	if (!qth) return;
+
+	// Get the discovery schema catalog
+	Discovery_Schema* catalog = qth->get_catalog();
+	if (!catalog) return;
+
+	// Get the stats from the catalog
+	SQLite3_result* resultset = catalog->get_stats_mcp_query_rules();
+	if (!resultset) return;
+
+	statsdb->execute("BEGIN");
+	statsdb->execute("DELETE FROM stats_mcp_query_rules");
+
+	char* a = (char*)"INSERT INTO stats_mcp_query_rules VALUES (\"%s\",\"%s\")";
+	for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
+		SQLite3_row* r = *it;
+		int arg_len = 0;
+		for (int i = 0; i < 2; i++) {
+			arg_len += strlen(r->fields[i]);
+		}
+		char* query = (char*)malloc(strlen(a) + arg_len + 32);
+		sprintf(query, a, r->fields[0], r->fields[1]);
+		statsdb->execute(query);
+		free(query);
+	}
+	statsdb->execute("COMMIT");
+	delete resultset;
+}
diff --git a/lib/Query_Tool_Handler.cpp b/lib/Query_Tool_Handler.cpp
index dafc5ea25d..4b26021f71 100644
--- a/lib/Query_Tool_Handler.cpp
+++ b/lib/Query_Tool_Handler.cpp
@@ -13,7 +13,28 @@ using json = nlohmann::json;
 
 // MySQL client library
 #include <mysql.h>
 
-// Helper to safely get string from JSON
+// ============================================================
+// JSON Helper Functions
+//
+// These helper functions provide safe extraction of values from
+// nlohmann::json objects with type coercion and default values.
+// They handle edge cases like null values, type mismatches, and
+// missing keys gracefully.
+// ============================================================
+
+// Safely extract a string value from JSON.
+//
+// Returns the value as a string if the key exists and is not null.
+// For non-string types, returns the JSON dump representation.
+// Returns the default value if the key is missing or null.
+//
+// Parameters:
+//   j           - JSON object to extract from
+//   key         - Key to look up
+//   default_val - Default value if key is missing or null
+//
+// Returns:
+//   String value, JSON dump, or default value
 static std::string json_string(const json& j, const std::string& key, const std::string& default_val = "") {
 	if (j.contains(key) && !j[key].is_null()) {
 		if (j[key].is_string()) {
@@ -24,7 +45,21 @@ static std::string json_string(const json& j, const std::string& key, const std::string& default_val = "") {
 	return default_val;
 }
 
-// Helper to safely get int from JSON - handles numbers, booleans, and numeric strings
+// Safely extract an integer value from JSON with type coercion.
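+// (Editorial examples, assuming the json alias declared above:
+//    json_int(json{{"n", 5}},    "n")      == 5
+//    json_int(json{{"n", "5"}},  "n")      == 5   // numeric string parsed
+//    json_int(json{{"b", true}}, "b")      == 1   // bool coerced
+//    json_int(json::object(),    "n", 7)   == 7   // missing key -> default
+// )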
+// +// Handles multiple input types: +// - Numbers: Returns directly as int +// - Booleans: Converts (true=1, false=0) +// - Strings: Attempts numeric parsing +// - Missing/null: Returns default value +// +// Parameters: +// j - JSON object to extract from +// key - Key to look up +// default_val - Default value if key is missing, null, or unparseable +// +// Returns: +// Integer value, or default value static int json_int(const json& j, const std::string& key, int default_val = 0) { if (j.contains(key) && !j[key].is_null()) { const json& val = j[key]; @@ -50,7 +85,20 @@ static int json_int(const json& j, const std::string& key, int default_val = 0) return default_val; } -// Helper to safely get double from JSON - handles both numbers and numeric strings +// Safely extract a double value from JSON with type coercion. +// +// Handles multiple input types: +// - Numbers: Returns directly as double +// - Strings: Attempts numeric parsing +// - Missing/null: Returns default value +// +// Parameters: +// j - JSON object to extract from +// key - Key to look up +// default_val - Default value if key is missing, null, or unparseable +// +// Returns: +// Double value, or default value static double json_double(const json& j, const std::string& key, double default_val = 0.0) { if (j.contains(key) && !j[key].is_null()) { const json& val = j[key]; @@ -1547,22 +1595,103 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& if (sql.empty()) { result = create_error_response("sql is required"); - } else if (!validate_readonly_query(sql)) { - result = create_error_response("SQL is not read-only"); - } else if (is_dangerous_query(sql)) { - result = create_error_response("SQL contains dangerous operations"); } else { - std::string query_result = execute_query_with_schema(sql, schema); - try { - json result_json = json::parse(query_result); - // Check if query actually failed - if (result_json.contains("success") && !result_json["success"]) { - result = create_error_response(result_json["error"]); - } else { - result = create_success_response(result_json); + // ============================================================ + // MCP QUERY RULES EVALUATION + // ============================================================ + MCP_Query_Processor_Output* qpo = catalog->evaluate_mcp_query_rules( + tool_name, + schema, + arguments, + sql + ); + + // Check for OK_msg (return success without executing) + if (qpo->OK_msg) { + unsigned long long duration = monotonic_time() - start_time; + track_tool_invocation(this, tool_name, schema, duration); + catalog->log_query_tool_call(tool_name, schema, 0, start_time, duration, "OK message from query rule"); + result = create_success_response(qpo->OK_msg); + delete qpo; + return result; + } + + // Check for error_msg (block the query) + if (qpo->error_msg) { + unsigned long long duration = monotonic_time() - start_time; + track_tool_invocation(this, tool_name, schema, duration); + catalog->log_query_tool_call(tool_name, schema, 0, start_time, duration, "Blocked by query rule"); + result = create_error_response(qpo->error_msg); + delete qpo; + return result; + } + + // Apply rewritten query if provided + if (qpo->new_query) { + sql = *qpo->new_query; + } + + // Apply timeout if provided + if (qpo->timeout_ms > 0) { + // Use ceiling division to ensure sub-second timeouts are at least 1 second + timeout_sec = (qpo->timeout_ms + 999) / 1000; + } + + // Apply log flag if set + if (qpo->log == 1) { + // TODO: Implement query logging if needed + } + + delete 
qpo; + + // Continue with validation and execution + if (!validate_readonly_query(sql)) { + result = create_error_response("SQL is not read-only"); + } else if (is_dangerous_query(sql)) { + result = create_error_response("SQL contains dangerous operations"); + } else { + std::string query_result = execute_query_with_schema(sql, schema); + try { + json result_json = json::parse(query_result); + // Check if query actually failed + if (result_json.contains("success") && !result_json["success"]) { + result = create_error_response(result_json["error"]); + } else { + // ============================================================ + // MCP QUERY DIGEST TRACKING (on success) + // ============================================================ + // Track successful MCP tool calls for statistics aggregation. + // This computes a digest hash (similar to MySQL query digest) that + // groups similar queries together by replacing literal values with + // placeholders. Statistics are accumulated per digest and can be + // queried via the stats_mcp_query_digest table. + // + // Process: + // 1. Compute digest hash using fingerprinted arguments + // 2. Store/aggregate statistics in the digest map (count, timing) + // 3. Stats are available via stats_mcp_query_digest table + // + // Statistics tracked: + // - count_star: Number of times this digest was executed + // - sum_time, min_time, max_time: Execution timing metrics + // - first_seen, last_seen: Timestamps for occurrence tracking + uint64_t digest = Discovery_Schema::compute_mcp_digest(tool_name, arguments); + std::string digest_text = Discovery_Schema::fingerprint_mcp_args(arguments); + unsigned long long duration = monotonic_time() - start_time; + int digest_run_id = schema.empty() ? 0 : catalog->resolve_run_id(schema); + catalog->update_mcp_query_digest( + tool_name, + digest_run_id, + digest, + digest_text, + duration, + time(NULL) + ); + result = create_success_response(result_json); + } + } catch (...) { + result = create_success_response(query_result); } - } catch (...) { - result = create_success_response(query_result); } } } diff --git a/lib/Static_Harvester.cpp b/lib/Static_Harvester.cpp index 868cd0d22d..d3481edb61 100644 --- a/lib/Static_Harvester.cpp +++ b/lib/Static_Harvester.cpp @@ -1,3 +1,21 @@ +// ============================================================ +// Static_Harvester Implementation +// +// Static metadata harvester for MySQL databases. This class performs +// deterministic metadata extraction from MySQL's INFORMATION_SCHEMA +// and stores it in a Discovery_Schema catalog for use by MCP tools. +// +// Harvest stages (executed in order by run_full_harvest): +// 1. Schemas/Databases - From information_schema.SCHEMATA +// 2. Objects - Tables, views, routines from TABLES and ROUTINES +// 3. Columns - From COLUMNS with derived hints (is_time, is_id_like) +// 4. Indexes - From STATISTICS with is_pk, is_unique, is_indexed flags +// 5. Foreign Keys - From KEY_COLUMN_USAGE and REFERENTIAL_CONSTRAINTS +// 6. View Definitions - From VIEWS +// 7. Quick Profiles - Metadata-based table kind inference (log/event, fact, entity) +// 8. 
FTS Index Rebuild - Full-text search index for object discovery +// ============================================================ + #include "Static_Harvester.h" #include "proxysql_debug.h" #include @@ -12,6 +30,25 @@ #include "../deps/json/json.hpp" using json = nlohmann::json; +// ============================================================ +// Constructor / Destructor +// ============================================================ + +// Initialize Static_Harvester with MySQL connection parameters. +// +// Parameters: +// host - MySQL server hostname or IP address +// port - MySQL server port number +// user - MySQL username for authentication +// password - MySQL password for authentication +// schema - Default schema (can be empty for all schemas) +// catalog_path - Filesystem path to the SQLite catalog database +// +// Notes: +// - Creates a new Discovery_Schema instance for catalog storage +// - Initializes the connection mutex but does NOT connect to MySQL yet +// - Call init() after construction to initialize the catalog +// - MySQL connection is established lazily on first harvest operation Static_Harvester::Static_Harvester( const std::string& host, int port, @@ -33,6 +70,10 @@ Static_Harvester::Static_Harvester( catalog = new Discovery_Schema(catalog_path); } +// Destroy Static_Harvester and release resources. +// +// Ensures MySQL connection is closed and the Discovery_Schema catalog +// is properly deleted. Connection mutex is destroyed. Static_Harvester::~Static_Harvester() { close(); if (catalog) { @@ -41,6 +82,18 @@ Static_Harvester::~Static_Harvester() { pthread_mutex_destroy(&conn_lock); } +// ============================================================ +// Lifecycle Methods +// ============================================================ + +// Initialize the harvester by initializing the catalog database. +// +// This must be called after construction before any harvest operations. +// Initializes the Discovery_Schema SQLite database, creating tables +// if they don't exist. +// +// Returns: +// 0 on success, -1 on error int Static_Harvester::init() { if (catalog->init()) { proxy_error("Static_Harvester: Failed to initialize catalog\n"); @@ -49,10 +102,36 @@ int Static_Harvester::init() { return 0; } +// Close the MySQL connection and cleanup resources. +// +// Disconnects from MySQL if connected. The catalog is NOT destroyed, +// allowing multiple harvest runs with the same harvester instance. void Static_Harvester::close() { disconnect_mysql(); } +// ============================================================ +// MySQL Connection Methods +// ============================================================ + +// Establish connection to the MySQL server. +// +// Connects to MySQL using the credentials provided during construction. +// If already connected, returns 0 immediately (idempotent). +// +// Connection settings: +// - 30 second connect/read/write timeouts +// - CLIENT_MULTI_STATEMENTS flag enabled +// - No default database selected (we query information_schema) +// +// On successful connection, also retrieves the MySQL server version +// and builds the source DSN string for run tracking. +// +// Thread Safety: +// Uses mutex to ensure thread-safe connection establishment. +// +// Returns: +// 0 on success (including already connected), -1 on error int Static_Harvester::connect_mysql() { pthread_mutex_lock(&conn_lock); @@ -103,6 +182,13 @@ int Static_Harvester::connect_mysql() { return 0; } +// Disconnect from the MySQL server. 
+//
+// Closes the MySQL connection if connected. Safe to call when
+// not connected (idempotent).
+//
+// Thread Safety:
+//   Uses mutex to ensure thread-safe disconnection.
 void Static_Harvester::disconnect_mysql() {
 	pthread_mutex_lock(&conn_lock);
 	if (mysql_conn) {
@@ -112,6 +198,13 @@ void Static_Harvester::disconnect_mysql() {
 	pthread_mutex_unlock(&conn_lock);
 }
 
+// Get the MySQL server version string.
+//
+// Retrieves the version from the connected MySQL server.
+// Used for recording metadata in the discovery run.
+//
+// Returns:
+//   MySQL version string (e.g., "8.0.35"), or empty string if not connected
 std::string Static_Harvester::get_mysql_version() {
 	if (!mysql_conn) {
 		return "";
@@ -126,6 +219,20 @@ std::string Static_Harvester::get_mysql_version() {
 	return mysql_get_server_info(mysql_conn);
 }
 
+// Execute a SQL query on the MySQL server and return results.
+//
+// Executes the query and returns all result rows as a vector of string vectors.
+// NULL values are converted to empty strings.
+//
+// Parameters:
+//   query   - SQL query string to execute
+//   results - Output parameter populated with result rows
+//
+// Returns:
+//   0 on success (including queries with no result set), -1 on error
+//
+// Thread Safety:
+//   Uses mutex to ensure thread-safe query execution.
 int Static_Harvester::execute_query(const std::string& query, std::vector<std::vector<std::string>>& results) {
 	pthread_mutex_lock(&conn_lock);
 
@@ -166,6 +273,19 @@ int Static_Harvester::execute_query(const std::string& query, std::vector<std::vector<std::string>>& results) {
+// Start a new discovery run.
+//
+// Creates a new run record in the catalog and stores its id in
+// current_run_id, making it the active run. Fails if a run is
+// already active.
+//
+// Parameters:
+//   notes - Optional notes describing the run
+//
+// Returns:
+//   The new run_id on success, -1 on error (including an already-active run)
 int Static_Harvester::start_run(const std::string& notes) {
 	if (current_run_id >= 0) {
 		proxy_error("Static_Harvester: Run already active (run_id=%d)\n", current_run_id);
@@ -205,6 +354,16 @@ int Static_Harvester::start_run(const std::string& notes) {
 	return current_run_id;
 }
 
+// Finish the current discovery run.
+//
+// Marks the run as completed in the catalog with a finish timestamp
+// and optional completion notes. Resets current_run_id to -1.
+//
+// Parameters:
+//   notes - Optional completion notes (e.g., "Completed successfully", "Failed at stage X")
+//
+// Returns:
+//   0 on success, -1 on error (including if no run is active)
 int Static_Harvester::finish_run(const std::string& notes) {
 	if (current_run_id < 0) {
 		proxy_error("Static_Harvester: No active run\n");
@@ -222,6 +381,20 @@ int Static_Harvester::finish_run(const std::string& notes) {
 	return 0;
 }
 
+// ============================================================
+// Fetch Methods (Query INFORMATION_SCHEMA)
+// ============================================================
+
+// Fetch schema/database metadata from information_schema.SCHEMATA.
+//
+// Queries MySQL for all schemas (databases) and their character set
+// and collation information.
+//
+// Parameters:
+//   filter - Optional schema name filter (empty for all schemas)
+//
+// Returns:
+//   Vector of SchemaRow structures containing schema metadata
 std::vector<SchemaRow> Static_Harvester::fetch_schemas(const std::string& filter) {
 	std::vector<SchemaRow> schemas;
 
@@ -249,6 +422,25 @@ std::vector<SchemaRow> Static_Harvester::fetch_schemas(const std::string& filter) {
 	return schemas;
 }
 
+// ============================================================
+// Harvest Stage Methods
+// ============================================================
+
+// Harvest schemas/databases to the catalog.
+//
+// Fetches schemas from information_schema.SCHEMATA and inserts them
+// into the catalog. System schemas (mysql, information_schema,
+// performance_schema, sys) are skipped.
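+//
+// Editorial sketch of the skip check (the exact predicate used by the
+// implementation is an assumption; the helper name is illustrative):
+//
+//   static bool is_system_schema(const std::string& s) {
+//       return s == "mysql" || s == "information_schema" ||
+//              s == "performance_schema" || s == "sys";
+//   }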
+//
+// Parameters:
+//   only_schema - Optional filter to harvest only one schema
+//
+// Returns:
+//   Number of schemas harvested, or -1 on error
+//
+// Notes:
+// - Requires an active run (start_run must be called first)
+// - Skips system schemas automatically
 int Static_Harvester::harvest_schemas(const std::string& only_schema) {
 	if (current_run_id < 0) {
 		proxy_error("Static_Harvester: No active run\n");
@@ -274,6 +466,16 @@ int Static_Harvester::harvest_schemas(const std::string& only_schema) {
 	return count;
 }
 
+// Fetch table and view metadata from information_schema.TABLES.
+//
+// Queries MySQL for all tables and views with their physical
+// characteristics (rows, size, engine, timestamps).
+//
+// Parameters:
+//   filter - Optional schema name filter
+//
+// Returns:
+//   Vector of ObjectRow structures containing table/view metadata
 std::vector<ObjectRow> Static_Harvester::fetch_tables_views(const std::string& filter) {
 	std::vector<ObjectRow> objects;
 
@@ -310,6 +512,16 @@ std::vector<ObjectRow> Static_Harvester::fetch_tables_views(const std::string& filter) {
 	return objects;
 }
 
+// Fetch column metadata from information_schema.COLUMNS.
+//
+// Queries MySQL for all columns with their data types, nullability,
+// defaults, character set, and comments.
+//
+// Parameters:
+//   filter - Optional schema name filter
+//
+// Returns:
+//   Vector of ColumnRow structures containing column metadata
 std::vector<ColumnRow> Static_Harvester::fetch_columns(const std::string& filter) {
 	std::vector<ColumnRow> columns;
 
@@ -349,6 +561,16 @@ std::vector<ColumnRow> Static_Harvester::fetch_columns(const std::string& filter) {
 	return columns;
 }
 
+// Fetch index metadata from information_schema.STATISTICS.
+//
+// Queries MySQL for all indexes with their columns, sequence,
+// uniqueness, cardinality, and collation.
+//
+// Parameters:
+//   filter - Optional schema name filter
+//
+// Returns:
+//   Vector of IndexRow structures containing index metadata
 std::vector<IndexRow> Static_Harvester::fetch_indexes(const std::string& filter) {
 	std::vector<IndexRow> indexes;
 
@@ -385,6 +607,17 @@ std::vector<IndexRow> Static_Harvester::fetch_indexes(const std::string& filter) {
 	return indexes;
 }
 
+// Fetch foreign key metadata from information_schema.
+//
+// Queries KEY_COLUMN_USAGE and REFERENTIAL_CONSTRAINTS to get
+// foreign key relationships including child/parent tables and columns,
+// and ON UPDATE/DELETE rules.
+//
+// Parameters:
+//   filter - Optional schema name filter
+//
+// Returns:
+//   Vector of FKRow structures containing foreign key metadata
 std::vector<FKRow> Static_Harvester::fetch_foreign_keys(const std::string& filter) {
 	std::vector<FKRow> fks;
 
@@ -428,6 +661,16 @@ std::vector<FKRow> Static_Harvester::fetch_foreign_keys(const std::string& filter) {
 	return fks;
 }
 
+// Harvest objects (tables, views, routines) to the catalog.
+//
+// Fetches tables/views from information_schema.TABLES and routines
+// from information_schema.ROUTINES, inserting them all into the catalog.
+//
+// Parameters:
+//   only_schema - Optional filter to harvest only one schema
+//
+// Returns:
+//   Number of objects harvested, or -1 on error
 int Static_Harvester::harvest_objects(const std::string& only_schema) {
 	if (current_run_id < 0) {
 		proxy_error("Static_Harvester: No active run\n");
@@ -479,6 +722,20 @@ int Static_Harvester::harvest_objects(const std::string& only_schema) {
 	return count;
 }
 
+// Harvest columns to the catalog with derived hints.
+//
+// Fetches columns from information_schema.COLUMNS and computes
+// derived flags: is_time (temporal types) and is_id_like (ID-like names).
+// Updates object flags after all columns are inserted.
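+//
+// Editorial sketch of the derived hints (heuristics inferred from the flag
+// names above, not verified against the implementation):
+//
+//   is_time    -> data_type is one of DATE/DATETIME/TIMESTAMP/TIME/YEAR
+//   is_id_like -> column name is "id" or ends with "_id"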
+// +// Parameters: +// only_schema - Optional filter to harvest only one schema +// +// Returns: +// Number of columns harvested, or -1 on error +// +// Notes: +// - Updates object flags (has_time_column) after harvest int Static_Harvester::harvest_columns(const std::string& only_schema) { if (current_run_id < 0) { proxy_error("Static_Harvester: No active run\n"); @@ -535,6 +792,22 @@ int Static_Harvester::harvest_columns(const std::string& only_schema) { return count; } +// Harvest indexes to the catalog and update column flags. +// +// Fetches indexes from information_schema.STATISTICS and inserts +// them with their columns. Updates column flags (is_pk, is_unique, +// is_indexed) and object flags (has_primary_key) after harvest. +// +// Parameters: +// only_schema - Optional filter to harvest only one schema +// +// Returns: +// Number of indexes harvested, or -1 on error +// +// Notes: +// - Groups index columns by index name +// - Marks PRIMARY KEY indexes with is_primary=1 +// - Updates column and object flags after harvest int Static_Harvester::harvest_indexes(const std::string& only_schema) { if (current_run_id < 0) { proxy_error("Static_Harvester: No active run\n"); @@ -642,6 +915,21 @@ int Static_Harvester::harvest_indexes(const std::string& only_schema) { return count; } +// Harvest foreign keys to the catalog. +// +// Fetches foreign keys from information_schema and inserts them +// with their child/parent column mappings. Updates object flags +// (has_foreign_keys) after harvest. +// +// Parameters: +// only_schema - Optional filter to harvest only one schema +// +// Returns: +// Number of foreign keys harvested, or -1 on error +// +// Notes: +// - Groups FK columns by constraint name +// - Updates object flags after harvest int Static_Harvester::harvest_foreign_keys(const std::string& only_schema) { if (current_run_id < 0) { proxy_error("Static_Harvester: No active run\n"); @@ -712,6 +1000,16 @@ int Static_Harvester::harvest_foreign_keys(const std::string& only_schema) { return count; } +// Harvest view definitions to the catalog. +// +// Fetches VIEW_DEFINITION from information_schema.VIEWS and stores +// it in the object's definition_sql field. +// +// Parameters: +// only_schema - Optional filter to harvest only one schema +// +// Returns: +// Number of views updated, or -1 on error int Static_Harvester::harvest_view_definitions(const std::string& only_schema) { if (current_run_id < 0) { proxy_error("Static_Harvester: No active run\n"); @@ -760,6 +1058,23 @@ int Static_Harvester::harvest_view_definitions(const std::string& only_schema) { return count; } +// Build quick profiles (metadata-only table analysis). +// +// Analyzes table metadata to derive: +// - guessed_kind: log/event, fact, entity, or unknown (based on table name) +// - rows_est, size_bytes, engine: from object metadata +// - has_primary_key, has_foreign_keys, has_time_column: boolean flags +// +// Stores the profile as JSON with profile_kind='table_quick'. 
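+//
+// Hypothetical 'table_quick' payload (editorial; keys taken from the list
+// above, values invented):
+//
+//   {"guessed_kind": "fact", "rows_est": 250000, "size_bytes": 50331648,
+//    "engine": "InnoDB", "has_primary_key": true, "has_foreign_keys": true,
+//    "has_time_column": true}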
+// +// Returns: +// Number of profiles built, or -1 on error +// +// Table Kind Heuristics: +// - log/event: name contains "log", "event", or "audit" +// - fact: name contains "order", "invoice", "payment", or "transaction" +// - entity: name contains "user", "customer", "account", or "product" +// - unknown: none of the above patterns match int Static_Harvester::build_quick_profiles() { if (current_run_id < 0) { proxy_error("Static_Harvester: No active run\n"); @@ -832,6 +1147,13 @@ int Static_Harvester::build_quick_profiles() { return count; } +// Rebuild the full-text search index for the current run. +// +// Deletes and rebuilds the fts_objects FTS5 index, enabling fast +// full-text search across object names, schemas, and comments. +// +// Returns: +// 0 on success, -1 on error int Static_Harvester::rebuild_fts_index() { if (current_run_id < 0) { proxy_error("Static_Harvester: No active run\n"); @@ -848,6 +1170,28 @@ int Static_Harvester::rebuild_fts_index() { return 0; } +// Run a complete harvest of all metadata stages. +// +// Executes all harvest stages in order: +// 1. Start discovery run +// 2. Harvest schemas/databases +// 3. Harvest objects (tables, views, routines) +// 4. Harvest columns with derived hints +// 5. Harvest indexes and update column flags +// 6. Harvest foreign keys +// 7. Harvest view definitions +// 8. Build quick profiles +// 9. Rebuild FTS index +// 10. Finish run +// +// If any stage fails, the run is finished with an error note. +// +// Parameters: +// only_schema - Optional filter to harvest only one schema +// notes - Optional notes for the run +// +// Returns: +// run_id on success, -1 on error int Static_Harvester::run_full_harvest(const std::string& only_schema, const std::string& notes) { if (start_run(notes) < 0) { return -1; @@ -898,6 +1242,18 @@ int Static_Harvester::run_full_harvest(const std::string& only_schema, const std return final_run_id; } +// ============================================================ +// Statistics Methods +// ============================================================ + +// Get harvest statistics for the current run. +// +// Returns statistics including counts of objects (by type), +// columns, indexes, and foreign keys harvested in the +// currently active run. +// +// Returns: +// JSON string with harvest statistics, or error if no active run std::string Static_Harvester::get_harvest_stats() { if (current_run_id < 0) { return "{\"error\": \"No active run\"}"; @@ -905,6 +1261,16 @@ std::string Static_Harvester::get_harvest_stats() { return get_harvest_stats(current_run_id); } +// Get harvest statistics for a specific run. +// +// Queries the catalog for counts of objects (by type), columns, +// indexes, and foreign keys for the specified run_id. 
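+//
+// Editorial example of the documented shape (object kinds and counts invented):
+//
+//   {"run_id": 3, "objects": {"table": 42, "view": 7, "routine": 5},
+//    "columns": 512, "indexes": 96, "foreign_keys": 28}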
+//
+// Parameters:
+//   run_id - The run ID to get statistics for
+//
+// Returns:
+//   JSON string with structure: {"run_id": N, "objects": {...}, "columns": N, "indexes": N, "foreign_keys": N}
 std::string Static_Harvester::get_harvest_stats(int run_id) {
 	char* error = NULL;
 	int cols = 0, affected = 0;
diff --git a/test/tap/tests/Makefile b/test/tap/tests/Makefile
index 4434c23762..c5f81b4187 100644
--- a/test/tap/tests/Makefile
+++ b/test/tap/tests/Makefile
@@ -168,6 +168,9 @@ sh-%:
 	cp $(patsubst sh-%,%,$@) $(patsubst sh-%.sh,%,$@)
 	chmod +x $(patsubst sh-%.sh,%,$@)
 
+anomaly_detection-t: anomaly_detection-t.cpp $(TAP_LDIR)/libtap.so
+	$(CXX) -DEXCLUDE_TRACKING_VARAIABLES $< ../tap/SQLite3_Server.cpp -I$(CLICKHOUSE_CPP_IDIR) $(IDIRS) $(LDIRS) -L$(CLICKHOUSE_CPP_LDIR) -L$(LZ4_LDIR) $(OPT) $(OBJ) $(MYLIBSJEMALLOC) $(MYLIBS) $(STATIC_LIBS) $(CLICKHOUSE_CPP_LDIR)/libclickhouse-cpp-lib.a $(CLICKHOUSE_CPP_PATH)/contrib/zstd/zstd/libzstdstatic.a $(LZ4_LDIR)/liblz4.a $(SQLITE3_LDIR)/../libsqlite_rembed.a -lscram -lusual -Wl,--allow-multiple-definition -o $@
+
 %-t: %-t.cpp $(TAP_LDIR)/libtap.so
 	$(CXX) $< $(IDIRS) $(LDIRS) $(OPT) $(MYLIBS) $(STATIC_LIBS) -o $@
 
diff --git a/test/tap/tests/ai_llm_retry_scenarios-t.cpp b/test/tap/tests/ai_llm_retry_scenarios-t.cpp
index 175e74668b..211586e194 100644
--- a/test/tap/tests/ai_llm_retry_scenarios-t.cpp
+++ b/test/tap/tests/ai_llm_retry_scenarios-t.cpp
@@ -14,6 +14,7 @@
  */
 
 #include "tap.h"
+#include 
 #include 
 #include 
 #include 
diff --git a/test/tap/tests/anomaly_detection-t.cpp b/test/tap/tests/anomaly_detection-t.cpp
index 28092a8ce9..bd73ae896a 100644
--- a/test/tap/tests/anomaly_detection-t.cpp
+++ b/test/tap/tests/anomaly_detection-t.cpp
@@ -50,6 +50,17 @@ MYSQL* g_admin = NULL;
 class AI_Features_Manager;
 extern AI_Features_Manager *GloAI;
 
+// Forward declarations
+class MySQL_Session;
+typedef struct _PtrSize_t PtrSize_t;
+
+// Stub for SQLite3_Server_session_handler - required by SQLite3_Server.cpp
+// This test uses the admin MySQL connection, so this is just a placeholder
+void SQLite3_Server_session_handler(MySQL_Session* sess, void* _pa, PtrSize_t* pkt) {
+	// This is a stub - the actual test uses the MySQL admin connection
+	// SQLite3_Server.cpp sets this as a handler but we don't use it
+}
+
 // ============================================================================
 // Helper Functions
 // ============================================================================
diff --git a/test/tap/tests/vector_db_performance-t.cpp b/test/tap/tests/vector_db_performance-t.cpp
index d5e5678dcf..10a80a2ab5 100644
--- a/test/tap/tests/vector_db_performance-t.cpp
+++ b/test/tap/tests/vector_db_performance-t.cpp
@@ -14,9 +14,11 @@
  */
 
 #include "tap.h"
+#include 
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
@@ -320,7 +322,7 @@ void test_large_dataset_handling() {
 	auto insert_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_insert - start_insert);
 
 	ok(db.size() == large_size, "Large dataset (%zu entries) inserted successfully", large_size);
-	diag("Time to insert %zu entries: %lld ms", large_size, insert_duration.count());
+	diag("Time to insert %zu entries: %ld ms", large_size, insert_duration.count());
 
 	// Test search performance in large dataset
 	auto search_result = db.lookup_entry("Large dataset query 5000");
@@ -376,7 +378,7 @@ void test_concurrent_access() {
 	long long avg_time = total_time / num_operations;
 	diag("Average time per concurrent operation: %lld microseconds", avg_time);
-	diag("Total time for %d operations: %lld microseconds", num_operations, total_duration.count());
+	diag("Total time for %d operations: %ld microseconds", num_operations, total_duration.count());
 
 	// Operations should be reasonably fast
 	ok(avg_time < 50000, "Average concurrent operation time reasonable (< 50ms)");