diff --git a/src-tauri/crates/mas-core/src/query/executor.rs b/src-tauri/crates/mas-core/src/query/executor.rs index e3bed8e..2884d91 100644 --- a/src-tauri/crates/mas-core/src/query/executor.rs +++ b/src-tauri/crates/mas-core/src/query/executor.rs @@ -102,8 +102,10 @@ impl QueryExecutor { // SELECT/SHOW/DESCRIBE/EXPLAIN row — accumulate until trailing Left. if stmt_idx >= 0 { // Check memory every 1000 rows to prevent OOM - if current_rows.len().is_multiple_of(1000) && mem_guard.check().is_err() { - mem_guard.set_triggered(); + if !current_rows.is_empty() + && current_rows.len().is_multiple_of(1000) + && mem_guard.check().is_err() + { tracing::warn!( rows_accumulated = current_rows.len(), "Memory limit reached, stopping query fetch" @@ -137,36 +139,10 @@ impl QueryExecutor { || upper.starts_with("EXPLAIN"); if is_select { - let columns: Vec = - if let Some(first_row) = current_rows.first() { - first_row - .columns() - .iter() - .map(|col| ColumnMeta { - name: col.name().to_string(), - data_type: col.type_info().name().to_string(), - nullable: true, - is_primary_key: false, - }) - .collect() - } else { - Vec::new() - }; - - let result_rows: Vec> = current_rows - .iter() - .map(|row| { - row.columns() - .iter() - .enumerate() - .map(|(i, col)| { - extract_value(row, i, col.type_info().name()) - }) - .collect() - }) - .collect(); - - let row_count = result_rows.len() as u64; + let row_count = current_rows.len() as u64; + let rows_truncated = + limit.is_some() && row_count >= limit.unwrap() + || mem_guard.triggered(); if execution_time > 1000 { tracing::warn!( @@ -183,26 +159,13 @@ impl QueryExecutor { "Query executed" ); - // Detect truncation: user limit enforced, or memory guard kicked in - let rows_truncated = - limit.is_some() && is_select && row_count >= limit.unwrap() - || mem_guard.triggered(); - - results.push(QueryResult { + results.push(build_select_result( query_id, - statement_index: idx, - columns, - rows: result_rows, - rows_affected: row_count, - execution_time_ms: execution_time, - warnings: vec![], + idx, + ¤t_rows, + execution_time, rows_truncated, - total_rows_available: if rows_truncated { - Some(row_count) - } else { - None - }, - }); + )); } else { let rows_affected = qr.rows_affected(); @@ -246,7 +209,6 @@ impl QueryExecutor { if mem_guard.triggered() && stmt_idx >= 0 && !current_rows.is_empty() { let idx = stmt_idx as usize; let stmt = &statements[idx]; - let query_id = uuid::Uuid::new_v4().to_string(); let execution_time = start.elapsed().as_millis() as u64; let upper = stmt.to_uppercase(); @@ -256,45 +218,13 @@ impl QueryExecutor { || upper.starts_with("EXPLAIN"); if is_select { - let columns: Vec = if let Some(first_row) = current_rows.first() { - first_row - .columns() - .iter() - .map(|col| ColumnMeta { - name: col.name().to_string(), - data_type: col.type_info().name().to_string(), - nullable: true, - is_primary_key: false, - }) - .collect() - } else { - Vec::new() - }; - - let result_rows: Vec> = current_rows - .iter() - .map(|row| { - row.columns() - .iter() - .enumerate() - .map(|(i, col)| extract_value(row, i, col.type_info().name())) - .collect() - }) - .collect(); - - let row_count = result_rows.len() as u64; - - results.push(QueryResult { - query_id, - statement_index: idx, - columns, - rows: result_rows, - rows_affected: row_count, - execution_time_ms: execution_time, - warnings: vec!["Query truncated: memory limit reached".to_string()], - rows_truncated: true, - total_rows_available: Some(row_count), - }); + results.push(build_select_result( + uuid::Uuid::new_v4().to_string(), + idx, + ¤t_rows, + execution_time, + true, + )); } } @@ -539,61 +469,102 @@ fn find_keyword_offset(s: &str, keyword: &str) -> Option { last_pos } -/// Monitors process memory usage to prevent OOM crashes. -/// Checks every N rows during query execution and triggers when -/// process RSS exceeds a threshold of total system memory. +fn build_select_result( + query_id: String, + statement_index: usize, + rows: &[sqlx::mysql::MySqlRow], + execution_time_ms: u64, + rows_truncated: bool, +) -> QueryResult { + let columns: Vec = rows + .first() + .map(|r| { + r.columns() + .iter() + .map(|col| ColumnMeta { + name: col.name().to_string(), + data_type: col.type_info().name().to_string(), + nullable: true, + is_primary_key: false, + }) + .collect() + }) + .unwrap_or_default(); + + let result_rows: Vec> = rows + .iter() + .map(|row| { + row.columns() + .iter() + .enumerate() + .map(|(i, col)| extract_value(row, i, col.type_info().name())) + .collect() + }) + .collect(); + + let row_count = result_rows.len() as u64; + + QueryResult { + query_id, + statement_index, + columns, + rows: result_rows, + rows_affected: row_count, + execution_time_ms, + warnings: vec![], + rows_truncated, + total_rows_available: if rows_truncated { Some(row_count) } else { None }, + } +} + +/// Monitors system-wide available memory to prevent OOM crashes. +/// Checks every 1000 rows during query execution and triggers when +/// available system memory drops below 512 MB. struct MemoryGuard { - threshold_bytes: u64, + sys: sysinfo::System, triggered: bool, } impl MemoryGuard { - /// Create a new guard. Threshold defaults to 75% of system RAM. + /// Create a new guard. Reads initial memory state for diagnostics. fn new() -> Self { let mut sys = sysinfo::System::new(); sys.refresh_memory(); - let total = sys.total_memory(); - let threshold = (total as f64 * 0.75) as u64; + let available_mb = sys.available_memory() / 1024 / 1024; + let total_mb = sys.total_memory() / 1024 / 1024; tracing::debug!( - total_mb = total / 1024 / 1024, - threshold_mb = threshold / 1024 / 1024, - "Memory guard initialized" + available_mb, + total_mb, + "Memory guard initialized, will stop query if available memory drops below 512 MB" ); - Self { - threshold_bytes: threshold, - triggered: false, - } + Self { sys, triggered: false } } - /// Check current process memory against threshold. - /// Returns Err if memory usage exceeds threshold. - fn check(&self) -> Result<(), CoreError> { + /// Refresh system memory and return Err if available memory is below 512 MB. + /// Sets the triggered flag on the first failure so subsequent calls fast-fail. + fn check(&mut self) -> Result<(), CoreError> { if self.triggered { return Err(CoreError::OutOfMemory( - "Query stopped: process memory limit reached".to_string(), + "Query stopped: available memory critically low".to_string(), )); } - let mut sys = sysinfo::System::new(); - sys.refresh_memory(); - - // Find our process and check memory usage - let current_pid = std::process::id(); - if let Some(process) = sys.process(sysinfo::Pid::from_u32(current_pid)) { - let rss = process.memory(); - let rss_mb = rss / 1024 / 1024; - let threshold_mb = self.threshold_bytes / 1024 / 1024; - - if rss > self.threshold_bytes { - tracing::warn!(rss_mb, threshold_mb, "Process memory exceeds threshold"); - return Err(CoreError::OutOfMemory(format!( - "Process memory ({rss_mb} MB) exceeds limit ({threshold_mb} MB). \ - Add a LIMIT clause to reduce result size." - ))); - } + self.sys.refresh_memory(); + let available_mb = self.sys.available_memory() / 1024 / 1024; + + if available_mb < 512 { + self.triggered = true; + tracing::warn!( + available_mb, + "System memory critically low, stopping query" + ); + return Err(CoreError::OutOfMemory(format!( + "System memory critically low ({available_mb} MB available). \ + Add a LIMIT clause to reduce result size." + ))); } Ok(()) @@ -602,10 +573,6 @@ impl MemoryGuard { fn triggered(&self) -> bool { self.triggered } - - fn set_triggered(&mut self) { - self.triggered = true; - } } #[cfg(test)]