From e16f840f964aca95f76c914b167bc58ccdbaba74 Mon Sep 17 00:00:00 2001 From: Jonathan Donaldson Date: Mon, 4 Aug 2025 23:31:12 +0100 Subject: [PATCH 1/2] Remove table registry Co-Authored-By: Alex Wakefield <1181370+aewakefield@users.noreply.github.com> Co-Authored-By: Fabio Mirza <79943909+mapokko@users.noreply.github.com> --- Cargo.toml | 1 - eventastic/src/aggregate/root.rs | 2 +- eventastic/src/memory.rs | 2 +- eventastic/src/test_fixtures.rs | 14 +- eventastic_outbox_postgres/Cargo.toml | 2 +- eventastic_outbox_postgres/src/outbox.rs | 104 ++++-- eventastic_postgres/Cargo.toml | 9 +- eventastic_postgres/src/common.rs | 52 ++- eventastic_postgres/src/error.rs | 102 ++++++ eventastic_postgres/src/lib.rs | 55 +--- eventastic_postgres/src/pickle.rs | 1 + eventastic_postgres/src/reader_impl.rs | 30 +- eventastic_postgres/src/repository.rs | 76 ++--- eventastic_postgres/src/side_effect.rs | 8 +- eventastic_postgres/src/table_config.rs | 49 +++ eventastic_postgres/src/table_registry.rs | 152 --------- eventastic_postgres/src/transaction.rs | 95 +++--- eventastic_postgres/tests/common/helpers.rs | 16 +- eventastic_postgres/tests/common/mod.rs | 1 + .../tests/common/test_order_aggregate.rs | 185 +++++++++++ eventastic_postgres/tests/encryption.rs | 22 +- eventastic_postgres/tests/multi_aggregate.rs | 300 ++++++++++++++++++ examples/bank/Cargo.toml | 3 +- examples/bank/src/main.rs | 10 +- 24 files changed, 891 insertions(+), 400 deletions(-) create mode 100644 eventastic_postgres/src/error.rs create mode 100644 eventastic_postgres/src/table_config.rs delete mode 100644 eventastic_postgres/src/table_registry.rs create mode 100644 eventastic_postgres/tests/common/test_order_aggregate.rs create mode 100644 eventastic_postgres/tests/multi_aggregate.rs diff --git a/Cargo.toml b/Cargo.toml index e5c6366..6112b80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,4 +33,3 @@ chrono = "0.4" serde_json = "1" tokio = { version = "1", features = ["full"] } futures-util = "0.3" -anyhow 
= "1" diff --git a/eventastic/src/aggregate/root.rs b/eventastic/src/aggregate/root.rs index eef3d29..c60e1f2 100644 --- a/eventastic/src/aggregate/root.rs +++ b/eventastic/src/aggregate/root.rs @@ -426,7 +426,7 @@ mod tests { assert_eq!(context.version(), 0); for i in 1..=5 { - let add_event = create_add_event(&format!("add-{}", i), i); + let add_event = create_add_event(&format!("add-{i}"), i); context.record_that(add_event).unwrap(); assert_eq!(context.version(), i as u64); } diff --git a/eventastic/src/memory.rs b/eventastic/src/memory.rs index 4c7a3c3..0f67964 100644 --- a/eventastic/src/memory.rs +++ b/eventastic/src/memory.rs @@ -870,7 +870,7 @@ mod tests { // Verify side effects were stored // Reset: 2 side effects, Add: 1, Subtract: 1, Multiply: 0, Add: 1 = 5 total - let expected_side_effects = 2 + 1 + 1 + 0 + 1; + let expected_side_effects = 5; assert_eq!(repository.side_effects_count(), expected_side_effects); let side_effects = repository.get_all_side_effects(); diff --git a/eventastic/src/test_fixtures.rs b/eventastic/src/test_fixtures.rs index 0895f91..6d6d4ee 100644 --- a/eventastic/src/test_fixtures.rs +++ b/eventastic/src/test_fixtures.rs @@ -4,8 +4,6 @@ //! that can be used across different test modules to ensure consistency //! and reduce code duplication. -#![cfg(test)] - use crate::{ aggregate::{Aggregate, SideEffect}, event::DomainEvent, @@ -139,21 +137,21 @@ impl Aggregate for TestCounter { match event { TestEvent::Reset { event_id, .. 
} => Some(vec![ TestSideEffect::LogOperation { - id: format!("{}-log", event_id), + id: format!("{event_id}-log"), operation: "Reset".to_string(), }, TestSideEffect::NotifyUser { - id: format!("{}-notify", event_id), + id: format!("{event_id}-notify"), message: "Counter has been reset".to_string(), }, ]), TestEvent::Add { event_id, value } => Some(vec![TestSideEffect::LogOperation { - id: format!("{}-log", event_id), - operation: format!("Add {}", value), + id: format!("{event_id}-log"), + operation: format!("Add {value}"), }]), TestEvent::Subtract { event_id, value } => Some(vec![TestSideEffect::LogOperation { - id: format!("{}-log", event_id), - operation: format!("Subtract {}", value), + id: format!("{event_id}-log"), + operation: format!("Subtract {value}"), }]), TestEvent::Multiply { .. } => None, // No side effects for multiply } diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml index 95b7d1a..d0de095 100644 --- a/eventastic_outbox_postgres/Cargo.toml +++ b/eventastic_outbox_postgres/Cargo.toml @@ -4,7 +4,6 @@ version = "0.5.0" edition = "2024" [dependencies] -anyhow = { workspace = true } eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" } async-trait = { workspace = true } sqlx = { workspace = true } @@ -13,3 +12,4 @@ uuid = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } futures-util = { workspace = true } tokio = { workspace = true } +thiserror = { workspace = true } diff --git a/eventastic_outbox_postgres/src/outbox.rs b/eventastic_outbox_postgres/src/outbox.rs index 1721286..3084d92 100644 --- a/eventastic_outbox_postgres/src/outbox.rs +++ b/eventastic_outbox_postgres/src/outbox.rs @@ -1,17 +1,54 @@ -use anyhow::Context; use async_trait::async_trait; use chrono::{DateTime, Utc}; use eventastic::aggregate::{Aggregate, SideEffect}; use eventastic::event::DomainEvent; use eventastic_postgres::{ - DbError, EncryptionProvider, Pickle, PostgresRepository, 
PostgresTransaction, SideEffectStorage, + EncryptionProvider, Pickle, PostgresRepository, PostgresTransaction, SideEffectDbError, + SideEffectStorage, }; use sqlx::{Postgres, Transaction}; use std::sync::Arc; +use thiserror::Error; use uuid::Uuid; use crate::OutboxMessage; +/// Errors that can occur during outbox operations. +#[derive(Error, Debug)] +pub enum OutboxError { + /// A database operation failed. + #[error("Database error: {0}")] + Database(sqlx::Error), + /// Failed to encrypt or decrypt side effect data. + #[error("Encryption error: {0}")] + Encryption(EncryptionError), + /// Failed to pickle or unpickle side effect data. + #[error("Side effect pickling error: {0}")] + SideEffectPickling(SideEffectPicklingError), + /// Encryption provider returned wrong number of items. + #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +impl From for OutboxError { + fn from(e: sqlx::Error) -> Self { + OutboxError::Database(e) + } +} + +impl From> for SideEffectDbError { + fn from(e: OutboxError) -> Self { + match e { + OutboxError::Database(err) => SideEffectDbError::DbError(err), + OutboxError::Encryption(err) => SideEffectDbError::Encryption(err), + OutboxError::SideEffectPickling(err) => SideEffectDbError::SideEffectPicklingError(err), + OutboxError::EncryptionProviderReturnedWrongNumberOfItems => { + SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems + } + } + } +} + /// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table. 
#[derive(Clone, Copy, Default)] pub struct TableOutbox { @@ -36,7 +73,7 @@ where &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, - ) -> Result<(), DbError> { + ) -> Result<(), SideEffectDbError::Error>> { let mut ids: Vec = Vec::with_capacity(items.len()); let mut messages: Vec> = Vec::with_capacity(items.len()); let mut retries: Vec = Vec::with_capacity(items.len()); @@ -49,8 +86,7 @@ where let id = *side_effect.id(); let msg = side_effect .pickle() - .context("Failed to pickle side effect") - .map_err(DbError::PicklingError)?; + .map_err(SideEffectDbError::SideEffectPicklingError)?; ids.push(id); plain.push(msg); retries.push(0); @@ -62,9 +98,9 @@ where .encryption_provider .encrypt(plain) .await - .map_err(DbError::Encryption)?; + .map_err(SideEffectDbError::Encryption)?; if number_of_items != cipher.len() { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems); } messages.append(&mut cipher); } @@ -91,22 +127,30 @@ where } #[async_trait] -pub trait TransactionOutboxExt +pub trait TransactionOutboxExt where T: SideEffect + Pickle + Send + 'static, T::SideEffectId: Clone + Send + 'static, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { - async fn get_outbox_batch(&mut self) -> Result>, DbError>; + async fn get_outbox_batch( + &mut self, + ) -> Result>, OutboxError>; - async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError>; + async fn delete_outbox_item( + &mut self, + id: T::SideEffectId, + ) -> Result<(), OutboxError>; - async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError>; + async fn update_outbox_item( + &mut self, + item: OutboxMessage, + ) -> Result<(), OutboxError>; } #[async_trait] -impl TransactionOutboxExt +impl TransactionOutboxExt::Error> for PostgresTransaction<'_, T, TableOutbox, E> where T: Aggregate + Send + Sync + Pickle 
+ 'static, @@ -119,7 +163,10 @@ where { async fn get_outbox_batch( &mut self, - ) -> Result>, DbError> { + ) -> Result< + Vec>, + OutboxError::Error>, + > { #[derive(sqlx::FromRow)] struct OutboxRow { message: Vec, @@ -143,9 +190,9 @@ where .encryption_provider() .decrypt(cipher) .await - .map_err(DbError::Encryption)?; + .map_err(OutboxError::Encryption)?; if plain.len() != number_of_items { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(OutboxError::EncryptionProviderReturnedWrongNumberOfItems); } messages.append(&mut plain); } @@ -154,17 +201,16 @@ where .zip(messages.into_iter()) .map(|(row, message)| { let msg = - T::SideEffect::unpickle(&message).context("Failed to unpickle side effect")?; + T::SideEffect::unpickle(&message).map_err(OutboxError::SideEffectPickling)?; Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) }) - .collect::, anyhow::Error>>() - .map_err(DbError::PicklingError) + .collect::, OutboxError::Error>>>() } async fn delete_outbox_item( &mut self, id: ::SideEffectId, - ) -> Result<(), DbError> { + ) -> Result<(), OutboxError::Error>> { sqlx::query("DELETE FROM outbox WHERE id = $1") .bind(id) .execute(self.inner_mut().as_mut()) @@ -175,7 +221,7 @@ where async fn update_outbox_item( &mut self, item: OutboxMessage, - ) -> Result<(), DbError> { + ) -> Result<(), OutboxError::Error>> { sqlx::query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") .bind(item.message.id()) .bind(i32::from(item.retries)) @@ -228,12 +274,13 @@ pub trait SideEffectHandler { pub trait RepositoryOutboxExt where T: Aggregate + Send + Sync + Pickle + 'static, + T::DomainEvent: Pickle + Send + Sync, T::SideEffect: SideEffect + Pickle + Clone + Send + Sync + 'static, ::SideEffectId: Clone + Send + 'static, H: SideEffectHandler + Send + Sync + 'static, E: EncryptionProvider + Clone + Send + Sync + 'static, for<'a> PostgresTransaction<'a, T, TableOutbox, E>: - TransactionOutboxExt, + TransactionOutboxExt::Error>, 
for<'sql> ::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { @@ -241,7 +288,7 @@ where &self, handler: H, poll_interval: std::time::Duration, - ) -> Result<(), DbError>; + ) -> Result<(), OutboxError::Error>>; } #[async_trait] @@ -255,7 +302,7 @@ where H: SideEffectHandler + Send + Sync + 'static, E: EncryptionProvider + Clone + Send + Sync + 'static, for<'a> PostgresTransaction<'a, T, TableOutbox, E>: - TransactionOutboxExt, + TransactionOutboxExt::Error>, for<'sql> ::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { @@ -263,7 +310,7 @@ where &self, handler: H, poll_interval: std::time::Duration, - ) -> Result<(), DbError> { + ) -> Result<(), OutboxError::Error>> { let handler = Arc::new(handler); loop { let deadline = std::time::Instant::now() + poll_interval; @@ -276,7 +323,7 @@ where async fn process_outbox_batch( repo: &PostgresRepository, E>, handler: Arc, -) -> Result<(), DbError> +) -> Result<(), OutboxError::Error>> where T: Aggregate + Send + Sync + Pickle + 'static, T::SideEffect: SideEffect + Pickle + Send + Sync, @@ -285,7 +332,7 @@ where T::ApplyError: Send + Sync, E: EncryptionProvider + Clone + Send + Sync + 'static, for<'a> PostgresTransaction<'a, T, TableOutbox, E>: - TransactionOutboxExt, + TransactionOutboxExt::Error>, for<'sql> ::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { @@ -308,5 +355,8 @@ where } } - tx.commit().await + tx.into_inner() + .commit() + .await + .map_err(OutboxError::Database) } diff --git a/eventastic_postgres/Cargo.toml b/eventastic_postgres/Cargo.toml index 7956b07..fbf287b 100644 --- a/eventastic_postgres/Cargo.toml +++ b/eventastic_postgres/Cargo.toml @@ -10,15 +10,14 @@ categories = ["web-programming", "asynchronous"] keywords = ["postgres", "postgresql", "database", "ddd", "event-sourcing"] [dependencies] -anyhow = { workspace = true } async-trait = { workspace = true } 
async-stream = { workspace = true } chrono = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } futures = { workspace = true } futures-util = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } +serde = { workspace = true, optional = true } +serde_json = { workspace = true, optional = true } sqlx = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } @@ -26,3 +25,7 @@ tokio = { workspace = true } [dev-dependencies] eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" } uuid = { workspace = true } + +[features] +default = [] +serde = ["dep:serde", "dep:serde_json"] diff --git a/eventastic_postgres/src/common.rs b/eventastic_postgres/src/common.rs index fa9299b..abfad2d 100644 --- a/eventastic_postgres/src/common.rs +++ b/eventastic_postgres/src/common.rs @@ -5,12 +5,33 @@ use crate::DbError; use crate::pickle::Pickle; -use anyhow::Context; use eventastic::aggregate::Aggregate; use eventastic::event::{DomainEvent, EventStoreEvent}; use eventastic::repository::Snapshot; use sqlx::types::Uuid; +/// Type alias for the complex return type of event conversion operations. +type EventResult = Result< + EventStoreEvent<::DomainEvent>, + DbError< + E, + <::DomainEvent as Pickle>::Error, + ::Error, + <::SideEffect as Pickle>::Error, + >, +>; + +/// Type alias for the complex return type of snapshot conversion operations. +type SnapshotResult = Result< + Snapshot, + DbError< + E, + <::DomainEvent as Pickle>::Error, + ::Error, + <::SideEffect as Pickle>::Error, + >, +>; + /// Internal representation of a database row containing event data. /// /// This struct is used to deserialize event rows from the database @@ -36,21 +57,22 @@ impl PartialEventRow { /// # Errors /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. - /// Returns [`DbError::PicklingError`] if the event JSON cannot be deserialized. 
- pub fn to_event(row: PartialEventRow) -> Result, DbError> + /// Returns [`DbError::EventPicklingError`] if the event JSON cannot be deserialized. + pub fn to_event(row: PartialEventRow) -> EventResult where - Evt: DomainEvent + Pickle, + T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, { let row_version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; - Evt::unpickle(&row.event) + T::DomainEvent::unpickle(&row.event) .map(|e| EventStoreEvent { id: row.event_id, event: e, version: row_version, }) - .context("Failed to unpickle event") - .map_err(DbError::PicklingError) + .map_err(DbError::EventPicklingError) } } @@ -80,17 +102,17 @@ impl PartialSnapshotRow { /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. /// Returns [`DbError::InvalidSnapshotVersion`] if the snapshot version cannot be converted to u64. - /// Returns [`DbError::PicklingError`] if the aggregate JSON cannot be deserialized. - pub fn to_snapshot(row: PartialSnapshotRow) -> Result, DbError> + /// Returns [`DbError::SnapshotPicklingError`] if the aggregate JSON cannot be deserialized. + pub fn to_snapshot(row: PartialSnapshotRow) -> SnapshotResult where T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, { let version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; let snapshot_version = u64::try_from(row.snapshot_version).map_err(|_| DbError::InvalidSnapshotVersion)?; - let aggregate: T = T::unpickle(&row.aggregate) - .context("Failed to unpickle aggregate") - .map_err(DbError::PicklingError)?; + let aggregate: T = T::unpickle(&row.aggregate).map_err(DbError::SnapshotPicklingError)?; Ok(Snapshot { aggregate, @@ -109,7 +131,7 @@ pub(crate) mod utils { /// # Errors /// /// Returns [`DbError::InvalidVersionNumber`] if the conversion fails. 
- pub fn version_to_i64(version: u64) -> Result> { + pub fn version_to_i64(version: u64) -> Result> { i64::try_from(version).map_err(|_| DbError::InvalidVersionNumber) } @@ -118,7 +140,9 @@ pub(crate) mod utils { /// # Errors /// /// Returns [`DbError::InvalidSnapshotVersion`] if the conversion fails. - pub fn snapshot_version_to_i64(version: u64) -> Result> { + pub fn snapshot_version_to_i64( + version: u64, + ) -> Result> { i64::try_from(version).map_err(|_| DbError::InvalidSnapshotVersion) } } diff --git a/eventastic_postgres/src/error.rs b/eventastic_postgres/src/error.rs new file mode 100644 index 0000000..ab120c3 --- /dev/null +++ b/eventastic_postgres/src/error.rs @@ -0,0 +1,102 @@ +use eventastic::aggregate::Aggregate; +use thiserror::Error; + +use crate::{EncryptionProvider, Pickle}; + +#[allow(type_alias_bounds)] +pub type EventSourcingDbError = DbError< + E::Error, + ::Error, + ::Error, + ::Error, +>; + +#[derive(Error, Debug)] +pub enum DbError< + EncryptionError, + EventPicklingError, + SnapshotPicklingError, + SideEffectPicklingError, +> { + /// A database operation failed. + #[error("DB Error {0}")] + DbError(sqlx::Error), + /// Failed to pickle data. + #[error("Pickling Error {0}")] + EventPicklingError(EventPicklingError), + /// Failed to pickle snapshot data. + #[error("Snapshot Pickling Error {0}")] + SnapshotPicklingError(SnapshotPicklingError), + /// Failed to pickle side effect data. + #[error("Side Effect Pickling Error {0}")] + SideEffectPicklingError(SideEffectPicklingError), + /// An invalid version number was encountered (e.g., negative value where positive expected). + #[error("Invalid Version Number")] + InvalidVersionNumber, + /// An invalid snapshot version number was encountered. + #[error("Invalid Snapshot Version Number")] + InvalidSnapshotVersion, + /// A concurrent modification was detected (optimistic locking failure). 
+ #[error("Optimistic Concurrency Error")] + OptimisticConcurrencyError, + /// Failed to encrypt or decrypt data. + #[error("Encryption Error {0}")] + Encryption(EncryptionError), + /// Failed to encrypt or decrypt data. + #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +/// Errors that can occur during side effect storage operations. +/// +/// This is a specialized error type for side effect operations that only +/// includes the errors relevant to storing and retrieving side effects. +#[derive(Error, Debug)] +pub enum SideEffectDbError { + /// A database operation failed. + #[error("DB Error {0}")] + DbError(sqlx::Error), + /// Failed to pickle side effect data. + #[error("Side Effect Pickling Error {0}")] + SideEffectPicklingError(SideEffectPicklingError), + /// Failed to encrypt or decrypt data. + #[error("Encryption Error {0}")] + Encryption(EncryptionError), + /// Encryption provider returned wrong number of items. 
+ #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +impl From for DbError { + fn from(e: sqlx::Error) -> Self { + if let Some(db_error) = e.as_database_error() { + if let Some(code) = db_error.code() { + if code == "23505" && db_error.message().contains("aggregate_version") { + return DbError::OptimisticConcurrencyError; + } + } + } + DbError::DbError(e) + } +} + +impl From for SideEffectDbError { + fn from(e: sqlx::Error) -> Self { + SideEffectDbError::DbError(e) + } +} + +impl From> for DbError { + fn from(e: SideEffectDbError) -> Self { + match e { + SideEffectDbError::DbError(err) => DbError::DbError(err), + SideEffectDbError::SideEffectPicklingError(err) => { + DbError::SideEffectPicklingError(err) + } + SideEffectDbError::Encryption(err) => DbError::Encryption(err), + SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems => { + DbError::EncryptionProviderReturnedWrongNumberOfItems + } + } + } +} diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 140ef68..08085a7 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,19 +1,23 @@ mod common; mod encryption; +mod error; mod pickle; mod reader_impl; mod repository; mod side_effect; -mod table_registry; +mod table_config; mod transaction; pub use encryption::{EncryptionProvider, NoEncryption, NoEncryptionError}; +pub use error::{DbError, SideEffectDbError}; pub use pickle::Pickle; pub use repository::PostgresRepository; pub use side_effect::SideEffectStorage; -pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder}; +pub use table_config::TableConfig; pub use transaction::PostgresTransaction; +use crate::error::EventSourcingDbError; + use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, @@ -21,49 +25,6 @@ use eventastic::{ repository::{Repository, RepositoryError}, }; use sqlx::types::Uuid; -use thiserror::Error; - 
-/// Errors that can occur during PostgreSQL operations. -#[derive(Error, Debug)] -pub enum DbError { - /// A database operation failed. - #[error("DB Error {0}")] - DbError(sqlx::Error), - /// Failed to pickle data. - #[error("Pickling Error {0}")] - PicklingError(anyhow::Error), - /// An invalid version number was encountered (e.g., negative value where positive expected). - #[error("Invalid Version Number")] - InvalidVersionNumber, - /// An invalid snapshot version number was encountered. - #[error("Invalid Snapshot Version Number")] - InvalidSnapshotVersion, - /// A concurrent modification was detected (optimistic locking failure). - #[error("Optimistic Concurrency Error")] - OptimisticConcurrencyError, - /// An aggregate type was not registered in the table registry. - #[error("Aggregate type not registered in table registry")] - UnregisteredAggregate, - /// Failed to encrypt or decrypt data. - #[error("Encryption Error {0}")] - Encryption(E), - /// Failed to encrypt or decrypt data. - #[error("Encryption provider returned wrong number of items")] - EncrypytionProviderReturnedWrongNumberOfItems, -} - -impl From for DbError { - fn from(e: sqlx::Error) -> Self { - if let Some(db_error) = e.as_database_error() { - if let Some(code) = db_error.code() { - if code == "23505" && db_error.message().contains("aggregate_version") { - return DbError::OptimisticConcurrencyError; - } - } - } - DbError::DbError(e) - } -} /// Extension trait for loading aggregates from PostgreSQL storage. 
/// @@ -91,7 +52,7 @@ where RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >, > { Context::load(transaction, &aggregate_id).await @@ -109,7 +70,7 @@ where RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >, > where diff --git a/eventastic_postgres/src/pickle.rs b/eventastic_postgres/src/pickle.rs index e81f267..0281e9d 100644 --- a/eventastic_postgres/src/pickle.rs +++ b/eventastic_postgres/src/pickle.rs @@ -13,6 +13,7 @@ pub trait Pickle: Sized { fn unpickle(bytes: &[u8]) -> Result; } +#[cfg(feature = "serde")] impl Pickle for T where T: serde::Serialize + serde::de::DeserializeOwned, diff --git a/eventastic_postgres/src/reader_impl.rs b/eventastic_postgres/src/reader_impl.rs index e3159b3..8dcde6c 100644 --- a/eventastic_postgres/src/reader_impl.rs +++ b/eventastic_postgres/src/reader_impl.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use crate::common::{PartialEventRow, PartialSnapshotRow, utils}; use crate::pickle::Pickle; -use crate::{DbError, EncryptionProvider}; +use crate::{DbError, EncryptionProvider, EventSourcingDbError}; use eventastic::aggregate::Aggregate; use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; @@ -25,12 +25,13 @@ pub fn stream_from<'e, 'c: 'e, E, T, EP>( query: Arc, encryption_provider: &'e EP, ) -> impl futures::Stream< - Item = std::result::Result, DbError>, + Item = std::result::Result, EventSourcingDbError>, > + 'e where E: Executor<'c, Database = sqlx::Postgres> + 'e, - T: Aggregate, + T: Aggregate + Pickle, T::DomainEvent: DomainEvent + Pickle + Send + 'e, + T::SideEffect: Pickle, EP: EncryptionProvider + Sync + Send + 'e, { let id = *id; @@ -54,11 +55,11 @@ where .await .map_err(DbError::Encryption)?; if plain.len() != number_of_items { - Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems)?; + Err(DbError::EncryptionProviderReturnedWrongNumberOfItems)?; } for (mut row, plain) in 
chunk.into_iter().zip(plain.into_iter()) { row.event = plain; - yield PartialEventRow::to_event(row); + yield PartialEventRow::to_event::(row); } } } @@ -71,11 +72,12 @@ pub async fn get_event<'c, E, T, EP>( event_id: &<::DomainEvent as DomainEvent>::EventId, query: &str, encryption_provider: &EP, -) -> Result::DomainEvent>>, DbError> +) -> Result::DomainEvent>>, EventSourcingDbError> where E: Executor<'c, Database = sqlx::Postgres>, - T: Aggregate, + T: Aggregate + Pickle, T::DomainEvent: DomainEvent + Pickle + Send, + T::SideEffect: Pickle, EP: EncryptionProvider, { let Some(mut row) = query_as::<_, PartialEventRow>(query) @@ -92,13 +94,13 @@ where .map_err(DbError::Encryption)? .into_iter(); let Some(event) = plain.next() else { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); }; if plain.next().is_some() { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); } row.event = event; - Ok(Some(PartialEventRow::to_event(row)?)) + Ok(Some(PartialEventRow::to_event::(row)?)) } /// Generic implementation for getting a snapshot from configured table. 
@@ -107,10 +109,12 @@ pub async fn get_snapshot<'c, E, T, EP>( id: &T::AggregateId, query: &str, encryption_provider: &EP, -) -> Result>, DbError> +) -> Result>, EventSourcingDbError> where E: Executor<'c, Database = sqlx::Postgres>, T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, EP: EncryptionProvider, { let row = query_as::<_, PartialSnapshotRow>(query) @@ -128,12 +132,12 @@ where .await .map_err(DbError::Encryption)?; if plain.len() != 1 { - Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems)?; + Err(DbError::EncryptionProviderReturnedWrongNumberOfItems)?; } row.aggregate = plain .into_iter() .next() .expect("Decrypt must return 1 item for snapshot"); - Ok(Some(PartialSnapshotRow::to_snapshot(row)?)) + Ok(Some(PartialSnapshotRow::to_snapshot::(row)?)) } diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index 7552ce1..8fb7149 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -1,8 +1,6 @@ -use std::marker::PhantomData; - use crate::{ - DbError, PostgresTransaction, SideEffectStorage, TableRegistry, encryption::EncryptionProvider, - pickle::Pickle, reader_impl, + EventSourcingDbError, PostgresTransaction, SideEffectStorage, encryption::EncryptionProvider, + pickle::Pickle, reader_impl, table_config::TableConfig, }; use async_trait::async_trait; use eventastic::{ @@ -10,12 +8,12 @@ use eventastic::{ event::{DomainEvent, EventStoreEvent}, repository::{Repository, RepositoryError, RepositoryReader, Snapshot}, }; -use futures::StreamExt; use sqlx::{ Pool, Postgres, postgres::{PgConnectOptions, PgPoolOptions}, types::Uuid, }; +use std::marker::PhantomData; /// PostgreSQL-based repository implementation for event sourcing. /// @@ -23,26 +21,15 @@ use sqlx::{ /// using PostgreSQL as the backing store. It integrates with a configurable side effect /// storage mechanism for handling the outbox pattern. 
#[derive(Clone)] -pub struct PostgresRepository -where - T: Clone, - O: Clone, - E: Clone, -{ - pub(crate) inner: Pool, - pub(crate) outbox: O, - pub(crate) tables: TableRegistry, +pub struct PostgresRepository { + inner: Pool, + outbox: O, + table_config: TableConfig, encryption_provider: E, phantom_aggregate: std::marker::PhantomData, } -impl PostgresRepository -where - T: Aggregate + Clone, - T::SideEffect: SideEffect + Pickle + Send + Sync, - O: SideEffectStorage + Clone, - E: EncryptionProvider + Clone, -{ +impl PostgresRepository { /// Creates a new PostgreSQL repository with the specified connection and pool options. /// /// # Parameters @@ -50,12 +37,11 @@ where /// - `connect_options` - PostgreSQL connection configuration /// - `pool_options` - Connection pool configuration /// - `outbox` - Side effect storage implementation for the outbox pattern - /// - `tables` - Registry of table configurations for different aggregates pub async fn new( connect_options: PgConnectOptions, pool_options: PgPoolOptions, + table_config: TableConfig, outbox: O, - tables: TableRegistry, encryption_provider: E, ) -> Result { let pool = pool_options.connect_with(connect_options).await?; @@ -63,7 +49,7 @@ where Ok(Self { inner: pool, outbox, - tables, + table_config, encryption_provider, phantom_aggregate: PhantomData, }) @@ -77,12 +63,29 @@ where Ok(PostgresTransaction { inner: self.inner.begin().await?, outbox: &self.outbox, - tables: &self.tables, + table_config: &self.table_config, encryption_provider: &self.encryption_provider, phantom_aggregate: PhantomData, }) } + /// Create a transaction from an existing raw sqlx transaction. + /// + /// This is useful for multi-aggregate scenarios where you want to use + /// the same database transaction across multiple repository types. 
+ pub fn transaction_from<'a>( + &'a self, + transaction: sqlx::Transaction<'a, Postgres>, + ) -> PostgresTransaction<'a, T, O, E> { + PostgresTransaction { + inner: transaction, + outbox: &self.outbox, + table_config: &self.table_config, + encryption_provider: &self.encryption_provider, + phantom_aggregate: PhantomData, + } + } + /// Run database migrations to set up the required tables and schema. /// /// This method should be called once during application startup to ensure @@ -105,7 +108,7 @@ where O: SideEffectStorage + Clone + Send + Sync, E: EncryptionProvider + Clone + Send + Sync, { - type DbError = DbError; + type DbError = EventSourcingDbError; /// Returns a stream of domain events. fn stream_from( @@ -120,17 +123,12 @@ where Self::DbError, >, > { - let query = match self.tables.stream_events_query::() { - Some(query) => query, - None => { - return futures::stream::iter(vec![Err(DbError::UnregisteredAggregate)]).boxed(); - } - }; + let query = &self.table_config.stream_events_query; Box::pin(reader_impl::stream_from::<_, T, E>( &self.inner, id, version, - query, + query.clone(), &self.encryption_provider, )) } @@ -141,10 +139,7 @@ where aggregate_id: &T::AggregateId, event_id: &<::DomainEvent as DomainEvent>::EventId, ) -> Result::DomainEvent>>, Self::DbError> { - let query = self - .tables - .get_event_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let query = &self.table_config.get_event_query; reader_impl::get_event::<_, T, E>( &self.inner, aggregate_id, @@ -160,10 +155,7 @@ where &mut self, id: &T::AggregateId, ) -> Result>, Self::DbError> { - let query = self - .tables - .get_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let query = &self.table_config.get_snapshot_query; reader_impl::get_snapshot::<_, T, E>(&self.inner, id, query, &self.encryption_provider) .await } @@ -182,7 +174,7 @@ where type Error = RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >; /// Loads 
an aggregate from the repository by its ID. diff --git a/eventastic_postgres/src/side_effect.rs b/eventastic_postgres/src/side_effect.rs index d6c37de..2452bf3 100644 --- a/eventastic_postgres/src/side_effect.rs +++ b/eventastic_postgres/src/side_effect.rs @@ -1,4 +1,4 @@ -use crate::DbError; +use crate::SideEffectDbError; use crate::pickle::Pickle; use async_trait::async_trait; use eventastic::aggregate::SideEffect; @@ -11,7 +11,7 @@ use sqlx::{Postgres, Transaction}; /// different implementations such as direct table storage or outbox patterns. /// Implementors define how side effects are persisted within a database transaction. #[async_trait] -pub trait SideEffectStorage: Send + Sync +pub trait SideEffectStorage: Send + Sync where T: SideEffect + Pickle + Send + Sync, { @@ -27,10 +27,10 @@ where /// /// # Errors /// - /// Returns [`DbError`] if the storage operation fails. + /// Returns [`SideEffectDbError`] if the storage operation fails. async fn store_side_effects( &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, - ) -> Result<(), DbError>; + ) -> Result<(), SideEffectDbError::Error>>; } diff --git a/eventastic_postgres/src/table_config.rs b/eventastic_postgres/src/table_config.rs new file mode 100644 index 0000000..999fe4b --- /dev/null +++ b/eventastic_postgres/src/table_config.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +/// Configuration for database tables used by an aggregate type. +/// +/// This struct contains pre-computed SQL queries to avoid string allocation +/// during query execution. +#[derive(Debug, Clone)] +pub struct TableConfig { + pub(crate) stream_events_query: Arc, + pub(crate) get_event_query: String, + pub(crate) get_snapshot_query: String, + pub(crate) insert_events_query: String, + pub(crate) upsert_snapshot_query: String, +} + +impl TableConfig { + /// Create a new TableConfig with pre-computed queries. 
+ pub fn new(events: impl Into, snapshots: impl Into) -> Self { + let events = events.into(); + let snapshots = snapshots.into(); + + Self { + stream_events_query: format!( + "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND version >= $2 ORDER BY version ASC", + &events + ).into(), + get_event_query: format!( + "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND event_id = $2", + &events + ), + get_snapshot_query: format!( + "SELECT aggregate, version, snapshot_version FROM {} WHERE aggregate_id = $1 AND snapshot_version = $2", + &snapshots + ), + insert_events_query: format!( + "INSERT INTO {} (event_id, version, aggregate_id, event, created_at) \ + SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::bytea[], $5::timestamptz[]) \ + ON CONFLICT DO NOTHING returning event_id", + &events + ), + upsert_snapshot_query: format!( + "INSERT INTO {} (aggregate_id, aggregate, version, snapshot_version, created_at) \ + VALUES ($1, $2, $3, $4, $5) \ + ON CONFLICT (aggregate_id, snapshot_version) DO UPDATE SET aggregate = $2, version = $3, created_at = $5", + &snapshots + ), + } + } +} diff --git a/eventastic_postgres/src/table_registry.rs b/eventastic_postgres/src/table_registry.rs deleted file mode 100644 index 2a3eceb..0000000 --- a/eventastic_postgres/src/table_registry.rs +++ /dev/null @@ -1,152 +0,0 @@ -use eventastic::aggregate::Aggregate; -use std::any::TypeId; -use std::collections::HashMap; -use std::sync::Arc; - -/// Configuration for database tables used by an aggregate type. -/// -/// This struct contains pre-computed SQL queries to avoid string allocation -/// during query execution. -#[derive(Debug, Clone)] -pub struct TableConfig { - pub(crate) stream_events_query: Arc, - pub(crate) get_event_query: String, - pub(crate) get_snapshot_query: String, - pub(crate) insert_events_query: String, - pub(crate) upsert_snapshot_query: String, -} - -impl TableConfig { - /// Create a new TableConfig with pre-computed queries. 
- pub fn new(events: impl Into, snapshots: impl Into) -> Self { - let events = events.into(); - let snapshots = snapshots.into(); - - Self { - stream_events_query: format!( - "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND version >= $2 ORDER BY version ASC", - &events - ).into(), - get_event_query: format!( - "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND event_id = $2", - &events - ), - get_snapshot_query: format!( - "SELECT aggregate, version, snapshot_version FROM {} WHERE aggregate_id = $1 AND snapshot_version = $2", - &snapshots - ), - insert_events_query: format!( - "INSERT INTO {} (event_id, version, aggregate_id, event, created_at) \ - SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::bytea[], $5::timestamptz[]) \ - ON CONFLICT DO NOTHING returning event_id", - &events - ), - upsert_snapshot_query: format!( - "INSERT INTO {} (aggregate_id, aggregate, version, snapshot_version, created_at) \ - VALUES ($1, $2, $3, $4, $5) \ - ON CONFLICT (aggregate_id, snapshot_version) DO UPDATE SET aggregate = $2, version = $3, created_at = $5", - &snapshots - ), - } - } -} - -/// Registry that maps aggregate types to their table configurations. -/// -/// This allows different aggregate types to use different tables while -/// supporting runtime configuration. -#[derive(Debug, Clone, Default)] -pub struct TableRegistry { - tables: HashMap>, -} - -impl TableRegistry { - /// Create a new empty table registry. - pub fn new() -> Self { - Self { - tables: HashMap::new(), - } - } - - /// Register table configuration for an aggregate type. - pub fn register(&mut self, config: TableConfig) { - self.tables.insert(TypeId::of::(), Arc::new(config)); - } - - /// Get the stream events query for an aggregate type. - pub fn stream_events_query(&self) -> Option> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.stream_events_query.clone()) - } - - /// Get the get event query for an aggregate type. 
- pub fn get_event_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.get_event_query.as_str()) - } - - /// Get the get snapshot query for an aggregate type. - pub fn get_snapshot_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.get_snapshot_query.as_str()) - } - - /// Get the insert events query for an aggregate type. - pub fn insert_events_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.insert_events_query.as_str()) - } - - /// Get the upsert snapshot query for an aggregate type. - pub fn upsert_snapshot_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.upsert_snapshot_query.as_str()) - } -} - -/// Builder for creating a TableRegistry with a fluent API. -pub struct TableRegistryBuilder { - registry: TableRegistry, -} - -impl TableRegistryBuilder { - /// Create a new builder. - pub fn new() -> Self { - Self { - registry: TableRegistry::new(), - } - } - - /// Register table configuration for an aggregate type. - pub fn register(mut self, config: TableConfig) -> Self { - self.registry.register::(config); - self - } - - /// Register table configuration for an aggregate type with explicit table names. - pub fn register_with_tables( - mut self, - events: impl Into, - snapshots: impl Into, - ) -> Self { - self.registry - .register::(TableConfig::new(events, snapshots)); - self - } - - /// Build the TableRegistry. 
- pub fn build(self) -> TableRegistry { - self.registry - } -} - -impl Default for TableRegistryBuilder { - fn default() -> Self { - Self::new() - } -} diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 8fea71c..b76392e 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,7 +1,7 @@ use crate::common::utils; use crate::pickle::Pickle; -use crate::{DbError, EncryptionProvider, SideEffectStorage, TableRegistry, reader_impl}; -use anyhow::Context as _; +use crate::table_config::TableConfig; +use crate::{DbError, EncryptionProvider, EventSourcingDbError, SideEffectStorage, reader_impl}; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; @@ -12,7 +12,6 @@ use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; use eventastic::repository::Snapshot; use eventastic::repository::{RepositoryError, RepositoryReader, RepositoryWriter}; -use futures::StreamExt; use sqlx::Row; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; @@ -24,25 +23,23 @@ use sqlx::{Postgres, Transaction}; pub struct PostgresTransaction<'a, T, O, E> { pub(crate) inner: Transaction<'a, Postgres>, pub(crate) outbox: &'a O, - pub(crate) tables: &'a TableRegistry, + pub(crate) table_config: &'a TableConfig, pub(crate) encryption_provider: &'a E, pub(crate) phantom_aggregate: std::marker::PhantomData, } impl<'a, T, O, E> PostgresTransaction<'a, T, O, E> where - O: SideEffectStorage, - E: EncryptionProvider + Send + Sync + 'static, - T: Aggregate + 'static + Send + Sync + Pickle, - T::DomainEvent: DomainEvent + Pickle + Send + Sync, - T::SideEffect: SideEffect + Pickle + Send + Sync, - T::ApplyError: Send + Sync, + T: eventastic::aggregate::Aggregate + Pickle, + T::DomainEvent: Pickle, + T::SideEffect: Pickle, + E: EncryptionProvider, { /// Commit the transaction to the database. 
/// /// This finalizes all operations performed within this transaction, /// making them permanently visible to other database connections. - pub async fn commit(self) -> Result<(), DbError> { + pub async fn commit(self) -> Result<(), EventSourcingDbError> { Ok(self.inner.commit().await?) } @@ -50,7 +47,7 @@ where /// /// This undoes all operations performed within this transaction, /// returning the database to its state before the transaction began. - pub async fn rollback(self) -> Result<(), DbError> { + pub async fn rollback(self) -> Result<(), EventSourcingDbError> { Ok(self.inner.rollback().await?) } @@ -64,25 +61,36 @@ where &mut self.inner } - /// Get an aggregate by ID using the table registry. + /// Get the encryption provider reference + pub fn encryption_provider(&self) -> &E { + self.encryption_provider + } +} + +impl<'a, T, O, E> PostgresTransaction<'a, T, O, E> +where + O: SideEffectStorage, + E: EncryptionProvider + Send + Sync + 'static, + T: Aggregate + 'static + Send + Sync + Pickle, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, + T::ApplyError: Send + Sync, +{ + /// Get an aggregate by ID. pub async fn get( &mut self, id: &Uuid, - ) -> Result, RepositoryError>> { + ) -> Result, RepositoryError>> { Context::load(self, id).await } - /// Store an aggregate using the table registry. + /// Store an aggregate. pub async fn store( &mut self, aggregate: &mut Context, - ) -> Result<(), SaveError>> { + ) -> Result<(), SaveError>> { aggregate.save(self).await } - - pub fn encryption_provider(&self) -> &E { - self.encryption_provider - } } #[async_trait] @@ -95,7 +103,7 @@ where O: SideEffectStorage, E: EncryptionProvider + Send + Sync, { - type DbError = DbError; + type DbError = EventSourcingDbError; /// Returns a stream of domain events. 
fn stream_from( @@ -110,17 +118,12 @@ where Self::DbError, >, > { - let query = match self.tables.stream_events_query::() { - Some(query) => query, - None => { - return futures::stream::iter(vec![Err(DbError::UnregisteredAggregate)]).boxed(); - } - }; + let query = &self.table_config.stream_events_query; Box::pin(reader_impl::stream_from::<_, T, E>( &mut *self.inner, id, version, - query, + query.clone(), self.encryption_provider, )) } @@ -131,10 +134,7 @@ where aggregate_id: &T::AggregateId, event_id: &<::DomainEvent as DomainEvent>::EventId, ) -> Result::DomainEvent>>, Self::DbError> { - let query = self - .tables - .get_event_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let query = &self.table_config.get_event_query; reader_impl::get_event::<_, T, E>( &mut *self.inner, aggregate_id, @@ -150,10 +150,7 @@ where &mut self, id: &T::AggregateId, ) -> Result>, Self::DbError> { - let query = self - .tables - .get_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let query = &self.table_config.get_snapshot_query; reader_impl::get_snapshot::<_, T, E>(&mut *self.inner, id, query, self.encryption_provider) .await } @@ -190,11 +187,7 @@ where let version = utils::version_to_i64(version)?; - let serialised_event = event - .event - .pickle() - .context("Failed to pickle event") - .map_err(DbError::PicklingError)?; + let serialised_event = event.event.pickle().map_err(DbError::EventPicklingError)?; event_ids_to_insert.push(event_id); versions_to_insert.push(version); @@ -209,15 +202,12 @@ where .await .map_err(DbError::Encryption)?; if cipher.len() != number_of_items { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); } events_to_insert.append(&mut cipher); } - let insert_query = self - .tables - .insert_events_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let insert_query = &self.table_config.insert_events_query; let inserted_ids: Result, sqlx::Error> = 
sqlx::query(insert_query) .bind(&event_ids_to_insert[..]) @@ -240,8 +230,7 @@ where let aggregate = snapshot .aggregate .pickle() - .context("Failed to pickle aggregate") - .map_err(DbError::PicklingError)?; + .map_err(DbError::SnapshotPicklingError)?; let mut cipher = self .encryption_provider .encrypt(vec![aggregate]) @@ -249,16 +238,13 @@ where .map_err(DbError::Encryption)? .into_iter(); let Some(aggregate) = cipher.next() else { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); }; if cipher.next().is_some() { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); } - let upsert_query = self - .tables - .upsert_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let upsert_query = &self.table_config.upsert_snapshot_query; sqlx::query(upsert_query) .bind(aggregated_id) @@ -281,5 +267,6 @@ where self.outbox .store_side_effects(&mut self.inner, outbox_item) .await + .map_err(|e| e.into()) } } diff --git a/eventastic_postgres/tests/common/helpers.rs b/eventastic_postgres/tests/common/helpers.rs index fcddfad..f2146cb 100644 --- a/eventastic_postgres/tests/common/helpers.rs +++ b/eventastic_postgres/tests/common/helpers.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc}; use eventastic::aggregate::{Context, Root}; use eventastic_outbox_postgres::TableOutbox; use eventastic_postgres::{ - EncryptionProvider, NoEncryption, Pickle, PostgresRepository, TableRegistryBuilder, + EncryptionProvider, NoEncryption, Pickle, PostgresRepository, TableConfig, }; use sqlx::Row; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; @@ -20,15 +20,11 @@ pub async fn get_repository() -> PostgresRepository("events", "snapshots") - .build(); - let repo = PostgresRepository::new( connection_options, pool_options, + TableConfig::new("events", "snapshots"), TableOutbox::new(NoEncryption), - tables, 
NoEncryption, ) .await @@ -48,15 +44,11 @@ pub async fn get_encrypted_repository() let pool_options = PoolOptions::default(); - let tables = TableRegistryBuilder::new() - .register_with_tables::("events", "snapshots") - .build(); - let repo = PostgresRepository::new( connection_options, pool_options, + TableConfig::new("events", "snapshots"), TableOutbox::new(TestEncryptionProvider), - tables, TestEncryptionProvider, ) .await @@ -67,7 +59,7 @@ pub async fn get_encrypted_repository() repo } -#[derive(serde::Deserialize, Debug, Clone, serde::Serialize)] +#[derive(Debug, Clone)] pub struct SavedSnapshot { pub version: i64, pub aggregate: Account, diff --git a/eventastic_postgres/tests/common/mod.rs b/eventastic_postgres/tests/common/mod.rs index c590871..7bf30b2 100644 --- a/eventastic_postgres/tests/common/mod.rs +++ b/eventastic_postgres/tests/common/mod.rs @@ -3,3 +3,4 @@ pub mod encryption; pub mod helpers; pub mod test_aggregate; +pub mod test_order_aggregate; diff --git a/eventastic_postgres/tests/common/test_order_aggregate.rs b/eventastic_postgres/tests/common/test_order_aggregate.rs new file mode 100644 index 0000000..cf6b6d1 --- /dev/null +++ b/eventastic_postgres/tests/common/test_order_aggregate.rs @@ -0,0 +1,185 @@ +use eventastic::aggregate::Aggregate; +use eventastic::aggregate::SideEffect; +use eventastic::event::DomainEvent; +use serde::Deserialize; +use serde::Serialize; +use thiserror::Error; +use uuid::Uuid; + +// Define our Order aggregate - different from Account +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct Order { + pub order_id: Uuid, + pub customer_id: Uuid, + pub total_amount: i64, + pub status: OrderStatus, +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +pub enum OrderStatus { + Pending, + Confirmed, + Shipped, + Delivered, + Cancelled, +} + +// Define our domain events for Order +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] +pub enum OrderEvent { + Created { + order_id: 
Uuid, + event_id: Uuid, + customer_id: Uuid, + total_amount: i64, + }, + Confirmed { + event_id: Uuid, + }, + Shipped { + event_id: Uuid, + tracking_number: String, + }, + Delivered { + event_id: Uuid, + }, + Cancelled { + event_id: Uuid, + reason: String, + }, +} + +impl DomainEvent for OrderEvent { + type EventId = Uuid; + fn id(&self) -> &Uuid { + match self { + OrderEvent::Created { event_id, .. } + | OrderEvent::Confirmed { event_id, .. } + | OrderEvent::Shipped { event_id, .. } + | OrderEvent::Delivered { event_id, .. } + | OrderEvent::Cancelled { event_id, .. } => event_id, + } + } +} + +// Define our domain error for Order +#[derive(Error, Debug)] +pub enum OrderDomainError { + #[error("This event can't be applied given the current state of the order")] + InvalidState, + #[error("Order is already in final state")] + AlreadyFinalized, +} + +// Define our side effects for Order - different from Account side effects +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] +pub enum OrderSideEffects { + SendConfirmationEmail { + id: Uuid, + customer_email: String, + order_id: Uuid, + }, + NotifyWarehouse { + id: Uuid, + order_id: Uuid, + items: Vec, + }, + UpdateInventory { + id: Uuid, + product_ids: Vec, + quantities: Vec, + }, +} + +impl SideEffect for OrderSideEffects { + type SideEffectId = Uuid; + + fn id(&self) -> &Self::SideEffectId { + match self { + OrderSideEffects::SendConfirmationEmail { id, .. } + | OrderSideEffects::NotifyWarehouse { id, .. } + | OrderSideEffects::UpdateInventory { id, .. 
} => id, + } + } +} + +// Implement the aggregate trait for our Order struct +impl Aggregate for Order { + const SNAPSHOT_VERSION: u64 = 1; + + type AggregateId = Uuid; + type DomainEvent = OrderEvent; + type ApplyError = OrderDomainError; + type SideEffect = OrderSideEffects; + + fn aggregate_id(&self) -> &Self::AggregateId { + &self.order_id + } + + fn apply(&mut self, event: &Self::DomainEvent) -> Result<(), Self::ApplyError> { + match event { + OrderEvent::Confirmed { .. } => { + if self.status != OrderStatus::Pending { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Confirmed; + } + OrderEvent::Shipped { .. } => { + if self.status != OrderStatus::Confirmed { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Shipped; + } + OrderEvent::Delivered { .. } => { + if self.status != OrderStatus::Shipped { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Delivered; + } + OrderEvent::Cancelled { .. } => { + if matches!(self.status, OrderStatus::Delivered | OrderStatus::Cancelled) { + return Err(Self::ApplyError::AlreadyFinalized); + } + self.status = OrderStatus::Cancelled; + } + OrderEvent::Created { .. } => return Err(Self::ApplyError::InvalidState), + } + Ok(()) + } + + fn apply_new(event: &Self::DomainEvent) -> Result { + match event { + OrderEvent::Created { + order_id, + customer_id, + total_amount, + .. + } => Ok(Self { + order_id: *order_id, + customer_id: *customer_id, + total_amount: *total_amount, + status: OrderStatus::Pending, + }), + _ => Err(Self::ApplyError::InvalidState), + } + } + + fn side_effects(&self, event: &Self::DomainEvent) -> Option> { + let side_effect = match event { + OrderEvent::Created { event_id, .. 
} => Some(OrderSideEffects::SendConfirmationEmail { + id: *event_id, + customer_email: "customer@example.com".to_string(), + order_id: self.order_id, + }), + OrderEvent::Confirmed { event_id } => Some(OrderSideEffects::NotifyWarehouse { + id: *event_id, + order_id: self.order_id, + items: vec!["item1".to_string(), "item2".to_string()], + }), + OrderEvent::Shipped { .. } + | OrderEvent::Delivered { .. } + | OrderEvent::Cancelled { .. } => None, + }; + side_effect.map(|s| vec![s]) + } +} diff --git a/eventastic_postgres/tests/encryption.rs b/eventastic_postgres/tests/encryption.rs index bd16d46..7e2a9c5 100644 --- a/eventastic_postgres/tests/encryption.rs +++ b/eventastic_postgres/tests/encryption.rs @@ -36,9 +36,7 @@ async fn when_encryption_is_enabled_aggregate_can_be_saved_and_loaded() { .expect("Failed to commit transaction"); // Assert - let loaded_account = load_encrypted_account(account_id) - .await - .expect("Failed to load account"); + let loaded_account = load_encrypted_account(account_id).await; let loaded_account = loaded_account.state(); assert_eq!(created_account, loaded_account); @@ -130,7 +128,7 @@ async fn when_encryption_is_enabled_events_cannot_be_loaded_by_id_without_encryp Account, >>::get_event(&mut repository, &account_id, &event_id) .await; - assert!(matches!(result, Err(DbError::PicklingError(_)))); + assert!(matches!(result, Err(DbError::EventPicklingError(_)))); } #[tokio::test] @@ -165,7 +163,7 @@ async fn when_encryption_is_enabled_events_can_be_saved_and_loaded() { &mut repository, &account_id, 0 ); while let Some(event) = events.next().await { - assert!(matches!(event, Ok(_))); + assert!(event.is_ok()); } } @@ -200,7 +198,7 @@ async fn when_encryption_is_enabled_events_cannot_be_loaded_without_encryption() while let Some(event) = events.next().await { assert!(matches!( event, - Err(eventastic_postgres::DbError::PicklingError(_)) + Err(eventastic_postgres::DbError::EventPicklingError(_)) )); } } @@ -230,10 +228,11 @@ async fn 
when_encryption_is_enabled_aggregate_cannot_be_loaded_without_encryptio // Assert let repository = get_repository().await; let mut transaction = repository.begin_transaction().await.unwrap(); + assert!(matches!( transaction.get(&account_id).await, Err(eventastic::repository::RepositoryError::Repository( - eventastic_postgres::DbError::PicklingError(_) + eventastic_postgres::DbError::SnapshotPicklingError(_) )), )); } @@ -290,7 +289,7 @@ async fn when_encryption_is_enabled_side_effect_can_be_saved_and_loaded() { } } -async fn load_encrypted_account(account_id: Uuid) -> anyhow::Result> { +async fn load_encrypted_account(account_id: Uuid) -> Context { let repository = get_encrypted_repository().await; let mut transaction = repository @@ -298,7 +297,8 @@ async fn load_encrypted_account(account_id: Uuid) -> anyhow::Result = transaction.get(&account_id).await?; - - Ok(context) + transaction + .get(&account_id) + .await + .expect("Failed to encrypted load account") } diff --git a/eventastic_postgres/tests/multi_aggregate.rs b/eventastic_postgres/tests/multi_aggregate.rs new file mode 100644 index 0000000..b26f206 --- /dev/null +++ b/eventastic_postgres/tests/multi_aggregate.rs @@ -0,0 +1,300 @@ +mod common; + +use common::helpers::get_repository; +use common::test_aggregate::{Account, AccountEvent}; +use common::test_order_aggregate::{Order, OrderEvent, OrderStatus}; +use eventastic::aggregate::Root; +use eventastic_outbox_postgres::TableOutbox; +use eventastic_postgres::PostgresRepository; +use eventastic_postgres::{NoEncryption, TableConfig}; +use sqlx::pool::PoolOptions; +use sqlx::postgres::PgConnectOptions; +use std::str::FromStr; +use uuid::Uuid; + +// Helper function to get an order repository using the same pool +async fn get_order_repository() -> PostgresRepository, NoEncryption> +{ + let host = std::env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".to_string()); + let connection_string = format!("postgres://postgres:password@{host}/postgres"); + let 
connection_options = + PgConnectOptions::from_str(&connection_string).expect("Failed to parse connection options"); + + let pool_options = PoolOptions::default(); + + PostgresRepository::new( + connection_options, + pool_options, + TableConfig::new("events", "snapshots"), + TableOutbox::new(NoEncryption), + NoEncryption, + ) + .await + .expect("Failed to connect to postgres") +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_commit_test() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create and store account + let account_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_event).expect("Failed to create account"); + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and store order + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + order_tx + .store(&mut order) + .await + .expect("Failed to store order"); + + // Commit the transaction + order_tx + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert - verify both aggregates were saved + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let loaded_account = account_load_tx + 
.get(&account_id) + .await + .expect("Failed to load account"); + assert_eq!(loaded_account.state().account_id, account_id); + assert_eq!(loaded_account.state().balance, 1000); + account_load_tx + .commit() + .await + .expect("Failed to commit load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let loaded_order = order_load_tx + .get(&order_id) + .await + .expect("Failed to load order"); + assert_eq!(loaded_order.state().order_id, order_id); + assert_eq!(loaded_order.state().total_amount, 500); + assert_eq!(loaded_order.state().status, OrderStatus::Pending); + order_load_tx + .commit() + .await + .expect("Failed to commit order load transaction"); +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_rollback_test() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create and store account + let account_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_event).expect("Failed to create account"); + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and store order + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + order_tx + .store(&mut order) + 
.await + .expect("Failed to store order"); + + // Rollback the transaction instead of committing + order_tx + .rollback() + .await + .expect("Failed to rollback transaction"); + + // Assert - verify neither aggregate was saved + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let account_result = account_load_tx.get(&account_id).await; + assert!( + account_result.is_err(), + "Account should not exist after rollback" + ); + account_load_tx + .rollback() + .await + .expect("Failed to rollback load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let order_result = order_load_tx.get(&order_id).await; + assert!( + order_result.is_err(), + "Order should not exist after rollback" + ); + order_load_tx + .rollback() + .await + .expect("Failed to rollback order load transaction"); +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_with_mixed_side_effects() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create account and add money (generates side effects) + let account_open_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_open_event).expect("Failed to create account"); + + let add_event = AccountEvent::Add { + event_id: Uuid::new_v4(), + amount: 500, + }; + account + .record_that(add_event) + .expect("Failed to add money to account"); + + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the 
raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and confirm order (generates different side effects) + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + + let confirm_event = OrderEvent::Confirmed { + event_id: Uuid::new_v4(), + }; + order + .record_that(confirm_event) + .expect("Failed to confirm order"); + + order_tx + .store(&mut order) + .await + .expect("Failed to store order"); + + // Commit the transaction + order_tx + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert - verify both aggregates were saved with correct states + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let loaded_account = account_load_tx + .get(&account_id) + .await + .expect("Failed to load account"); + assert_eq!(loaded_account.state().balance, 1500); // 1000 + 500 + account_load_tx + .commit() + .await + .expect("Failed to commit load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let loaded_order = order_load_tx + .get(&order_id) + .await + .expect("Failed to load order"); + assert_eq!(loaded_order.state().status, OrderStatus::Confirmed); + order_load_tx + .commit() + .await + .expect("Failed to commit order load transaction"); +} diff --git a/examples/bank/Cargo.toml b/examples/bank/Cargo.toml index e64ff49..8a2f0bc 100644 --- a/examples/bank/Cargo.toml +++ b/examples/bank/Cargo.toml @@ -7,12 +7,11 @@ edition = "2024" [dependencies] eventastic = { path = "../../eventastic" } -eventastic_postgres = { path = "../../eventastic_postgres" } +eventastic_postgres = { path = "../../eventastic_postgres", features = ["serde"] } 
eventastic_outbox_postgres = { path = "../../eventastic_outbox_postgres" } thiserror = { workspace = true } uuid = { workspace = true } tokio = { workspace = true } sqlx = { workspace = true } -anyhow = { workspace = true } serde = { workspace = true } async-trait = { workspace = true } diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 0c6b3f3..b1b7d31 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -9,7 +9,7 @@ use eventastic::aggregate::SideEffect; use eventastic::event::DomainEvent; use eventastic::repository::Repository; use eventastic_outbox_postgres::{RepositoryOutboxExt, SideEffectHandler, TableOutbox}; -use eventastic_postgres::{NoEncryption, PostgresRepository, RootExt}; +use eventastic_postgres::{NoEncryption, PostgresRepository, RootExt, TableConfig}; use serde::Deserialize; use serde::Serialize; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; @@ -17,7 +17,7 @@ use thiserror::Error; use uuid::Uuid; #[tokio::main] -async fn main() -> Result<(), anyhow::Error> { +async fn main() -> Result<(), Box<dyn std::error::Error>> { // Setup postgres repo let repository = get_repository().await; @@ -392,15 +392,11 @@ async fn get_repository() -> PostgresRepository("events", "snapshots") - .build(); - PostgresRepository::new( connection_options, pool_options, + TableConfig::new("events", "snapshots"), TableOutbox::new(NoEncryption), - tables, NoEncryption, ) .await From 0502ff711eda492f94bfe5c89689175d968ffaf7 Mon Sep 17 00:00:00 2001 From: Alex Wakefield Date: Thu, 11 Sep 2025 15:07:18 +0100 Subject: [PATCH 2/2] chore: simplify the error message When we get this error logged we end up with lots of different messages due to the aggregate ID and version number. These should be logged outside of the message to make grouping this easier.
--- eventastic/src/aggregate/root.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventastic/src/aggregate/root.rs b/eventastic/src/aggregate/root.rs index c60e1f2..90baf90 100644 --- a/eventastic/src/aggregate/root.rs +++ b/eventastic/src/aggregate/root.rs @@ -342,7 +342,7 @@ where /// This error is returned when the Repository fails to insert the event /// because the version already exists, indicating a concurrent modification. - #[error("Optimistic Concurrency Error Version {1} of aggregate {0:?} already exists")] + #[error("Optimistic Concurrency Error")] OptimisticConcurrency(T::AggregateId, u64), }