Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 96 additions & 129 deletions src-tauri/crates/mas-core/src/query/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ impl QueryExecutor {
// SELECT/SHOW/DESCRIBE/EXPLAIN row — accumulate until trailing Left.
if stmt_idx >= 0 {
// Check memory every 1000 rows to prevent OOM
if current_rows.len().is_multiple_of(1000) && mem_guard.check().is_err() {
mem_guard.set_triggered();
if !current_rows.is_empty()
&& current_rows.len().is_multiple_of(1000)
&& mem_guard.check().is_err()
{
tracing::warn!(
rows_accumulated = current_rows.len(),
"Memory limit reached, stopping query fetch"
Expand Down Expand Up @@ -137,36 +139,10 @@ impl QueryExecutor {
|| upper.starts_with("EXPLAIN");

if is_select {
let columns: Vec<ColumnMeta> =
if let Some(first_row) = current_rows.first() {
first_row
.columns()
.iter()
.map(|col| ColumnMeta {
name: col.name().to_string(),
data_type: col.type_info().name().to_string(),
nullable: true,
is_primary_key: false,
})
.collect()
} else {
Vec::new()
};

let result_rows: Vec<Vec<SqlValue>> = current_rows
.iter()
.map(|row| {
row.columns()
.iter()
.enumerate()
.map(|(i, col)| {
extract_value(row, i, col.type_info().name())
})
.collect()
})
.collect();

let row_count = result_rows.len() as u64;
let row_count = current_rows.len() as u64;
let rows_truncated =
limit.is_some() && row_count >= limit.unwrap()
|| mem_guard.triggered();

if execution_time > 1000 {
tracing::warn!(
Expand All @@ -183,26 +159,13 @@ impl QueryExecutor {
"Query executed"
);

// Detect truncation: user limit enforced, or memory guard kicked in
let rows_truncated =
limit.is_some() && is_select && row_count >= limit.unwrap()
|| mem_guard.triggered();

results.push(QueryResult {
results.push(build_select_result(
query_id,
statement_index: idx,
columns,
rows: result_rows,
rows_affected: row_count,
execution_time_ms: execution_time,
warnings: vec![],
idx,
&current_rows,
execution_time,
rows_truncated,
total_rows_available: if rows_truncated {
Some(row_count)
} else {
None
},
});
));
} else {
let rows_affected = qr.rows_affected();

Expand Down Expand Up @@ -246,7 +209,6 @@ impl QueryExecutor {
if mem_guard.triggered() && stmt_idx >= 0 && !current_rows.is_empty() {
let idx = stmt_idx as usize;
let stmt = &statements[idx];
let query_id = uuid::Uuid::new_v4().to_string();
let execution_time = start.elapsed().as_millis() as u64;

let upper = stmt.to_uppercase();
Expand All @@ -256,45 +218,13 @@ impl QueryExecutor {
|| upper.starts_with("EXPLAIN");

if is_select {
let columns: Vec<ColumnMeta> = if let Some(first_row) = current_rows.first() {
first_row
.columns()
.iter()
.map(|col| ColumnMeta {
name: col.name().to_string(),
data_type: col.type_info().name().to_string(),
nullable: true,
is_primary_key: false,
})
.collect()
} else {
Vec::new()
};

let result_rows: Vec<Vec<SqlValue>> = current_rows
.iter()
.map(|row| {
row.columns()
.iter()
.enumerate()
.map(|(i, col)| extract_value(row, i, col.type_info().name()))
.collect()
})
.collect();

let row_count = result_rows.len() as u64;

results.push(QueryResult {
query_id,
statement_index: idx,
columns,
rows: result_rows,
rows_affected: row_count,
execution_time_ms: execution_time,
warnings: vec!["Query truncated: memory limit reached".to_string()],
rows_truncated: true,
total_rows_available: Some(row_count),
});
results.push(build_select_result(
uuid::Uuid::new_v4().to_string(),
idx,
&current_rows,
execution_time,
true,
));
}
}

Expand Down Expand Up @@ -539,61 +469,102 @@ fn find_keyword_offset(s: &str, keyword: &str) -> Option<usize> {
last_pos
}

/// Monitors process memory usage to prevent OOM crashes.
/// Checks every N rows during query execution and triggers when
/// process RSS exceeds a threshold of total system memory.
fn build_select_result(
query_id: String,
statement_index: usize,
rows: &[sqlx::mysql::MySqlRow],
execution_time_ms: u64,
rows_truncated: bool,
) -> QueryResult {
let columns: Vec<ColumnMeta> = rows
.first()
.map(|r| {
r.columns()
.iter()
.map(|col| ColumnMeta {
name: col.name().to_string(),
data_type: col.type_info().name().to_string(),
nullable: true,
is_primary_key: false,
})
.collect()
})
.unwrap_or_default();

let result_rows: Vec<Vec<SqlValue>> = rows
.iter()
.map(|row| {
row.columns()
.iter()
.enumerate()
.map(|(i, col)| extract_value(row, i, col.type_info().name()))
.collect()
})
.collect();

let row_count = result_rows.len() as u64;

QueryResult {
query_id,
statement_index,
columns,
rows: result_rows,
rows_affected: row_count,
execution_time_ms,
warnings: vec![],
rows_truncated,
total_rows_available: if rows_truncated { Some(row_count) } else { None },
}
}

/// Monitors system-wide available memory to prevent OOM crashes.
/// Checks every 1000 rows during query execution and triggers when
/// available system memory drops below 512 MB.
struct MemoryGuard {
threshold_bytes: u64,
sys: sysinfo::System,
triggered: bool,
}

impl MemoryGuard {
/// Create a new guard. Threshold defaults to 75% of system RAM.
/// Create a new guard. Reads initial memory state for diagnostics.
fn new() -> Self {
let mut sys = sysinfo::System::new();
sys.refresh_memory();

let total = sys.total_memory();
let threshold = (total as f64 * 0.75) as u64;
let available_mb = sys.available_memory() / 1024 / 1024;
let total_mb = sys.total_memory() / 1024 / 1024;

tracing::debug!(
total_mb = total / 1024 / 1024,
threshold_mb = threshold / 1024 / 1024,
"Memory guard initialized"
available_mb,
total_mb,
"Memory guard initialized, will stop query if available memory drops below 512 MB"
);

Self {
threshold_bytes: threshold,
triggered: false,
}
Self { sys, triggered: false }
}

/// Check current process memory against threshold.
/// Returns Err if memory usage exceeds threshold.
fn check(&self) -> Result<(), CoreError> {
/// Refresh system memory and return Err if available memory is below 512 MB.
/// Sets the triggered flag on the first failure so subsequent calls fast-fail.
fn check(&mut self) -> Result<(), CoreError> {
if self.triggered {
return Err(CoreError::OutOfMemory(
"Query stopped: process memory limit reached".to_string(),
"Query stopped: available memory critically low".to_string(),
));
}

let mut sys = sysinfo::System::new();
sys.refresh_memory();

// Find our process and check memory usage
let current_pid = std::process::id();
if let Some(process) = sys.process(sysinfo::Pid::from_u32(current_pid)) {
let rss = process.memory();
let rss_mb = rss / 1024 / 1024;
let threshold_mb = self.threshold_bytes / 1024 / 1024;

if rss > self.threshold_bytes {
tracing::warn!(rss_mb, threshold_mb, "Process memory exceeds threshold");
return Err(CoreError::OutOfMemory(format!(
"Process memory ({rss_mb} MB) exceeds limit ({threshold_mb} MB). \
Add a LIMIT clause to reduce result size."
)));
}
self.sys.refresh_memory();
let available_mb = self.sys.available_memory() / 1024 / 1024;

if available_mb < 512 {
self.triggered = true;
tracing::warn!(
available_mb,
"System memory critically low, stopping query"
);
return Err(CoreError::OutOfMemory(format!(
"System memory critically low ({available_mb} MB available). \
Add a LIMIT clause to reduce result size."
)));
}

Ok(())
Expand All @@ -602,10 +573,6 @@ impl MemoryGuard {
fn triggered(&self) -> bool {
self.triggered
}

fn set_triggered(&mut self) {
self.triggered = true;
}
}

#[cfg(test)]
Expand Down
Loading