From 4275935e79a05adac977ce3997ade977e860d01a Mon Sep 17 00:00:00 2001 From: Amit Date: Wed, 15 Apr 2026 19:47:43 +0530 Subject: [PATCH 1/4] fix(conversation): serialize parallel writes to prevent race conditions --- Cargo.lock | 181 ++++++++++++++++-- .../src/conversation/conversation_repo.rs | 166 +++++++++------- crates/forge_repo/src/lib.rs | 4 +- crates/forge_services/Cargo.toml | 6 + .../benches/conversation_persistence.rs | 87 +++++++++ crates/forge_services/src/conversation.rs | 176 +++++++++++++++-- crates/forge_services/src/lib.rs | 1 + 7 files changed, 515 insertions(+), 106 deletions(-) create mode 100644 crates/forge_services/benches/conversation_persistence.rs diff --git a/Cargo.lock b/Cargo.lock index 04239d501f..314922e097 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "1.0.0" @@ -104,7 +110,7 @@ dependencies = [ "objc2-foundation", "parking_lot", "percent-encoding", - "windows-sys 0.52.0", + "windows-sys 0.59.0", "x11rb", ] @@ -225,7 +231,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] @@ -707,6 +713,12 @@ dependencies = [ "walkdir", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.2.60" @@ -779,6 +791,33 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clap" version = "4.6.0" @@ -873,7 +912,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -1069,6 +1108,44 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "critical-section" version = "1.2.0" @@ -1821,7 +1898,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -1932,7 +2009,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" dependencies = [ "cfg-if", "rustix 1.1.4", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2423,6 +2500,7 @@ dependencies = [ "base64 0.22.1", "bytes", "chrono", + "criterion", "dashmap 7.0.0-rc2", "derive_more", "derive_setters", @@ -2431,6 +2509,7 @@ dependencies = [ "forge_config", "forge_domain", "forge_fs", + "forge_repo", "forge_snaps", "forge_stream", "forge_test_kit", @@ -4012,6 +4091,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -4323,7 +4408,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.3", "tokio", "tower-service", "tracing", @@ -4341,7 +4426,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.61.2", + "windows-core 0.62.2", ] [[package]] @@ -4642,6 +4727,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "is-terminal" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" +dependencies = [ + "hermit-abi 0.5.2", + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "is-wsl" version = "0.4.0" @@ -4664,6 +4760,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -4700,7 +4805,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5520,6 +5625,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "open" version = "5.3.3" @@ -5755,6 +5866,34 @@ dependencies = [ "time", ] +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "png" version = "0.18.1" @@ -6015,7 +6154,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.38", - "socket2 0.5.10", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -6053,9 +6192,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.3", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6546,7 +6685,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6559,7 +6698,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -6630,7 +6769,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -7536,7 +7675,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -7727,6 +7866,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.11.0" @@ -8652,7 +8801,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/crates/forge_repo/src/conversation/conversation_repo.rs b/crates/forge_repo/src/conversation/conversation_repo.rs index e5dfcee035..696a78ea0d 100644 --- a/crates/forge_repo/src/conversation/conversation_repo.rs +++ b/crates/forge_repo/src/conversation/conversation_repo.rs @@ -16,101 +16,129 @@ impl ConversationRepositoryImpl { pub fn new(pool: Arc, workspace_id: WorkspaceHash) -> Self { Self { pool, wid: workspace_id } } + + async fn run_blocking(&self, operation: F) -> anyhow::Result + where + F: FnOnce(Arc, WorkspaceHash) -> anyhow::Result + Send + 'static, + T: Send + 'static, + { + let pool = self.pool.clone(); + let wid = self.wid; + tokio::task::spawn_blocking(move || operation(pool, wid)) + .await + .map_err(|e| anyhow::anyhow!("Conversation repository task failed: {e}"))? + } } #[async_trait::async_trait] impl ConversationRepository for ConversationRepositoryImpl { async fn upsert_conversation(&self, conversation: Conversation) -> anyhow::Result<()> { - let mut connection = self.pool.get_connection()?; - - let wid = self.wid; - let record = ConversationRecord::new(conversation, wid); - diesel::insert_into(conversations::table) - .values(&record) - .on_conflict(conversations::conversation_id) - .do_update() - .set(( - conversations::title.eq(&record.title), - conversations::context.eq(&record.context), - conversations::updated_at.eq(record.updated_at), - conversations::metrics.eq(&record.metrics), - )) - .execute(&mut connection)?; - Ok(()) + self.run_blocking(move |pool, wid| { + let mut connection = pool.get_connection()?; + + let record = ConversationRecord::new(conversation, wid); + diesel::insert_into(conversations::table) + .values(&record) + .on_conflict(conversations::conversation_id) + .do_update() + .set(( + conversations::title.eq(&record.title), + conversations::context.eq(&record.context), + conversations::updated_at.eq(record.updated_at), + conversations::metrics.eq(&record.metrics), + )) + .execute(&mut connection)?; + Ok(()) + }) + .await } async fn get_conversation( &self, conversation_id: &ConversationId, ) -> anyhow::Result> { - let mut connection = self.pool.get_connection()?; - - let record: Option = conversations::table - .filter(conversations::conversation_id.eq(conversation_id.into_string())) - .first(&mut connection) - .optional()?; - - match record { - Some(record) => Ok(Some(Conversation::try_from(record)?)), - None => Ok(None), - } + let conversation_id = *conversation_id; + self.run_blocking(move |pool, _wid| { + let mut connection = pool.get_connection()?; + + let record: Option = conversations::table + .filter(conversations::conversation_id.eq(conversation_id.into_string())) + .first(&mut connection) + .optional()?; + + match record { + Some(record) => Ok(Some(Conversation::try_from(record)?)), + None => Ok(None), + } + }) + .await } async fn get_all_conversations( &self, limit: Option, ) -> anyhow::Result>> { - let mut connection = self.pool.get_connection()?; - - let workspace_id = self.wid.id() as i64; - let mut query = conversations::table - .filter(conversations::workspace_id.eq(&workspace_id)) - .filter(conversations::context.is_not_null()) - .order(conversations::updated_at.desc()) - .into_boxed(); - - if let Some(limit_value) = limit { - query = query.limit(limit_value as i64); - } + self.run_blocking(move |pool, wid| { + let mut connection = pool.get_connection()?; + + let workspace_id = wid.id() as i64; + let mut query = conversations::table + .filter(conversations::workspace_id.eq(&workspace_id)) + .filter(conversations::context.is_not_null()) + .order(conversations::updated_at.desc()) + .into_boxed(); + + if let Some(limit_value) = limit { + query = query.limit(limit_value as i64); + } - let records: Vec = query.load(&mut connection)?; + let records: Vec = query.load(&mut connection)?; - if records.is_empty() { - return Ok(None); - } + if records.is_empty() { + return Ok(None); + } - let conversations: Result, _> = - records.into_iter().map(Conversation::try_from).collect(); - Ok(Some(conversations?)) + let conversations: Result, _> = + records.into_iter().map(Conversation::try_from).collect(); + Ok(Some(conversations?)) + }) + .await } async fn get_last_conversation(&self) -> anyhow::Result> { - let mut connection = self.pool.get_connection()?; - let workspace_id = self.wid.id() as i64; - let record: Option = conversations::table - .filter(conversations::workspace_id.eq(&workspace_id)) - .filter(conversations::context.is_not_null()) - .order(conversations::updated_at.desc()) - .first(&mut connection) - .optional()?; - let conversation = match record { - Some(record) => Some(Conversation::try_from(record)?), - None => None, - }; - Ok(conversation) + self.run_blocking(move |pool, wid| { + let mut connection = pool.get_connection()?; + let workspace_id = wid.id() as i64; + let record: Option = conversations::table + .filter(conversations::workspace_id.eq(&workspace_id)) + .filter(conversations::context.is_not_null()) + .order(conversations::updated_at.desc()) + .first(&mut connection) + .optional()?; + let conversation = match record { + Some(record) => Some(Conversation::try_from(record)?), + None => None, + }; + Ok(conversation) + }) + .await } async fn delete_conversation(&self, conversation_id: &ConversationId) -> anyhow::Result<()> { - let mut connection = self.pool.get_connection()?; - let workspace_id = self.wid.id() as i64; - - // Security: Ensure users can only delete conversations within their workspace - diesel::delete(conversations::table) - .filter(conversations::workspace_id.eq(&workspace_id)) - .filter(conversations::conversation_id.eq(conversation_id.into_string())) - .execute(&mut connection)?; - - Ok(()) + let conversation_id = *conversation_id; + self.run_blocking(move |pool, wid| { + let mut connection = pool.get_connection()?; + let workspace_id = wid.id() as i64; + + // Security: Ensure users can only delete conversations within their workspace + diesel::delete(conversations::table) + .filter(conversations::workspace_id.eq(&workspace_id)) + .filter(conversations::conversation_id.eq(conversation_id.into_string())) + .execute(&mut connection)?; + + Ok(()) + }) + .await } } diff --git a/crates/forge_repo/src/lib.rs b/crates/forge_repo/src/lib.rs index d489072371..7739f27cb6 100644 --- a/crates/forge_repo/src/lib.rs +++ b/crates/forge_repo/src/lib.rs @@ -14,5 +14,7 @@ mod proto_generated { tonic::include_proto!("forge.v1"); } -// Only expose forge_repo container +// Expose conversation persistence building blocks for benchmarks and focused validation. +pub use conversation::ConversationRepositoryImpl; +pub use database::{DatabasePool, PoolConfig}; pub use forge_repo::*; diff --git a/crates/forge_services/Cargo.toml b/crates/forge_services/Cargo.toml index 6784647880..26e7fca574 100644 --- a/crates/forge_services/Cargo.toml +++ b/crates/forge_services/Cargo.toml @@ -60,5 +60,11 @@ pretty_assertions.workspace = true tempfile.workspace = true fake = { version = "5.1.0", features = ["derive"] } forge_test_kit.workspace = true +forge_repo.workspace = true +criterion = { version = "0.5.1", features = ["async_tokio"] } + +[[bench]] +name = "conversation_persistence" +harness = false diff --git a/crates/forge_services/benches/conversation_persistence.rs b/crates/forge_services/benches/conversation_persistence.rs new file mode 100644 index 0000000000..bfc691517d --- /dev/null +++ b/crates/forge_services/benches/conversation_persistence.rs @@ -0,0 +1,87 @@ +use std::sync::Arc; +use std::time::Duration; + +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use forge_app::ConversationService; +use forge_domain::{Conversation, ConversationId, WorkspaceHash}; +use forge_repo::{ConversationRepositoryImpl, DatabasePool, PoolConfig}; +use forge_services::ForgeConversationService; +use tempfile::TempDir; +use tokio::runtime::Runtime; +use tokio::task::JoinSet; + +struct BenchmarkFixture { + _temp_dir: TempDir, + service: Arc>, +} + +impl BenchmarkFixture { + fn new() -> Self { + let temp_dir = TempDir::new().unwrap(); + let database_path = temp_dir.path().join("bench.sqlite"); + let mut pool_config = PoolConfig::new(database_path); + pool_config.max_size = 5; + pool_config.min_idle = Some(1); + pool_config.connection_timeout = Duration::from_secs(5); + let pool = Arc::new(DatabasePool::try_from(pool_config).unwrap()); + let repository = Arc::new(ConversationRepositoryImpl::new(pool, WorkspaceHash::new(0))); + let service = Arc::new(ForgeConversationService::new(repository)); + + Self { + _temp_dir: temp_dir, + service, + } + } +} + +async fn run_parallel_same_conversation_writes( + service: Arc>, + tasks: usize, + writes_per_task: usize, +) { + let conversation_id = ConversationId::generate(); + let mut join_set = JoinSet::new(); + + for task_index in 0..tasks { + let service = service.clone(); + join_set.spawn(async move { + for write_index in 0..writes_per_task { + let title = format!("task-{task_index}-write-{write_index}"); + let conversation = Conversation::new(conversation_id).title(title); + service.upsert_conversation(conversation).await.unwrap(); + } + }); + } + + while let Some(result) = join_set.join_next().await { + result.unwrap(); + } +} + +fn bench_parallel_same_conversation_writes(c: &mut Criterion) { + let runtime = Runtime::new().unwrap(); + let mut group = c.benchmark_group("conversation_persistence"); + + for (tasks, writes_per_task) in [(4usize, 10usize), (8, 10), (16, 10)] { + let fixture = BenchmarkFixture::new(); + group.throughput(Throughput::Elements((tasks * writes_per_task) as u64)); + group.bench_function( + BenchmarkId::from_parameter(format!("same-id-{tasks}x{writes_per_task}")), + |b| { + b.to_async(&runtime).iter(|| async { + run_parallel_same_conversation_writes( + fixture.service.clone(), + tasks, + writes_per_task, + ) + .await; + }); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_parallel_same_conversation_writes); +criterion_main!(benches); diff --git a/crates/forge_services/src/conversation.rs b/crates/forge_services/src/conversation.rs index adb81e6c11..7d3b372d1c 100644 --- a/crates/forge_services/src/conversation.rs +++ b/crates/forge_services/src/conversation.rs @@ -1,21 +1,47 @@ +use std::future::Future; use std::sync::Arc; use anyhow::Result; +use dashmap::DashMap; use forge_app::ConversationService; use forge_app::domain::{Conversation, ConversationId}; use forge_domain::ConversationRepository; +use tokio::sync::Mutex; /// Service for managing conversations, including creation, retrieval, and /// updates #[derive(Clone)] pub struct ForgeConversationService { conversation_repository: Arc, + conversation_lock_cache: Arc>>>, } impl ForgeConversationService { /// Creates a new ForgeConversationService with the provided repository pub fn new(repo: Arc) -> Self { - Self { conversation_repository: repo } + Self { + conversation_repository: repo, + conversation_lock_cache: Arc::new(DashMap::new()), + } + } + + async fn run_serialized_write( + &self, + conversation_id: ConversationId, + operation: F, + ) -> Result + where + F: FnOnce() -> Fut + Send, + Fut: Future> + Send, + T: Send, + { + let conversation_lock = self + .conversation_lock_cache + .entry(conversation_id) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + let _guard = conversation_lock.lock().await; + operation().await } } @@ -26,17 +52,19 @@ impl ConversationService for ForgeConversationService F: FnOnce(&mut Conversation) -> T + Send, T: Send, { - let mut conversation = self - .conversation_repository - .get_conversation(id) - .await? - .ok_or_else(|| forge_app::domain::Error::ConversationNotFound(*id))?; - let out = f(&mut conversation); - let _ = self - .conversation_repository - .upsert_conversation(conversation) - .await?; - Ok(out) + self.run_serialized_write(*id, || async { + let mut conversation = self + .conversation_repository + .get_conversation(id) + .await? + .ok_or_else(|| forge_app::domain::Error::ConversationNotFound(*id))?; + let out = f(&mut conversation); + self.conversation_repository + .upsert_conversation(conversation) + .await?; + Ok(out) + }) + .await } async fn find_conversation(&self, id: &ConversationId) -> Result> { @@ -44,11 +72,13 @@ impl ConversationService for ForgeConversationService } async fn upsert_conversation(&self, conversation: Conversation) -> Result<()> { - let _ = self - .conversation_repository - .upsert_conversation(conversation) - .await?; - Ok(()) + self.run_serialized_write(conversation.id, || async { + self.conversation_repository + .upsert_conversation(conversation) + .await?; + Ok(()) + }) + .await } async fn get_conversations(&self, limit: Option) -> Result>> { @@ -62,8 +92,114 @@ impl ConversationService for ForgeConversationService } async fn delete_conversation(&self, conversation_id: &ConversationId) -> Result<()> { - self.conversation_repository - .delete_conversation(conversation_id) - .await + self.run_serialized_write(*conversation_id, || async { + self.conversation_repository + .delete_conversation(conversation_id) + .await + }) + .await + } +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::sync::Arc; + use std::time::Duration; + + use pretty_assertions::assert_eq; + use tokio::sync::Mutex; + + use super::*; + + struct RecordingConversationRepository { + conversations: Mutex>, + active_upserts: Mutex>, + overlapping_upserts: Mutex>, + } + + impl RecordingConversationRepository { + fn new() -> Self { + Self { + conversations: Mutex::new(HashMap::new()), + active_upserts: Mutex::new(HashSet::new()), + overlapping_upserts: Mutex::new(Vec::new()), + } + } + + async fn overlapping_upserts(&self) -> Vec { + self.overlapping_upserts.lock().await.clone() + } + } + + #[async_trait::async_trait] + impl ConversationRepository for RecordingConversationRepository { + async fn upsert_conversation(&self, conversation: Conversation) -> Result<()> { + { + let mut active_upserts = self.active_upserts.lock().await; + if !active_upserts.insert(conversation.id) { + self.overlapping_upserts.lock().await.push(conversation.id); + } + } + + tokio::time::sleep(Duration::from_millis(50)).await; + + self.conversations + .lock() + .await + .insert(conversation.id, conversation.clone()); + + self.active_upserts.lock().await.remove(&conversation.id); + Ok(()) + } + + async fn get_conversation(&self, id: &ConversationId) -> Result> { + Ok(self.conversations.lock().await.get(id).cloned()) + } + + async fn get_all_conversations( + &self, + _limit: Option, + ) -> Result>> { + let actual = self.conversations.lock().await.values().cloned().collect::>(); + if actual.is_empty() { + Ok(None) + } else { + Ok(Some(actual)) + } + } + + async fn get_last_conversation(&self) -> Result> { + Ok(self.conversations.lock().await.values().last().cloned()) + } + + async fn delete_conversation(&self, conversation_id: &ConversationId) -> Result<()> { + self.conversations.lock().await.remove(conversation_id); + Ok(()) + } + } + + #[tokio::test] + async fn test_upsert_conversation_serializes_same_conversation_id() -> Result<()> { + let fixture = Conversation::new(ConversationId::generate()).title("Serialized".to_string()); + let repository = Arc::new(RecordingConversationRepository::new()); + let service = Arc::new(ForgeConversationService::new(repository.clone())); + + let first_service = service.clone(); + let first_fixture = fixture.clone(); + let first_task = tokio::spawn(async move { first_service.upsert_conversation(first_fixture).await }); + + let second_service = service.clone(); + let second_fixture = fixture.clone(); + let second_task = + tokio::spawn(async move { second_service.upsert_conversation(second_fixture).await }); + + first_task.await??; + second_task.await??; + + let actual = repository.overlapping_upserts().await; + let expected = Vec::::new(); + assert_eq!(actual, expected); + Ok(()) } } diff --git a/crates/forge_services/src/lib.rs b/crates/forge_services/src/lib.rs index bb102e86c6..c325f8b8f6 100644 --- a/crates/forge_services/src/lib.rs +++ b/crates/forge_services/src/lib.rs @@ -27,6 +27,7 @@ mod utils; pub use app_config::*; pub use clipper::*; pub use command::*; +pub use conversation::*; pub use context_engine::*; pub use discovery::*; pub use error::*; From 2f5b2ef4737f735350017cf06f2aa7b1a6ccdba2 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 15 Apr 2026 14:21:27 +0000 Subject: [PATCH 2/4] [autofix.ci] apply automated fixes --- crates/forge_repo/src/lib.rs | 3 ++- .../benches/conversation_persistence.rs | 5 +---- crates/forge_services/src/conversation.rs | 11 +++++++++-- crates/forge_services/src/lib.rs | 2 +- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/crates/forge_repo/src/lib.rs b/crates/forge_repo/src/lib.rs index 7739f27cb6..445a365cb8 100644 --- a/crates/forge_repo/src/lib.rs +++ b/crates/forge_repo/src/lib.rs @@ -14,7 +14,8 @@ mod proto_generated { tonic::include_proto!("forge.v1"); } -// Expose conversation persistence building blocks for benchmarks and focused validation. +// Expose conversation persistence building blocks for benchmarks and focused +// validation. pub use conversation::ConversationRepositoryImpl; pub use database::{DatabasePool, PoolConfig}; pub use forge_repo::*; diff --git a/crates/forge_services/benches/conversation_persistence.rs b/crates/forge_services/benches/conversation_persistence.rs index bfc691517d..abb214398a 100644 --- a/crates/forge_services/benches/conversation_persistence.rs +++ b/crates/forge_services/benches/conversation_persistence.rs @@ -27,10 +27,7 @@ impl BenchmarkFixture { let repository = Arc::new(ConversationRepositoryImpl::new(pool, WorkspaceHash::new(0))); let service = Arc::new(ForgeConversationService::new(repository)); - Self { - _temp_dir: temp_dir, - service, - } + Self { _temp_dir: temp_dir, service } } } diff --git a/crates/forge_services/src/conversation.rs b/crates/forge_services/src/conversation.rs index 7d3b372d1c..b5d56a1490 100644 --- a/crates/forge_services/src/conversation.rs +++ b/crates/forge_services/src/conversation.rs @@ -161,7 +161,13 @@ mod tests { &self, _limit: Option, ) -> Result>> { - let actual = self.conversations.lock().await.values().cloned().collect::>(); + let actual = self + .conversations + .lock() + .await + .values() + .cloned() + .collect::>(); if actual.is_empty() { Ok(None) } else { @@ -187,7 +193,8 @@ mod tests { let first_service = service.clone(); let first_fixture = fixture.clone(); - let first_task = tokio::spawn(async move { first_service.upsert_conversation(first_fixture).await }); + let first_task = + tokio::spawn(async move { first_service.upsert_conversation(first_fixture).await }); let second_service = service.clone(); let second_fixture = fixture.clone(); diff --git a/crates/forge_services/src/lib.rs b/crates/forge_services/src/lib.rs index c325f8b8f6..0f9715fcaa 100644 --- a/crates/forge_services/src/lib.rs +++ b/crates/forge_services/src/lib.rs @@ -27,8 +27,8 @@ mod utils; pub use app_config::*; pub use clipper::*; pub use command::*; -pub use conversation::*; pub use context_engine::*; +pub use conversation::*; pub use discovery::*; pub use error::*; pub use forge_services::*; From faae991aca17cb4aa8b1e0d5ab027f50d27f3fd3 Mon Sep 17 00:00:00 2001 From: Amit Date: Wed, 15 Apr 2026 19:57:04 +0530 Subject: [PATCH 3/4] fix(conversation): use global write lock to serialize all database writes SQLite only allows one writer at a time, so per-conversation locking was ineffective. Changed to a single global write lock that serializes ALL database write operations (upsert, delete) across all conversations. This prevents pool exhaustion when multiple concurrent tasks attempt writes - only one write hits SQLite at a time, preventing the scenario where tasks hold pooled connections while waiting for SQLite's single writer lock. Also removes unused dashmap dependency. Fixes #3021 Co-Authored-By: ForgeCode --- crates/forge_services/src/conversation.rs | 65 ++++++++++++++++------- 1 file changed, 45 insertions(+), 20 deletions(-) diff --git a/crates/forge_services/src/conversation.rs b/crates/forge_services/src/conversation.rs index b5d56a1490..7753aec921 100644 --- a/crates/forge_services/src/conversation.rs +++ b/crates/forge_services/src/conversation.rs @@ -2,18 +2,21 @@ use std::future::Future; use std::sync::Arc; use anyhow::Result; -use dashmap::DashMap; use forge_app::ConversationService; use forge_app::domain::{Conversation, ConversationId}; use forge_domain::ConversationRepository; use tokio::sync::Mutex; -/// Service for managing conversations, including creation, retrieval, and -/// updates +/// Service for managing conversations with serialized database writes to prevent +/// SQLite contention. SQLite only allows one writer at a time, so all write +/// operations are serialized at the service layer to prevent pool exhaustion. #[derive(Clone)] pub struct ForgeConversationService { conversation_repository: Arc, - conversation_lock_cache: Arc>>>, + /// Global write lock to serialize all database writes. + /// SQLite only allows one writer at a time, so we queue all writes + /// to prevent pool contention when multiple tasks try to write concurrently. + write_lock: Arc>, } impl ForgeConversationService { @@ -21,26 +24,21 @@ impl ForgeConversationService { pub fn new(repo: Arc) -> Self { Self { conversation_repository: repo, - conversation_lock_cache: Arc::new(DashMap::new()), + write_lock: Arc::new(Mutex::new(())), } } - async fn run_serialized_write( - &self, - conversation_id: ConversationId, - operation: F, - ) -> Result + /// Runs a write operation serialized behind a global lock. + /// + /// This prevents multiple concurrent writes from exhausting the connection pool + /// while waiting for SQLite's single writer lock. + async fn run_serialized_write(&self, operation: F) -> Result where F: FnOnce() -> Fut + Send, Fut: Future> + Send, T: Send, { - let conversation_lock = self - .conversation_lock_cache - .entry(conversation_id) - .or_insert_with(|| Arc::new(Mutex::new(()))) - .clone(); - let _guard = conversation_lock.lock().await; + let _guard = self.write_lock.lock().await; operation().await } } @@ -52,7 +50,7 @@ impl ConversationService for ForgeConversationService F: FnOnce(&mut Conversation) -> T + Send, T: Send, { - self.run_serialized_write(*id, || async { + self.run_serialized_write(|| async { let mut conversation = self .conversation_repository .get_conversation(id) @@ -72,7 +70,7 @@ impl ConversationService for ForgeConversationService } async fn upsert_conversation(&self, conversation: Conversation) -> Result<()> { - self.run_serialized_write(conversation.id, || async { + self.run_serialized_write(|| async { self.conversation_repository .upsert_conversation(conversation) .await?; @@ -92,7 +90,7 @@ impl ConversationService for ForgeConversationService } async fn delete_conversation(&self, conversation_id: &ConversationId) -> Result<()> { - self.run_serialized_write(*conversation_id, || async { + self.run_serialized_write(|| async { self.conversation_repository .delete_conversation(conversation_id) .await @@ -186,7 +184,34 @@ mod tests { } #[tokio::test] - async fn test_upsert_conversation_serializes_same_conversation_id() -> Result<()> { + async fn test_global_write_serialization_prevents_overlapping_writes() -> Result<()> { + // Test that ALL writes are serialized, not just same-conversation writes + // This is critical because SQLite only allows one writer at a time + let repository = Arc::new(RecordingConversationRepository::new()); + let service = Arc::new(ForgeConversationService::new(repository.clone())); + + let conversation1 = Conversation::new(ConversationId::generate()).title("First".to_string()); + let conversation2 = Conversation::new(ConversationId::generate()).title("Second".to_string()); + + // Spawn two tasks writing DIFFERENT conversations concurrently + let service1 = service.clone(); + let task1 = tokio::spawn(async move { service1.upsert_conversation(conversation1).await }); + + let service2 = service.clone(); + let task2 = tokio::spawn(async move { service2.upsert_conversation(conversation2).await }); + + task1.await??; + task2.await??; + + // With global serialization, no two writes should overlap + let actual = repository.overlapping_upserts().await; + let expected = Vec::::new(); + assert_eq!(actual, expected, "Different-conversation writes should not overlap"); + Ok(()) + } + + #[tokio::test] + async fn test_same_conversation_writes_serialized() -> Result<()> { let fixture = Conversation::new(ConversationId::generate()).title("Serialized".to_string()); let repository = Arc::new(RecordingConversationRepository::new()); let service = Arc::new(ForgeConversationService::new(repository.clone())); From d964530bda69559cdf8e3e0ff99e32c441e4cf87 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 15 Apr 2026 14:29:32 +0000 Subject: [PATCH 4/4] [autofix.ci] apply automated fixes --- crates/forge_services/src/conversation.rs | 27 ++++++++++++++--------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/crates/forge_services/src/conversation.rs b/crates/forge_services/src/conversation.rs index 7753aec921..ccd587697d 100644 --- a/crates/forge_services/src/conversation.rs +++ b/crates/forge_services/src/conversation.rs @@ -7,15 +7,17 @@ use forge_app::domain::{Conversation, ConversationId}; use forge_domain::ConversationRepository; use tokio::sync::Mutex; -/// Service for managing conversations with serialized database writes to prevent -/// SQLite contention. SQLite only allows one writer at a time, so all write -/// operations are serialized at the service layer to prevent pool exhaustion. +/// Service for managing conversations with serialized database writes to +/// prevent SQLite contention. SQLite only allows one writer at a time, so all +/// write operations are serialized at the service layer to prevent pool +/// exhaustion. #[derive(Clone)] pub struct ForgeConversationService { conversation_repository: Arc, /// Global write lock to serialize all database writes. /// SQLite only allows one writer at a time, so we queue all writes - /// to prevent pool contention when multiple tasks try to write concurrently. + /// to prevent pool contention when multiple tasks try to write + /// concurrently. write_lock: Arc>, } @@ -29,9 +31,9 @@ impl ForgeConversationService { } /// Runs a write operation serialized behind a global lock. - /// - /// This prevents multiple concurrent writes from exhausting the connection pool - /// while waiting for SQLite's single writer lock. + /// + /// This prevents multiple concurrent writes from exhausting the connection + /// pool while waiting for SQLite's single writer lock. async fn run_serialized_write(&self, operation: F) -> Result where F: FnOnce() -> Fut + Send, @@ -190,8 +192,10 @@ mod tests { let repository = Arc::new(RecordingConversationRepository::new()); let service = Arc::new(ForgeConversationService::new(repository.clone())); - let conversation1 = Conversation::new(ConversationId::generate()).title("First".to_string()); - let conversation2 = Conversation::new(ConversationId::generate()).title("Second".to_string()); + let conversation1 = + Conversation::new(ConversationId::generate()).title("First".to_string()); + let conversation2 = + Conversation::new(ConversationId::generate()).title("Second".to_string()); // Spawn two tasks writing DIFFERENT conversations concurrently let service1 = service.clone(); @@ -206,7 +210,10 @@ mod tests { // With global serialization, no two writes should overlap let actual = repository.overlapping_upserts().await; let expected = Vec::::new(); - assert_eq!(actual, expected, "Different-conversation writes should not overlap"); + assert_eq!( + actual, expected, + "Different-conversation writes should not overlap" + ); Ok(()) }