diff --git a/Cargo.toml b/Cargo.toml index e5c6366..6112b80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,4 +33,3 @@ chrono = "0.4" serde_json = "1" tokio = { version = "1", features = ["full"] } futures-util = "0.3" -anyhow = "1" diff --git a/eventastic/src/aggregate/root.rs b/eventastic/src/aggregate/root.rs index eef3d29..90baf90 100644 --- a/eventastic/src/aggregate/root.rs +++ b/eventastic/src/aggregate/root.rs @@ -342,7 +342,7 @@ where /// This error is returned when the Repository fails to insert the event /// because the version already exists, indicating a concurrent modification. - #[error("Optimistic Concurrency Error Version {1} of aggregate {0:?} already exists")] + #[error("Optimistic Concurrency Error")] OptimisticConcurrency(T::AggregateId, u64), } @@ -426,7 +426,7 @@ mod tests { assert_eq!(context.version(), 0); for i in 1..=5 { - let add_event = create_add_event(&format!("add-{}", i), i); + let add_event = create_add_event(&format!("add-{i}"), i); context.record_that(add_event).unwrap(); assert_eq!(context.version(), i as u64); } diff --git a/eventastic/src/memory.rs b/eventastic/src/memory.rs index 4c7a3c3..0f67964 100644 --- a/eventastic/src/memory.rs +++ b/eventastic/src/memory.rs @@ -870,7 +870,7 @@ mod tests { // Verify side effects were stored // Reset: 2 side effects, Add: 1, Subtract: 1, Multiply: 0, Add: 1 = 5 total - let expected_side_effects = 2 + 1 + 1 + 0 + 1; + let expected_side_effects = 5; assert_eq!(repository.side_effects_count(), expected_side_effects); let side_effects = repository.get_all_side_effects(); diff --git a/eventastic/src/test_fixtures.rs b/eventastic/src/test_fixtures.rs index 0895f91..6d6d4ee 100644 --- a/eventastic/src/test_fixtures.rs +++ b/eventastic/src/test_fixtures.rs @@ -4,8 +4,6 @@ //! that can be used across different test modules to ensure consistency //! and reduce code duplication. -#![cfg(test)] - use crate::{ aggregate::{Aggregate, SideEffect}, event::DomainEvent, @@ -139,21 +137,21 @@ impl Aggregate for TestCounter { match event { TestEvent::Reset { event_id, .. } => Some(vec![ TestSideEffect::LogOperation { - id: format!("{}-log", event_id), + id: format!("{event_id}-log"), operation: "Reset".to_string(), }, TestSideEffect::NotifyUser { - id: format!("{}-notify", event_id), + id: format!("{event_id}-notify"), message: "Counter has been reset".to_string(), }, ]), TestEvent::Add { event_id, value } => Some(vec![TestSideEffect::LogOperation { - id: format!("{}-log", event_id), - operation: format!("Add {}", value), + id: format!("{event_id}-log"), + operation: format!("Add {value}"), }]), TestEvent::Subtract { event_id, value } => Some(vec![TestSideEffect::LogOperation { - id: format!("{}-log", event_id), - operation: format!("Subtract {}", value), + id: format!("{event_id}-log"), + operation: format!("Subtract {value}"), }]), TestEvent::Multiply { .. } => None, // No side effects for multiply } diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml index 95b7d1a..d0de095 100644 --- a/eventastic_outbox_postgres/Cargo.toml +++ b/eventastic_outbox_postgres/Cargo.toml @@ -4,7 +4,6 @@ version = "0.5.0" edition = "2024" [dependencies] -anyhow = { workspace = true } eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" } async-trait = { workspace = true } sqlx = { workspace = true } @@ -13,3 +12,4 @@ uuid = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } futures-util = { workspace = true } tokio = { workspace = true } +thiserror = { workspace = true } diff --git a/eventastic_outbox_postgres/src/outbox.rs b/eventastic_outbox_postgres/src/outbox.rs index 1721286..3084d92 100644 --- a/eventastic_outbox_postgres/src/outbox.rs +++ b/eventastic_outbox_postgres/src/outbox.rs @@ -1,17 +1,54 @@ -use anyhow::Context; use async_trait::async_trait; use chrono::{DateTime, Utc}; use eventastic::aggregate::{Aggregate, SideEffect}; use eventastic::event::DomainEvent; use eventastic_postgres::{ - DbError, EncryptionProvider, Pickle, PostgresRepository, PostgresTransaction, SideEffectStorage, + EncryptionProvider, Pickle, PostgresRepository, PostgresTransaction, SideEffectDbError, + SideEffectStorage, }; use sqlx::{Postgres, Transaction}; use std::sync::Arc; +use thiserror::Error; use uuid::Uuid; use crate::OutboxMessage; +/// Errors that can occur during outbox operations. +#[derive(Error, Debug)] +pub enum OutboxError { + /// A database operation failed. + #[error("Database error: {0}")] + Database(sqlx::Error), + /// Failed to encrypt or decrypt side effect data. + #[error("Encryption error: {0}")] + Encryption(EncryptionError), + /// Failed to pickle or unpickle side effect data. + #[error("Side effect pickling error: {0}")] + SideEffectPickling(SideEffectPicklingError), + /// Encryption provider returned wrong number of items. + #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +impl From for OutboxError { + fn from(e: sqlx::Error) -> Self { + OutboxError::Database(e) + } +} + +impl From> for SideEffectDbError { + fn from(e: OutboxError) -> Self { + match e { + OutboxError::Database(err) => SideEffectDbError::DbError(err), + OutboxError::Encryption(err) => SideEffectDbError::Encryption(err), + OutboxError::SideEffectPickling(err) => SideEffectDbError::SideEffectPicklingError(err), + OutboxError::EncryptionProviderReturnedWrongNumberOfItems => { + SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems + } + } + } +} + /// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table. #[derive(Clone, Copy, Default)] pub struct TableOutbox { @@ -36,7 +73,7 @@ where &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, - ) -> Result<(), DbError> { + ) -> Result<(), SideEffectDbError::Error>> { let mut ids: Vec = Vec::with_capacity(items.len()); let mut messages: Vec> = Vec::with_capacity(items.len()); let mut retries: Vec = Vec::with_capacity(items.len()); @@ -49,8 +86,7 @@ where let id = *side_effect.id(); let msg = side_effect .pickle() - .context("Failed to pickle side effect") - .map_err(DbError::PicklingError)?; + .map_err(SideEffectDbError::SideEffectPicklingError)?; ids.push(id); plain.push(msg); retries.push(0); @@ -62,9 +98,9 @@ where .encryption_provider .encrypt(plain) .await - .map_err(DbError::Encryption)?; + .map_err(SideEffectDbError::Encryption)?; if number_of_items != cipher.len() { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems); } messages.append(&mut cipher); } @@ -91,22 +127,30 @@ where } #[async_trait] -pub trait TransactionOutboxExt +pub trait TransactionOutboxExt where T: SideEffect + Pickle + Send + 'static, T::SideEffectId: Clone + Send + 'static, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { - async fn get_outbox_batch(&mut self) -> Result>, DbError>; + async fn get_outbox_batch( + &mut self, + ) -> Result>, OutboxError>; - async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError>; + async fn delete_outbox_item( + &mut self, + id: T::SideEffectId, + ) -> Result<(), OutboxError>; - async fn update_outbox_item(&mut self, item: OutboxMessage) -> Result<(), DbError>; + async fn update_outbox_item( + &mut self, + item: OutboxMessage, + ) -> Result<(), OutboxError>; } #[async_trait] -impl TransactionOutboxExt +impl TransactionOutboxExt::Error> for PostgresTransaction<'_, T, TableOutbox, E> where T: Aggregate + Send + Sync + Pickle + 'static, @@ -119,7 +163,10 @@ where { async fn get_outbox_batch( &mut self, - ) -> Result>, DbError> { + ) -> Result< + Vec>, + OutboxError::Error>, + > { #[derive(sqlx::FromRow)] struct OutboxRow { message: Vec, @@ -143,9 +190,9 @@ where .encryption_provider() .decrypt(cipher) .await - .map_err(DbError::Encryption)?; + .map_err(OutboxError::Encryption)?; if plain.len() != number_of_items { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(OutboxError::EncryptionProviderReturnedWrongNumberOfItems); } messages.append(&mut plain); } @@ -154,17 +201,16 @@ where .zip(messages.into_iter()) .map(|(row, message)| { let msg = - T::SideEffect::unpickle(&message).context("Failed to unpickle side effect")?; + T::SideEffect::unpickle(&message).map_err(OutboxError::SideEffectPickling)?; Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) }) - .collect::, anyhow::Error>>() - .map_err(DbError::PicklingError) + .collect::, OutboxError::Error>>>() } async fn delete_outbox_item( &mut self, id: ::SideEffectId, - ) -> Result<(), DbError> { + ) -> Result<(), OutboxError::Error>> { sqlx::query("DELETE FROM outbox WHERE id = $1") .bind(id) .execute(self.inner_mut().as_mut()) @@ -175,7 +221,7 @@ where async fn update_outbox_item( &mut self, item: OutboxMessage, - ) -> Result<(), DbError> { + ) -> Result<(), OutboxError::Error>> { sqlx::query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1") .bind(item.message.id()) .bind(i32::from(item.retries)) @@ -228,12 +274,13 @@ pub trait SideEffectHandler { pub trait RepositoryOutboxExt where T: Aggregate + Send + Sync + Pickle + 'static, + T::DomainEvent: Pickle + Send + Sync, T::SideEffect: SideEffect + Pickle + Clone + Send + Sync + 'static, ::SideEffectId: Clone + Send + 'static, H: SideEffectHandler + Send + Sync + 'static, E: EncryptionProvider + Clone + Send + Sync + 'static, for<'a> PostgresTransaction<'a, T, TableOutbox, E>: - TransactionOutboxExt, + TransactionOutboxExt::Error>, for<'sql> ::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { @@ -241,7 +288,7 @@ where &self, handler: H, poll_interval: std::time::Duration, - ) -> Result<(), DbError>; + ) -> Result<(), OutboxError::Error>>; } #[async_trait] @@ -255,7 +302,7 @@ where H: SideEffectHandler + Send + Sync + 'static, E: EncryptionProvider + Clone + Send + Sync + 'static, for<'a> PostgresTransaction<'a, T, TableOutbox, E>: - TransactionOutboxExt, + TransactionOutboxExt::Error>, for<'sql> ::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { @@ -263,7 +310,7 @@ where &self, handler: H, poll_interval: std::time::Duration, - ) -> Result<(), DbError> { + ) -> Result<(), OutboxError::Error>> { let handler = Arc::new(handler); loop { let deadline = std::time::Instant::now() + poll_interval; @@ -276,7 +323,7 @@ where async fn process_outbox_batch( repo: &PostgresRepository, E>, handler: Arc, -) -> Result<(), DbError> +) -> Result<(), OutboxError::Error>> where T: Aggregate + Send + Sync + Pickle + 'static, T::SideEffect: SideEffect + Pickle + Send + Sync, @@ -285,7 +332,7 @@ where T::ApplyError: Send + Sync, E: EncryptionProvider + Clone + Send + Sync + 'static, for<'a> PostgresTransaction<'a, T, TableOutbox, E>: - TransactionOutboxExt, + TransactionOutboxExt::Error>, for<'sql> ::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, { @@ -308,5 +355,8 @@ where } } - tx.commit().await + tx.into_inner() + .commit() + .await + .map_err(OutboxError::Database) } diff --git a/eventastic_postgres/Cargo.toml b/eventastic_postgres/Cargo.toml index 7956b07..fbf287b 100644 --- a/eventastic_postgres/Cargo.toml +++ b/eventastic_postgres/Cargo.toml @@ -10,15 +10,14 @@ categories = ["web-programming", "asynchronous"] keywords = ["postgres", "postgresql", "database", "ddd", "event-sourcing"] [dependencies] -anyhow = { workspace = true } async-trait = { workspace = true } async-stream = { workspace = true } chrono = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } futures = { workspace = true } futures-util = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } +serde = { workspace = true, optional = true } +serde_json = { workspace = true, optional = true } sqlx = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } @@ -26,3 +25,7 @@ tokio = { workspace = true } [dev-dependencies] eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" } uuid = { workspace = true } + +[features] +default = [] +serde = ["dep:serde", "dep:serde_json"] diff --git a/eventastic_postgres/src/common.rs b/eventastic_postgres/src/common.rs index fa9299b..abfad2d 100644 --- a/eventastic_postgres/src/common.rs +++ b/eventastic_postgres/src/common.rs @@ -5,12 +5,33 @@ use crate::DbError; use crate::pickle::Pickle; -use anyhow::Context; use eventastic::aggregate::Aggregate; use eventastic::event::{DomainEvent, EventStoreEvent}; use eventastic::repository::Snapshot; use sqlx::types::Uuid; +/// Type alias for the complex return type of event conversion operations. +type EventResult = Result< + EventStoreEvent<::DomainEvent>, + DbError< + E, + <::DomainEvent as Pickle>::Error, + ::Error, + <::SideEffect as Pickle>::Error, + >, +>; + +/// Type alias for the complex return type of snapshot conversion operations. +type SnapshotResult = Result< + Snapshot, + DbError< + E, + <::DomainEvent as Pickle>::Error, + ::Error, + <::SideEffect as Pickle>::Error, + >, +>; + /// Internal representation of a database row containing event data. /// /// This struct is used to deserialize event rows from the database @@ -36,21 +57,22 @@ impl PartialEventRow { /// # Errors /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. - /// Returns [`DbError::PicklingError`] if the event JSON cannot be deserialized. - pub fn to_event(row: PartialEventRow) -> Result, DbError> + /// Returns [`DbError::EventPicklingError`] if the event JSON cannot be deserialized. + pub fn to_event(row: PartialEventRow) -> EventResult where - Evt: DomainEvent + Pickle, + T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, { let row_version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; - Evt::unpickle(&row.event) + T::DomainEvent::unpickle(&row.event) .map(|e| EventStoreEvent { id: row.event_id, event: e, version: row_version, }) - .context("Failed to unpickle event") - .map_err(DbError::PicklingError) + .map_err(DbError::EventPicklingError) } } @@ -80,17 +102,17 @@ impl PartialSnapshotRow { /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. /// Returns [`DbError::InvalidSnapshotVersion`] if the snapshot version cannot be converted to u64. - /// Returns [`DbError::PicklingError`] if the aggregate JSON cannot be deserialized. - pub fn to_snapshot(row: PartialSnapshotRow) -> Result, DbError> + /// Returns [`DbError::SnapshotPicklingError`] if the aggregate JSON cannot be deserialized. + pub fn to_snapshot(row: PartialSnapshotRow) -> SnapshotResult where T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, { let version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; let snapshot_version = u64::try_from(row.snapshot_version).map_err(|_| DbError::InvalidSnapshotVersion)?; - let aggregate: T = T::unpickle(&row.aggregate) - .context("Failed to unpickle aggregate") - .map_err(DbError::PicklingError)?; + let aggregate: T = T::unpickle(&row.aggregate).map_err(DbError::SnapshotPicklingError)?; Ok(Snapshot { aggregate, @@ -109,7 +131,7 @@ pub(crate) mod utils { /// # Errors /// /// Returns [`DbError::InvalidVersionNumber`] if the conversion fails. - pub fn version_to_i64(version: u64) -> Result> { + pub fn version_to_i64(version: u64) -> Result> { i64::try_from(version).map_err(|_| DbError::InvalidVersionNumber) } @@ -118,7 +140,9 @@ pub(crate) mod utils { /// # Errors /// /// Returns [`DbError::InvalidSnapshotVersion`] if the conversion fails. - pub fn snapshot_version_to_i64(version: u64) -> Result> { + pub fn snapshot_version_to_i64( + version: u64, + ) -> Result> { i64::try_from(version).map_err(|_| DbError::InvalidSnapshotVersion) } } diff --git a/eventastic_postgres/src/error.rs b/eventastic_postgres/src/error.rs new file mode 100644 index 0000000..ab120c3 --- /dev/null +++ b/eventastic_postgres/src/error.rs @@ -0,0 +1,102 @@ +use eventastic::aggregate::Aggregate; +use thiserror::Error; + +use crate::{EncryptionProvider, Pickle}; + +#[allow(type_alias_bounds)] +pub type EventSourcingDbError = DbError< + E::Error, + ::Error, + ::Error, + ::Error, +>; + +#[derive(Error, Debug)] +pub enum DbError< + EncryptionError, + EventPicklingError, + SnapshotPicklingError, + SideEffectPicklingError, +> { + /// A database operation failed. + #[error("DB Error {0}")] + DbError(sqlx::Error), + /// Failed to pickle data. + #[error("Pickling Error {0}")] + EventPicklingError(EventPicklingError), + /// Failed to pickle snapshot data. + #[error("Snapshot Pickling Error {0}")] + SnapshotPicklingError(SnapshotPicklingError), + /// Failed to pickle side effect data. + #[error("Side Effect Pickling Error {0}")] + SideEffectPicklingError(SideEffectPicklingError), + /// An invalid version number was encountered (e.g., negative value where positive expected). + #[error("Invalid Version Number")] + InvalidVersionNumber, + /// An invalid snapshot version number was encountered. + #[error("Invalid Snapshot Version Number")] + InvalidSnapshotVersion, + /// A concurrent modification was detected (optimistic locking failure). + #[error("Optimistic Concurrency Error")] + OptimisticConcurrencyError, + /// Failed to encrypt or decrypt data. + #[error("Encryption Error {0}")] + Encryption(EncryptionError), + /// Failed to encrypt or decrypt data. + #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +/// Errors that can occur during side effect storage operations. +/// +/// This is a specialized error type for side effect operations that only +/// includes the errors relevant to storing and retrieving side effects. +#[derive(Error, Debug)] +pub enum SideEffectDbError { + /// A database operation failed. + #[error("DB Error {0}")] + DbError(sqlx::Error), + /// Failed to pickle side effect data. + #[error("Side Effect Pickling Error {0}")] + SideEffectPicklingError(SideEffectPicklingError), + /// Failed to encrypt or decrypt data. + #[error("Encryption Error {0}")] + Encryption(EncryptionError), + /// Encryption provider returned wrong number of items. + #[error("Encryption provider returned wrong number of items")] + EncryptionProviderReturnedWrongNumberOfItems, +} + +impl From for DbError { + fn from(e: sqlx::Error) -> Self { + if let Some(db_error) = e.as_database_error() { + if let Some(code) = db_error.code() { + if code == "23505" && db_error.message().contains("aggregate_version") { + return DbError::OptimisticConcurrencyError; + } + } + } + DbError::DbError(e) + } +} + +impl From for SideEffectDbError { + fn from(e: sqlx::Error) -> Self { + SideEffectDbError::DbError(e) + } +} + +impl From> for DbError { + fn from(e: SideEffectDbError) -> Self { + match e { + SideEffectDbError::DbError(err) => DbError::DbError(err), + SideEffectDbError::SideEffectPicklingError(err) => { + DbError::SideEffectPicklingError(err) + } + SideEffectDbError::Encryption(err) => DbError::Encryption(err), + SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems => { + DbError::EncryptionProviderReturnedWrongNumberOfItems + } + } + } +} diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 140ef68..08085a7 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,19 +1,23 @@ mod common; mod encryption; +mod error; mod pickle; mod reader_impl; mod repository; mod side_effect; -mod table_registry; +mod table_config; mod transaction; pub use encryption::{EncryptionProvider, NoEncryption, NoEncryptionError}; +pub use error::{DbError, SideEffectDbError}; pub use pickle::Pickle; pub use repository::PostgresRepository; pub use side_effect::SideEffectStorage; -pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder}; +pub use table_config::TableConfig; pub use transaction::PostgresTransaction; +use crate::error::EventSourcingDbError; + use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, @@ -21,49 +25,6 @@ use eventastic::{ repository::{Repository, RepositoryError}, }; use sqlx::types::Uuid; -use thiserror::Error; - -/// Errors that can occur during PostgreSQL operations. -#[derive(Error, Debug)] -pub enum DbError { - /// A database operation failed. - #[error("DB Error {0}")] - DbError(sqlx::Error), - /// Failed to pickle data. - #[error("Pickling Error {0}")] - PicklingError(anyhow::Error), - /// An invalid version number was encountered (e.g., negative value where positive expected). - #[error("Invalid Version Number")] - InvalidVersionNumber, - /// An invalid snapshot version number was encountered. - #[error("Invalid Snapshot Version Number")] - InvalidSnapshotVersion, - /// A concurrent modification was detected (optimistic locking failure). - #[error("Optimistic Concurrency Error")] - OptimisticConcurrencyError, - /// An aggregate type was not registered in the table registry. - #[error("Aggregate type not registered in table registry")] - UnregisteredAggregate, - /// Failed to encrypt or decrypt data. - #[error("Encryption Error {0}")] - Encryption(E), - /// Failed to encrypt or decrypt data. - #[error("Encryption provider returned wrong number of items")] - EncrypytionProviderReturnedWrongNumberOfItems, -} - -impl From for DbError { - fn from(e: sqlx::Error) -> Self { - if let Some(db_error) = e.as_database_error() { - if let Some(code) = db_error.code() { - if code == "23505" && db_error.message().contains("aggregate_version") { - return DbError::OptimisticConcurrencyError; - } - } - } - DbError::DbError(e) - } -} /// Extension trait for loading aggregates from PostgreSQL storage. /// @@ -91,7 +52,7 @@ where RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >, > { Context::load(transaction, &aggregate_id).await @@ -109,7 +70,7 @@ where RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >, > where diff --git a/eventastic_postgres/src/pickle.rs b/eventastic_postgres/src/pickle.rs index e81f267..0281e9d 100644 --- a/eventastic_postgres/src/pickle.rs +++ b/eventastic_postgres/src/pickle.rs @@ -13,6 +13,7 @@ pub trait Pickle: Sized { fn unpickle(bytes: &[u8]) -> Result; } +#[cfg(feature = "serde")] impl Pickle for T where T: serde::Serialize + serde::de::DeserializeOwned, diff --git a/eventastic_postgres/src/reader_impl.rs b/eventastic_postgres/src/reader_impl.rs index e3159b3..8dcde6c 100644 --- a/eventastic_postgres/src/reader_impl.rs +++ b/eventastic_postgres/src/reader_impl.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use crate::common::{PartialEventRow, PartialSnapshotRow, utils}; use crate::pickle::Pickle; -use crate::{DbError, EncryptionProvider}; +use crate::{DbError, EncryptionProvider, EventSourcingDbError}; use eventastic::aggregate::Aggregate; use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; @@ -25,12 +25,13 @@ pub fn stream_from<'e, 'c: 'e, E, T, EP>( query: Arc, encryption_provider: &'e EP, ) -> impl futures::Stream< - Item = std::result::Result, DbError>, + Item = std::result::Result, EventSourcingDbError>, > + 'e where E: Executor<'c, Database = sqlx::Postgres> + 'e, - T: Aggregate, + T: Aggregate + Pickle, T::DomainEvent: DomainEvent + Pickle + Send + 'e, + T::SideEffect: Pickle, EP: EncryptionProvider + Sync + Send + 'e, { let id = *id; @@ -54,11 +55,11 @@ where .await .map_err(DbError::Encryption)?; if plain.len() != number_of_items { - Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems)?; + Err(DbError::EncryptionProviderReturnedWrongNumberOfItems)?; } for (mut row, plain) in chunk.into_iter().zip(plain.into_iter()) { row.event = plain; - yield PartialEventRow::to_event(row); + yield PartialEventRow::to_event::(row); } } } @@ -71,11 +72,12 @@ pub async fn get_event<'c, E, T, EP>( event_id: &<::DomainEvent as DomainEvent>::EventId, query: &str, encryption_provider: &EP, -) -> Result::DomainEvent>>, DbError> +) -> Result::DomainEvent>>, EventSourcingDbError> where E: Executor<'c, Database = sqlx::Postgres>, - T: Aggregate, + T: Aggregate + Pickle, T::DomainEvent: DomainEvent + Pickle + Send, + T::SideEffect: Pickle, EP: EncryptionProvider, { let Some(mut row) = query_as::<_, PartialEventRow>(query) @@ -92,13 +94,13 @@ where .map_err(DbError::Encryption)? .into_iter(); let Some(event) = plain.next() else { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); }; if plain.next().is_some() { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); } row.event = event; - Ok(Some(PartialEventRow::to_event(row)?)) + Ok(Some(PartialEventRow::to_event::(row)?)) } /// Generic implementation for getting a snapshot from configured table. @@ -107,10 +109,12 @@ pub async fn get_snapshot<'c, E, T, EP>( id: &T::AggregateId, query: &str, encryption_provider: &EP, -) -> Result>, DbError> +) -> Result>, EventSourcingDbError> where E: Executor<'c, Database = sqlx::Postgres>, T: Aggregate + Pickle, + T::DomainEvent: DomainEvent + Pickle, + T::SideEffect: Pickle, EP: EncryptionProvider, { let row = query_as::<_, PartialSnapshotRow>(query) @@ -128,12 +132,12 @@ where .await .map_err(DbError::Encryption)?; if plain.len() != 1 { - Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems)?; + Err(DbError::EncryptionProviderReturnedWrongNumberOfItems)?; } row.aggregate = plain .into_iter() .next() .expect("Decrypt must return 1 item for snapshot"); - Ok(Some(PartialSnapshotRow::to_snapshot(row)?)) + Ok(Some(PartialSnapshotRow::to_snapshot::(row)?)) } diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index 7552ce1..8fb7149 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -1,8 +1,6 @@ -use std::marker::PhantomData; - use crate::{ - DbError, PostgresTransaction, SideEffectStorage, TableRegistry, encryption::EncryptionProvider, - pickle::Pickle, reader_impl, + EventSourcingDbError, PostgresTransaction, SideEffectStorage, encryption::EncryptionProvider, + pickle::Pickle, reader_impl, table_config::TableConfig, }; use async_trait::async_trait; use eventastic::{ @@ -10,12 +8,12 @@ use eventastic::{ event::{DomainEvent, EventStoreEvent}, repository::{Repository, RepositoryError, RepositoryReader, Snapshot}, }; -use futures::StreamExt; use sqlx::{ Pool, Postgres, postgres::{PgConnectOptions, PgPoolOptions}, types::Uuid, }; +use std::marker::PhantomData; /// PostgreSQL-based repository implementation for event sourcing. /// @@ -23,26 +21,15 @@ use sqlx::{ /// using PostgreSQL as the backing store. It integrates with a configurable side effect /// storage mechanism for handling the outbox pattern. #[derive(Clone)] -pub struct PostgresRepository -where - T: Clone, - O: Clone, - E: Clone, -{ - pub(crate) inner: Pool, - pub(crate) outbox: O, - pub(crate) tables: TableRegistry, +pub struct PostgresRepository { + inner: Pool, + outbox: O, + table_config: TableConfig, encryption_provider: E, phantom_aggregate: std::marker::PhantomData, } -impl PostgresRepository -where - T: Aggregate + Clone, - T::SideEffect: SideEffect + Pickle + Send + Sync, - O: SideEffectStorage + Clone, - E: EncryptionProvider + Clone, -{ +impl PostgresRepository { /// Creates a new PostgreSQL repository with the specified connection and pool options. /// /// # Parameters @@ -50,12 +37,11 @@ where /// - `connect_options` - PostgreSQL connection configuration /// - `pool_options` - Connection pool configuration /// - `outbox` - Side effect storage implementation for the outbox pattern - /// - `tables` - Registry of table configurations for different aggregates pub async fn new( connect_options: PgConnectOptions, pool_options: PgPoolOptions, + table_config: TableConfig, outbox: O, - tables: TableRegistry, encryption_provider: E, ) -> Result { let pool = pool_options.connect_with(connect_options).await?; @@ -63,7 +49,7 @@ where Ok(Self { inner: pool, outbox, - tables, + table_config, encryption_provider, phantom_aggregate: PhantomData, }) @@ -77,12 +63,29 @@ where Ok(PostgresTransaction { inner: self.inner.begin().await?, outbox: &self.outbox, - tables: &self.tables, + table_config: &self.table_config, encryption_provider: &self.encryption_provider, phantom_aggregate: PhantomData, }) } + /// Create a transaction from an existing raw sqlx transaction. + /// + /// This is useful for multi-aggregate scenarios where you want to use + /// the same database transaction across multiple repository types. + pub fn transaction_from<'a>( + &'a self, + transaction: sqlx::Transaction<'a, Postgres>, + ) -> PostgresTransaction<'a, T, O, E> { + PostgresTransaction { + inner: transaction, + outbox: &self.outbox, + table_config: &self.table_config, + encryption_provider: &self.encryption_provider, + phantom_aggregate: PhantomData, + } + } + /// Run database migrations to set up the required tables and schema. /// /// This method should be called once during application startup to ensure @@ -105,7 +108,7 @@ where O: SideEffectStorage + Clone + Send + Sync, E: EncryptionProvider + Clone + Send + Sync, { - type DbError = DbError; + type DbError = EventSourcingDbError; /// Returns a stream of domain events. fn stream_from( @@ -120,17 +123,12 @@ where Self::DbError, >, > { - let query = match self.tables.stream_events_query::() { - Some(query) => query, - None => { - return futures::stream::iter(vec![Err(DbError::UnregisteredAggregate)]).boxed(); - } - }; + let query = &self.table_config.stream_events_query; Box::pin(reader_impl::stream_from::<_, T, E>( &self.inner, id, version, - query, + query.clone(), &self.encryption_provider, )) } @@ -141,10 +139,7 @@ where aggregate_id: &T::AggregateId, event_id: &<::DomainEvent as DomainEvent>::EventId, ) -> Result::DomainEvent>>, Self::DbError> { - let query = self - .tables - .get_event_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let query = &self.table_config.get_event_query; reader_impl::get_event::<_, T, E>( &self.inner, aggregate_id, @@ -160,10 +155,7 @@ where &mut self, id: &T::AggregateId, ) -> Result>, Self::DbError> { - let query = self - .tables - .get_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let query = &self.table_config.get_snapshot_query; reader_impl::get_snapshot::<_, T, E>(&self.inner, id, query, &self.encryption_provider) .await } @@ -182,7 +174,7 @@ where type Error = RepositoryError< T::ApplyError, <::DomainEvent as DomainEvent>::EventId, - DbError, + EventSourcingDbError, >; /// Loads an aggregate from the repository by its ID. diff --git a/eventastic_postgres/src/side_effect.rs b/eventastic_postgres/src/side_effect.rs index d6c37de..2452bf3 100644 --- a/eventastic_postgres/src/side_effect.rs +++ b/eventastic_postgres/src/side_effect.rs @@ -1,4 +1,4 @@ -use crate::DbError; +use crate::SideEffectDbError; use crate::pickle::Pickle; use async_trait::async_trait; use eventastic::aggregate::SideEffect; @@ -11,7 +11,7 @@ use sqlx::{Postgres, Transaction}; /// different implementations such as direct table storage or outbox patterns. /// Implementors define how side effects are persisted within a database transaction. #[async_trait] -pub trait SideEffectStorage: Send + Sync +pub trait SideEffectStorage: Send + Sync where T: SideEffect + Pickle + Send + Sync, { @@ -27,10 +27,10 @@ where /// /// # Errors /// - /// Returns [`DbError`] if the storage operation fails. + /// Returns [`SideEffectDbError`] if the storage operation fails. async fn store_side_effects( &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, - ) -> Result<(), DbError>; + ) -> Result<(), SideEffectDbError::Error>>; } diff --git a/eventastic_postgres/src/table_config.rs b/eventastic_postgres/src/table_config.rs new file mode 100644 index 0000000..999fe4b --- /dev/null +++ b/eventastic_postgres/src/table_config.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +/// Configuration for database tables used by an aggregate type. +/// +/// This struct contains pre-computed SQL queries to avoid string allocation +/// during query execution. +#[derive(Debug, Clone)] +pub struct TableConfig { + pub(crate) stream_events_query: Arc, + pub(crate) get_event_query: String, + pub(crate) get_snapshot_query: String, + pub(crate) insert_events_query: String, + pub(crate) upsert_snapshot_query: String, +} + +impl TableConfig { + /// Create a new TableConfig with pre-computed queries. + pub fn new(events: impl Into, snapshots: impl Into) -> Self { + let events = events.into(); + let snapshots = snapshots.into(); + + Self { + stream_events_query: format!( + "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND version >= $2 ORDER BY version ASC", + &events + ).into(), + get_event_query: format!( + "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND event_id = $2", + &events + ), + get_snapshot_query: format!( + "SELECT aggregate, version, snapshot_version FROM {} WHERE aggregate_id = $1 AND snapshot_version = $2", + &snapshots + ), + insert_events_query: format!( + "INSERT INTO {} (event_id, version, aggregate_id, event, created_at) \ + SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::bytea[], $5::timestamptz[]) \ + ON CONFLICT DO NOTHING returning event_id", + &events + ), + upsert_snapshot_query: format!( + "INSERT INTO {} (aggregate_id, aggregate, version, snapshot_version, created_at) \ + VALUES ($1, $2, $3, $4, $5) \ + ON CONFLICT (aggregate_id, snapshot_version) DO UPDATE SET aggregate = $2, version = $3, created_at = $5", + &snapshots + ), + } + } +} diff --git a/eventastic_postgres/src/table_registry.rs b/eventastic_postgres/src/table_registry.rs deleted file mode 100644 index 2a3eceb..0000000 --- a/eventastic_postgres/src/table_registry.rs +++ /dev/null @@ -1,152 +0,0 @@ -use eventastic::aggregate::Aggregate; -use std::any::TypeId; -use std::collections::HashMap; -use std::sync::Arc; - -/// Configuration for database tables used by an aggregate type. -/// -/// This struct contains pre-computed SQL queries to avoid string allocation -/// during query execution. -#[derive(Debug, Clone)] -pub struct TableConfig { - pub(crate) stream_events_query: Arc, - pub(crate) get_event_query: String, - pub(crate) get_snapshot_query: String, - pub(crate) insert_events_query: String, - pub(crate) upsert_snapshot_query: String, -} - -impl TableConfig { - /// Create a new TableConfig with pre-computed queries. - pub fn new(events: impl Into, snapshots: impl Into) -> Self { - let events = events.into(); - let snapshots = snapshots.into(); - - Self { - stream_events_query: format!( - "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND version >= $2 ORDER BY version ASC", - &events - ).into(), - get_event_query: format!( - "SELECT event, event_id, version FROM {} WHERE aggregate_id = $1 AND event_id = $2", - &events - ), - get_snapshot_query: format!( - "SELECT aggregate, version, snapshot_version FROM {} WHERE aggregate_id = $1 AND snapshot_version = $2", - &snapshots - ), - insert_events_query: format!( - "INSERT INTO {} (event_id, version, aggregate_id, event, created_at) \ - SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::bytea[], $5::timestamptz[]) \ - ON CONFLICT DO NOTHING returning event_id", - &events - ), - upsert_snapshot_query: format!( - "INSERT INTO {} (aggregate_id, aggregate, version, snapshot_version, created_at) \ - VALUES ($1, $2, $3, $4, $5) \ - ON CONFLICT (aggregate_id, snapshot_version) DO UPDATE SET aggregate = $2, version = $3, created_at = $5", - &snapshots - ), - } - } -} - -/// Registry that maps aggregate types to their table configurations. -/// -/// This allows different aggregate types to use different tables while -/// supporting runtime configuration. -#[derive(Debug, Clone, Default)] -pub struct TableRegistry { - tables: HashMap>, -} - -impl TableRegistry { - /// Create a new empty table registry. - pub fn new() -> Self { - Self { - tables: HashMap::new(), - } - } - - /// Register table configuration for an aggregate type. - pub fn register(&mut self, config: TableConfig) { - self.tables.insert(TypeId::of::(), Arc::new(config)); - } - - /// Get the stream events query for an aggregate type. - pub fn stream_events_query(&self) -> Option> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.stream_events_query.clone()) - } - - /// Get the get event query for an aggregate type. - pub fn get_event_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.get_event_query.as_str()) - } - - /// Get the get snapshot query for an aggregate type. - pub fn get_snapshot_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.get_snapshot_query.as_str()) - } - - /// Get the insert events query for an aggregate type. - pub fn insert_events_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.insert_events_query.as_str()) - } - - /// Get the upsert snapshot query for an aggregate type. - pub fn upsert_snapshot_query(&self) -> Option<&str> { - self.tables - .get(&TypeId::of::()) - .map(|config| config.upsert_snapshot_query.as_str()) - } -} - -/// Builder for creating a TableRegistry with a fluent API. -pub struct TableRegistryBuilder { - registry: TableRegistry, -} - -impl TableRegistryBuilder { - /// Create a new builder. - pub fn new() -> Self { - Self { - registry: TableRegistry::new(), - } - } - - /// Register table configuration for an aggregate type. - pub fn register(mut self, config: TableConfig) -> Self { - self.registry.register::(config); - self - } - - /// Register table configuration for an aggregate type with explicit table names. - pub fn register_with_tables( - mut self, - events: impl Into, - snapshots: impl Into, - ) -> Self { - self.registry - .register::(TableConfig::new(events, snapshots)); - self - } - - /// Build the TableRegistry. - pub fn build(self) -> TableRegistry { - self.registry - } -} - -impl Default for TableRegistryBuilder { - fn default() -> Self { - Self::new() - } -} diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index 8fea71c..b76392e 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,7 +1,7 @@ use crate::common::utils; use crate::pickle::Pickle; -use crate::{DbError, EncryptionProvider, SideEffectStorage, TableRegistry, reader_impl}; -use anyhow::Context as _; +use crate::table_config::TableConfig; +use crate::{DbError, EncryptionProvider, EventSourcingDbError, SideEffectStorage, reader_impl}; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; @@ -12,7 +12,6 @@ use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; use eventastic::repository::Snapshot; use eventastic::repository::{RepositoryError, RepositoryReader, RepositoryWriter}; -use futures::StreamExt; use sqlx::Row; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; @@ -24,25 +23,23 @@ use sqlx::{Postgres, Transaction}; pub struct PostgresTransaction<'a, T, O, E> { pub(crate) inner: Transaction<'a, Postgres>, pub(crate) outbox: &'a O, - pub(crate) tables: &'a TableRegistry, + pub(crate) table_config: &'a TableConfig, pub(crate) encryption_provider: &'a E, pub(crate) phantom_aggregate: std::marker::PhantomData, } impl<'a, T, O, E> PostgresTransaction<'a, T, O, E> where - O: SideEffectStorage, - E: EncryptionProvider + Send + Sync + 'static, - T: Aggregate + 'static + Send + Sync + Pickle, - T::DomainEvent: DomainEvent + Pickle + Send + Sync, - T::SideEffect: SideEffect + Pickle + Send + Sync, - T::ApplyError: Send + Sync, + T: eventastic::aggregate::Aggregate + Pickle, + T::DomainEvent: Pickle, + T::SideEffect: Pickle, + E: EncryptionProvider, { /// Commit the transaction to the database. /// /// This finalizes all operations performed within this transaction, /// making them permanently visible to other database connections. - pub async fn commit(self) -> Result<(), DbError> { + pub async fn commit(self) -> Result<(), EventSourcingDbError> { Ok(self.inner.commit().await?) } @@ -50,7 +47,7 @@ where /// /// This undoes all operations performed within this transaction, /// returning the database to its state before the transaction began. - pub async fn rollback(self) -> Result<(), DbError> { + pub async fn rollback(self) -> Result<(), EventSourcingDbError> { Ok(self.inner.rollback().await?) } @@ -64,25 +61,36 @@ where &mut self.inner } - /// Get an aggregate by ID using the table registry. + /// Get the encryption provider reference + pub fn encryption_provider(&self) -> &E { + self.encryption_provider + } +} + +impl<'a, T, O, E> PostgresTransaction<'a, T, O, E> +where + O: SideEffectStorage, + E: EncryptionProvider + Send + Sync + 'static, + T: Aggregate + 'static + Send + Sync + Pickle, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, + T::ApplyError: Send + Sync, +{ + /// Get an aggregate by ID. pub async fn get( &mut self, id: &Uuid, - ) -> Result, RepositoryError>> { + ) -> Result, RepositoryError>> { Context::load(self, id).await } - /// Store an aggregate using the table registry. + /// Store an aggregate. pub async fn store( &mut self, aggregate: &mut Context, - ) -> Result<(), SaveError>> { + ) -> Result<(), SaveError>> { aggregate.save(self).await } - - pub fn encryption_provider(&self) -> &E { - self.encryption_provider - } } #[async_trait] @@ -95,7 +103,7 @@ where O: SideEffectStorage, E: EncryptionProvider + Send + Sync, { - type DbError = DbError; + type DbError = EventSourcingDbError; /// Returns a stream of domain events. fn stream_from( @@ -110,17 +118,12 @@ where Self::DbError, >, > { - let query = match self.tables.stream_events_query::() { - Some(query) => query, - None => { - return futures::stream::iter(vec![Err(DbError::UnregisteredAggregate)]).boxed(); - } - }; + let query = &self.table_config.stream_events_query; Box::pin(reader_impl::stream_from::<_, T, E>( &mut *self.inner, id, version, - query, + query.clone(), self.encryption_provider, )) } @@ -131,10 +134,7 @@ where aggregate_id: &T::AggregateId, event_id: &<::DomainEvent as DomainEvent>::EventId, ) -> Result::DomainEvent>>, Self::DbError> { - let query = self - .tables - .get_event_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let query = &self.table_config.get_event_query; reader_impl::get_event::<_, T, E>( &mut *self.inner, aggregate_id, @@ -150,10 +150,7 @@ where &mut self, id: &T::AggregateId, ) -> Result>, Self::DbError> { - let query = self - .tables - .get_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let query = &self.table_config.get_snapshot_query; reader_impl::get_snapshot::<_, T, E>(&mut *self.inner, id, query, self.encryption_provider) .await } @@ -190,11 +187,7 @@ where let version = utils::version_to_i64(version)?; - let serialised_event = event - .event - .pickle() - .context("Failed to pickle event") - .map_err(DbError::PicklingError)?; + let serialised_event = event.event.pickle().map_err(DbError::EventPicklingError)?; event_ids_to_insert.push(event_id); versions_to_insert.push(version); @@ -209,15 +202,12 @@ where .await .map_err(DbError::Encryption)?; if cipher.len() != number_of_items { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); } events_to_insert.append(&mut cipher); } - let insert_query = self - .tables - .insert_events_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let insert_query = &self.table_config.insert_events_query; let inserted_ids: Result, sqlx::Error> = sqlx::query(insert_query) .bind(&event_ids_to_insert[..]) @@ -240,8 +230,7 @@ where let aggregate = snapshot .aggregate .pickle() - .context("Failed to pickle aggregate") - .map_err(DbError::PicklingError)?; + .map_err(DbError::SnapshotPicklingError)?; let mut cipher = self .encryption_provider .encrypt(vec![aggregate]) @@ -249,16 +238,13 @@ where .map_err(DbError::Encryption)? .into_iter(); let Some(aggregate) = cipher.next() else { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); }; if cipher.next().is_some() { - return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems); + return Err(DbError::EncryptionProviderReturnedWrongNumberOfItems); } - let upsert_query = self - .tables - .upsert_snapshot_query::() - .ok_or(DbError::UnregisteredAggregate)?; + let upsert_query = &self.table_config.upsert_snapshot_query; sqlx::query(upsert_query) .bind(aggregated_id) @@ -281,5 +267,6 @@ where self.outbox .store_side_effects(&mut self.inner, outbox_item) .await + .map_err(|e| e.into()) } } diff --git a/eventastic_postgres/tests/common/helpers.rs b/eventastic_postgres/tests/common/helpers.rs index fcddfad..f2146cb 100644 --- a/eventastic_postgres/tests/common/helpers.rs +++ b/eventastic_postgres/tests/common/helpers.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc}; use eventastic::aggregate::{Context, Root}; use eventastic_outbox_postgres::TableOutbox; use eventastic_postgres::{ - EncryptionProvider, NoEncryption, Pickle, PostgresRepository, TableRegistryBuilder, + EncryptionProvider, NoEncryption, Pickle, PostgresRepository, TableConfig, }; use sqlx::Row; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; @@ -20,15 +20,11 @@ pub async fn get_repository() -> PostgresRepository("events", "snapshots") - .build(); - let repo = PostgresRepository::new( connection_options, pool_options, + TableConfig::new("events", "snapshots"), TableOutbox::new(NoEncryption), - tables, NoEncryption, ) .await @@ -48,15 +44,11 @@ pub async fn get_encrypted_repository() let pool_options = PoolOptions::default(); - let tables = TableRegistryBuilder::new() - .register_with_tables::("events", "snapshots") - .build(); - let repo = PostgresRepository::new( connection_options, pool_options, + TableConfig::new("events", "snapshots"), TableOutbox::new(TestEncryptionProvider), - tables, TestEncryptionProvider, ) .await @@ -67,7 +59,7 @@ pub async fn get_encrypted_repository() repo } -#[derive(serde::Deserialize, Debug, Clone, serde::Serialize)] +#[derive(Debug, Clone)] pub struct SavedSnapshot { pub version: i64, pub aggregate: Account, diff --git a/eventastic_postgres/tests/common/mod.rs b/eventastic_postgres/tests/common/mod.rs index c590871..7bf30b2 100644 --- a/eventastic_postgres/tests/common/mod.rs +++ b/eventastic_postgres/tests/common/mod.rs @@ -3,3 +3,4 @@ pub mod encryption; pub mod helpers; pub mod test_aggregate; +pub mod test_order_aggregate; diff --git a/eventastic_postgres/tests/common/test_order_aggregate.rs b/eventastic_postgres/tests/common/test_order_aggregate.rs new file mode 100644 index 0000000..cf6b6d1 --- /dev/null +++ b/eventastic_postgres/tests/common/test_order_aggregate.rs @@ -0,0 +1,185 @@ +use eventastic::aggregate::Aggregate; +use eventastic::aggregate::SideEffect; +use eventastic::event::DomainEvent; +use serde::Deserialize; +use serde::Serialize; +use thiserror::Error; +use uuid::Uuid; + +// Define our Order aggregate - different from Account +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct Order { + pub order_id: Uuid, + pub customer_id: Uuid, + pub total_amount: i64, + pub status: OrderStatus, +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +pub enum OrderStatus { + Pending, + Confirmed, + Shipped, + Delivered, + Cancelled, +} + +// Define our domain events for Order +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] +pub enum OrderEvent { + Created { + order_id: Uuid, + event_id: Uuid, + customer_id: Uuid, + total_amount: i64, + }, + Confirmed { + event_id: Uuid, + }, + Shipped { + event_id: Uuid, + tracking_number: String, + }, + Delivered { + event_id: Uuid, + }, + Cancelled { + event_id: Uuid, + reason: String, + }, +} + +impl DomainEvent for OrderEvent { + type EventId = Uuid; + fn id(&self) -> &Uuid { + match self { + OrderEvent::Created { event_id, .. } + | OrderEvent::Confirmed { event_id, .. } + | OrderEvent::Shipped { event_id, .. } + | OrderEvent::Delivered { event_id, .. } + | OrderEvent::Cancelled { event_id, .. } => event_id, + } + } +} + +// Define our domain error for Order +#[derive(Error, Debug)] +pub enum OrderDomainError { + #[error("This event can't be applied given the current state of the order")] + InvalidState, + #[error("Order is already in final state")] + AlreadyFinalized, +} + +// Define our side effects for Order - different from Account side effects +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] +pub enum OrderSideEffects { + SendConfirmationEmail { + id: Uuid, + customer_email: String, + order_id: Uuid, + }, + NotifyWarehouse { + id: Uuid, + order_id: Uuid, + items: Vec, + }, + UpdateInventory { + id: Uuid, + product_ids: Vec, + quantities: Vec, + }, +} + +impl SideEffect for OrderSideEffects { + type SideEffectId = Uuid; + + fn id(&self) -> &Self::SideEffectId { + match self { + OrderSideEffects::SendConfirmationEmail { id, .. } + | OrderSideEffects::NotifyWarehouse { id, .. } + | OrderSideEffects::UpdateInventory { id, .. } => id, + } + } +} + +// Implement the aggregate trait for our Order struct +impl Aggregate for Order { + const SNAPSHOT_VERSION: u64 = 1; + + type AggregateId = Uuid; + type DomainEvent = OrderEvent; + type ApplyError = OrderDomainError; + type SideEffect = OrderSideEffects; + + fn aggregate_id(&self) -> &Self::AggregateId { + &self.order_id + } + + fn apply(&mut self, event: &Self::DomainEvent) -> Result<(), Self::ApplyError> { + match event { + OrderEvent::Confirmed { .. } => { + if self.status != OrderStatus::Pending { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Confirmed; + } + OrderEvent::Shipped { .. } => { + if self.status != OrderStatus::Confirmed { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Shipped; + } + OrderEvent::Delivered { .. } => { + if self.status != OrderStatus::Shipped { + return Err(Self::ApplyError::InvalidState); + } + self.status = OrderStatus::Delivered; + } + OrderEvent::Cancelled { .. } => { + if matches!(self.status, OrderStatus::Delivered | OrderStatus::Cancelled) { + return Err(Self::ApplyError::AlreadyFinalized); + } + self.status = OrderStatus::Cancelled; + } + OrderEvent::Created { .. } => return Err(Self::ApplyError::InvalidState), + } + Ok(()) + } + + fn apply_new(event: &Self::DomainEvent) -> Result { + match event { + OrderEvent::Created { + order_id, + customer_id, + total_amount, + .. + } => Ok(Self { + order_id: *order_id, + customer_id: *customer_id, + total_amount: *total_amount, + status: OrderStatus::Pending, + }), + _ => Err(Self::ApplyError::InvalidState), + } + } + + fn side_effects(&self, event: &Self::DomainEvent) -> Option> { + let side_effect = match event { + OrderEvent::Created { event_id, .. } => Some(OrderSideEffects::SendConfirmationEmail { + id: *event_id, + customer_email: "customer@example.com".to_string(), + order_id: self.order_id, + }), + OrderEvent::Confirmed { event_id } => Some(OrderSideEffects::NotifyWarehouse { + id: *event_id, + order_id: self.order_id, + items: vec!["item1".to_string(), "item2".to_string()], + }), + OrderEvent::Shipped { .. } + | OrderEvent::Delivered { .. } + | OrderEvent::Cancelled { .. } => None, + }; + side_effect.map(|s| vec![s]) + } +} diff --git a/eventastic_postgres/tests/encryption.rs b/eventastic_postgres/tests/encryption.rs index bd16d46..7e2a9c5 100644 --- a/eventastic_postgres/tests/encryption.rs +++ b/eventastic_postgres/tests/encryption.rs @@ -36,9 +36,7 @@ async fn when_encryption_is_enabled_aggregate_can_be_saved_and_loaded() { .expect("Failed to commit transaction"); // Assert - let loaded_account = load_encrypted_account(account_id) - .await - .expect("Failed to load account"); + let loaded_account = load_encrypted_account(account_id).await; let loaded_account = loaded_account.state(); assert_eq!(created_account, loaded_account); @@ -130,7 +128,7 @@ async fn when_encryption_is_enabled_events_cannot_be_loaded_by_id_without_encryp Account, >>::get_event(&mut repository, &account_id, &event_id) .await; - assert!(matches!(result, Err(DbError::PicklingError(_)))); + assert!(matches!(result, Err(DbError::EventPicklingError(_)))); } #[tokio::test] @@ -165,7 +163,7 @@ async fn when_encryption_is_enabled_events_can_be_saved_and_loaded() { &mut repository, &account_id, 0 ); while let Some(event) = events.next().await { - assert!(matches!(event, Ok(_))); + assert!(event.is_ok()); } } @@ -200,7 +198,7 @@ async fn when_encryption_is_enabled_events_cannot_be_loaded_without_encryption() while let Some(event) = events.next().await { assert!(matches!( event, - Err(eventastic_postgres::DbError::PicklingError(_)) + Err(eventastic_postgres::DbError::EventPicklingError(_)) )); } } @@ -230,10 +228,11 @@ async fn when_encryption_is_enabled_aggregate_cannot_be_loaded_without_encryptio // Assert let repository = get_repository().await; let mut transaction = repository.begin_transaction().await.unwrap(); + assert!(matches!( transaction.get(&account_id).await, Err(eventastic::repository::RepositoryError::Repository( - eventastic_postgres::DbError::PicklingError(_) + eventastic_postgres::DbError::SnapshotPicklingError(_) )), )); } @@ -290,7 +289,7 @@ async fn when_encryption_is_enabled_side_effect_can_be_saved_and_loaded() { } } -async fn load_encrypted_account(account_id: Uuid) -> anyhow::Result> { +async fn load_encrypted_account(account_id: Uuid) -> Context { let repository = get_encrypted_repository().await; let mut transaction = repository @@ -298,7 +297,8 @@ async fn load_encrypted_account(account_id: Uuid) -> anyhow::Result = transaction.get(&account_id).await?; - - Ok(context) + transaction + .get(&account_id) + .await + .expect("Failed to encrypted load account") } diff --git a/eventastic_postgres/tests/multi_aggregate.rs b/eventastic_postgres/tests/multi_aggregate.rs new file mode 100644 index 0000000..b26f206 --- /dev/null +++ b/eventastic_postgres/tests/multi_aggregate.rs @@ -0,0 +1,300 @@ +mod common; + +use common::helpers::get_repository; +use common::test_aggregate::{Account, AccountEvent}; +use common::test_order_aggregate::{Order, OrderEvent, OrderStatus}; +use eventastic::aggregate::Root; +use eventastic_outbox_postgres::TableOutbox; +use eventastic_postgres::PostgresRepository; +use eventastic_postgres::{NoEncryption, TableConfig}; +use sqlx::pool::PoolOptions; +use sqlx::postgres::PgConnectOptions; +use std::str::FromStr; +use uuid::Uuid; + +// Helper function to get an order repository using the same pool +async fn get_order_repository() -> PostgresRepository, NoEncryption> +{ + let host = std::env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".to_string()); + let connection_string = format!("postgres://postgres:password@{host}/postgres"); + let connection_options = + PgConnectOptions::from_str(&connection_string).expect("Failed to parse connection options"); + + let pool_options = PoolOptions::default(); + + PostgresRepository::new( + connection_options, + pool_options, + TableConfig::new("events", "snapshots"), + TableOutbox::new(NoEncryption), + NoEncryption, + ) + .await + .expect("Failed to connect to postgres") +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_commit_test() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create and store account + let account_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_event).expect("Failed to create account"); + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and store order + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + order_tx + .store(&mut order) + .await + .expect("Failed to store order"); + + // Commit the transaction + order_tx + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert - verify both aggregates were saved + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let loaded_account = account_load_tx + .get(&account_id) + .await + .expect("Failed to load account"); + assert_eq!(loaded_account.state().account_id, account_id); + assert_eq!(loaded_account.state().balance, 1000); + account_load_tx + .commit() + .await + .expect("Failed to commit load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let loaded_order = order_load_tx + .get(&order_id) + .await + .expect("Failed to load order"); + assert_eq!(loaded_order.state().order_id, order_id); + assert_eq!(loaded_order.state().total_amount, 500); + assert_eq!(loaded_order.state().status, OrderStatus::Pending); + order_load_tx + .commit() + .await + .expect("Failed to commit order load transaction"); +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_rollback_test() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create and store account + let account_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_event).expect("Failed to create account"); + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and store order + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + order_tx + .store(&mut order) + .await + .expect("Failed to store order"); + + // Rollback the transaction instead of committing + order_tx + .rollback() + .await + .expect("Failed to rollback transaction"); + + // Assert - verify neither aggregate was saved + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let account_result = account_load_tx.get(&account_id).await; + assert!( + account_result.is_err(), + "Account should not exist after rollback" + ); + account_load_tx + .rollback() + .await + .expect("Failed to rollback load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let order_result = order_load_tx.get(&order_id).await; + assert!( + order_result.is_err(), + "Order should not exist after rollback" + ); + order_load_tx + .rollback() + .await + .expect("Failed to rollback order load transaction"); +} + +#[tokio::test] +pub async fn multi_aggregate_transaction_with_mixed_side_effects() { + // Arrange + let account_repo = get_repository().await; + let order_repo = get_order_repository().await; + + let account_id = Uuid::new_v4(); + let order_id = Uuid::new_v4(); + let customer_id = Uuid::new_v4(); + + // Start with account repository transaction + let mut account_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin transaction"); + + // Create account and add money (generates side effects) + let account_open_event = AccountEvent::Open { + event_id: Uuid::new_v4(), + account_id, + email: "test@example.com".to_string(), + starting_balance: 1000, + }; + let mut account = Account::record_new(account_open_event).expect("Failed to create account"); + + let add_event = AccountEvent::Add { + event_id: Uuid::new_v4(), + amount: 500, + }; + account + .record_that(add_event) + .expect("Failed to add money to account"); + + account_tx + .store(&mut account) + .await + .expect("Failed to store account"); + + // Get the raw transaction and pass it to order repository + let raw_tx = account_tx.into_inner(); + let mut order_tx = order_repo.transaction_from(raw_tx); + + // Create and confirm order (generates different side effects) + let order_event = OrderEvent::Created { + event_id: Uuid::new_v4(), + order_id, + customer_id, + total_amount: 500, + }; + let mut order = Order::record_new(order_event).expect("Failed to create order"); + + let confirm_event = OrderEvent::Confirmed { + event_id: Uuid::new_v4(), + }; + order + .record_that(confirm_event) + .expect("Failed to confirm order"); + + order_tx + .store(&mut order) + .await + .expect("Failed to store order"); + + // Commit the transaction + order_tx + .commit() + .await + .expect("Failed to commit transaction"); + + // Assert - verify both aggregates were saved with correct states + let mut account_load_tx = account_repo + .begin_transaction() + .await + .expect("Failed to begin load transaction"); + let loaded_account = account_load_tx + .get(&account_id) + .await + .expect("Failed to load account"); + assert_eq!(loaded_account.state().balance, 1500); // 1000 + 500 + account_load_tx + .commit() + .await + .expect("Failed to commit load transaction"); + + let mut order_load_tx = order_repo + .begin_transaction() + .await + .expect("Failed to begin order load transaction"); + let loaded_order = order_load_tx + .get(&order_id) + .await + .expect("Failed to load order"); + assert_eq!(loaded_order.state().status, OrderStatus::Confirmed); + order_load_tx + .commit() + .await + .expect("Failed to commit order load transaction"); +} diff --git a/examples/bank/Cargo.toml b/examples/bank/Cargo.toml index e64ff49..8a2f0bc 100644 --- a/examples/bank/Cargo.toml +++ b/examples/bank/Cargo.toml @@ -7,12 +7,11 @@ edition = "2024" [dependencies] eventastic = { path = "../../eventastic" } -eventastic_postgres = { path = "../../eventastic_postgres" } +eventastic_postgres = { path = "../../eventastic_postgres", features = ["serde"] } eventastic_outbox_postgres = { path = "../../eventastic_outbox_postgres" } thiserror = { workspace = true } uuid = { workspace = true } tokio = { workspace = true } sqlx = { workspace = true } -anyhow = { workspace = true } serde = { workspace = true } async-trait = { workspace = true } diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 0c6b3f3..b1b7d31 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -9,7 +9,7 @@ use eventastic::aggregate::SideEffect; use eventastic::event::DomainEvent; use eventastic::repository::Repository; use eventastic_outbox_postgres::{RepositoryOutboxExt, SideEffectHandler, TableOutbox}; -use eventastic_postgres::{NoEncryption, PostgresRepository, RootExt}; +use eventastic_postgres::{NoEncryption, PostgresRepository, RootExt, TableConfig}; use serde::Deserialize; use serde::Serialize; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; @@ -17,7 +17,7 @@ use thiserror::Error; use uuid::Uuid; #[tokio::main] -async fn main() -> Result<(), anyhow::Error> { +async fn main() -> Result<(), Box> { // Setup postgres repo let repository = get_repository().await; @@ -392,15 +392,11 @@ async fn get_repository() -> PostgresRepository("events", "snapshots") - .build(); - PostgresRepository::new( connection_options, pool_options, + TableConfig::new("events", "snapshots"), TableOutbox::new(NoEncryption), - tables, NoEncryption, ) .await