Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,3 @@ chrono = "0.4"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
futures-util = "0.3"
anyhow = "1"
4 changes: 2 additions & 2 deletions eventastic/src/aggregate/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ where

/// This error is returned when the Repository fails to insert the event
/// because the version already exists, indicating a concurrent modification.
#[error("Optimistic Concurrency Error Version {1} of aggregate {0:?} already exists")]
#[error("Optimistic Concurrency Error")]
OptimisticConcurrency(T::AggregateId, u64),
}

Expand Down Expand Up @@ -426,7 +426,7 @@ mod tests {
assert_eq!(context.version(), 0);

for i in 1..=5 {
let add_event = create_add_event(&format!("add-{}", i), i);
let add_event = create_add_event(&format!("add-{i}"), i);
context.record_that(add_event).unwrap();
assert_eq!(context.version(), i as u64);
}
Expand Down
2 changes: 1 addition & 1 deletion eventastic/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@
) -> Option<EventStoreEvent<T::DomainEvent>> {
let storage = self.storage.lock().unwrap();

if let Some((stored_aggregate_id, version)) = storage.events_by_id.get(event_id) {

Check warning on line 206 in eventastic/src/memory.rs

View workflow job for this annotation

GitHub Actions / Clippy Lint

this `if` statement can be collapsed
if stored_aggregate_id == aggregate_id {

Check warning on line 207 in eventastic/src/memory.rs

View workflow job for this annotation

GitHub Actions / Clippy Lint

this `if` statement can be collapsed
if let Some(aggregate_events) = storage.events.get(aggregate_id) {
return aggregate_events.get(version).cloned();
}
Expand Down Expand Up @@ -407,7 +407,7 @@
}

// Check for version conflicts (optimistic concurrency)
if let Some(aggregate_events) = storage.events.get(id) {

Check warning on line 410 in eventastic/src/memory.rs

View workflow job for this annotation

GitHub Actions / Clippy Lint

this `if` statement can be collapsed
if aggregate_events.contains_key(&event.version) {
return Err(InMemoryError::VersionConflict {
expected: event.version,
Expand Down Expand Up @@ -870,7 +870,7 @@

// Verify side effects were stored
// Reset: 2 side effects, Add: 1, Subtract: 1, Multiply: 0, Add: 1 = 5 total
let expected_side_effects = 2 + 1 + 1 + 0 + 1;
let expected_side_effects = 5;
assert_eq!(repository.side_effects_count(), expected_side_effects);

let side_effects = repository.get_all_side_effects();
Expand Down
14 changes: 6 additions & 8 deletions eventastic/src/test_fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
//! that can be used across different test modules to ensure consistency
//! and reduce code duplication.

#![cfg(test)]

use crate::{
aggregate::{Aggregate, SideEffect},
event::DomainEvent,
Expand Down Expand Up @@ -139,21 +137,21 @@ impl Aggregate for TestCounter {
match event {
TestEvent::Reset { event_id, .. } => Some(vec![
TestSideEffect::LogOperation {
id: format!("{}-log", event_id),
id: format!("{event_id}-log"),
operation: "Reset".to_string(),
},
TestSideEffect::NotifyUser {
id: format!("{}-notify", event_id),
id: format!("{event_id}-notify"),
message: "Counter has been reset".to_string(),
},
]),
TestEvent::Add { event_id, value } => Some(vec![TestSideEffect::LogOperation {
id: format!("{}-log", event_id),
operation: format!("Add {}", value),
id: format!("{event_id}-log"),
operation: format!("Add {value}"),
}]),
TestEvent::Subtract { event_id, value } => Some(vec![TestSideEffect::LogOperation {
id: format!("{}-log", event_id),
operation: format!("Subtract {}", value),
id: format!("{event_id}-log"),
operation: format!("Subtract {value}"),
}]),
TestEvent::Multiply { .. } => None, // No side effects for multiply
}
Expand Down
2 changes: 1 addition & 1 deletion eventastic_outbox_postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ version = "0.5.0"
edition = "2024"

[dependencies]
anyhow = { workspace = true }
eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" }
async-trait = { workspace = true }
sqlx = { workspace = true }
Expand All @@ -13,3 +12,4 @@ uuid = { workspace = true }
eventastic = { path = "../eventastic", version = "0.5" }
futures-util = { workspace = true }
tokio = { workspace = true }
thiserror = { workspace = true }
104 changes: 77 additions & 27 deletions eventastic_outbox_postgres/src/outbox.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,54 @@
use anyhow::Context;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use eventastic::aggregate::{Aggregate, SideEffect};
use eventastic::event::DomainEvent;
use eventastic_postgres::{
DbError, EncryptionProvider, Pickle, PostgresRepository, PostgresTransaction, SideEffectStorage,
EncryptionProvider, Pickle, PostgresRepository, PostgresTransaction, SideEffectDbError,
SideEffectStorage,
};
use sqlx::{Postgres, Transaction};
use std::sync::Arc;
use thiserror::Error;
use uuid::Uuid;

use crate::OutboxMessage;

/// Errors that can occur during outbox operations.
#[derive(Error, Debug)]
pub enum OutboxError<EncryptionError, SideEffectPicklingError> {
/// A database operation failed.
#[error("Database error: {0}")]
Database(sqlx::Error),
/// Failed to encrypt or decrypt side effect data.
#[error("Encryption error: {0}")]
Encryption(EncryptionError),
/// Failed to pickle or unpickle side effect data.
#[error("Side effect pickling error: {0}")]
SideEffectPickling(SideEffectPicklingError),
/// Encryption provider returned wrong number of items.
#[error("Encryption provider returned wrong number of items")]
EncryptionProviderReturnedWrongNumberOfItems,
}

impl<EP, SE> From<sqlx::Error> for OutboxError<EP, SE> {
fn from(e: sqlx::Error) -> Self {
OutboxError::Database(e)
}
}

impl<EP, SE> From<OutboxError<EP, SE>> for SideEffectDbError<EP, SE> {
fn from(e: OutboxError<EP, SE>) -> Self {
match e {
OutboxError::Database(err) => SideEffectDbError::DbError(err),
OutboxError::Encryption(err) => SideEffectDbError::Encryption(err),
OutboxError::SideEffectPickling(err) => SideEffectDbError::SideEffectPicklingError(err),
OutboxError::EncryptionProviderReturnedWrongNumberOfItems => {
SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems
}
}
}
}

/// Default implementation of [`SideEffectStorage`] that stores messages in an `outbox` table.
#[derive(Clone, Copy, Default)]
pub struct TableOutbox<E> {
Expand All @@ -36,7 +73,7 @@
&self,
transaction: &mut Transaction<'_, Postgres>,
items: Vec<S>,
) -> Result<(), DbError<E::Error>> {
) -> Result<(), SideEffectDbError<E::Error, <S as Pickle>::Error>> {
let mut ids: Vec<Uuid> = Vec::with_capacity(items.len());
let mut messages: Vec<Vec<u8>> = Vec::with_capacity(items.len());
let mut retries: Vec<i32> = Vec::with_capacity(items.len());
Expand All @@ -49,8 +86,7 @@
let id = *side_effect.id();
let msg = side_effect
.pickle()
.context("Failed to pickle side effect")
.map_err(DbError::PicklingError)?;
.map_err(SideEffectDbError::SideEffectPicklingError)?;
ids.push(id);
plain.push(msg);
retries.push(0);
Expand All @@ -62,9 +98,9 @@
.encryption_provider
.encrypt(plain)
.await
.map_err(DbError::Encryption)?;
.map_err(SideEffectDbError::Encryption)?;
if number_of_items != cipher.len() {
return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems);
return Err(SideEffectDbError::EncryptionProviderReturnedWrongNumberOfItems);
}
messages.append(&mut cipher);
}
Expand All @@ -91,22 +127,30 @@
}

#[async_trait]
pub trait TransactionOutboxExt<T, E>
pub trait TransactionOutboxExt<T, E, SideEffectPicklingError>
where
T: SideEffect + Pickle + Send + 'static,
T::SideEffectId: Clone + Send + 'static,
for<'sql> T::SideEffectId:
sqlx::Decode<'sql, Postgres> + sqlx::Type<Postgres> + sqlx::Encode<'sql, Postgres> + Unpin,
{
async fn get_outbox_batch(&mut self) -> Result<Vec<OutboxMessage<T>>, DbError<E>>;
async fn get_outbox_batch(
&mut self,
) -> Result<Vec<OutboxMessage<T>>, OutboxError<E, SideEffectPicklingError>>;

async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError<E>>;
async fn delete_outbox_item(
&mut self,
id: T::SideEffectId,
) -> Result<(), OutboxError<E, SideEffectPicklingError>>;

async fn update_outbox_item(&mut self, item: OutboxMessage<T>) -> Result<(), DbError<E>>;
async fn update_outbox_item(
&mut self,
item: OutboxMessage<T>,
) -> Result<(), OutboxError<E, SideEffectPicklingError>>;
}

#[async_trait]
impl<T, E> TransactionOutboxExt<T::SideEffect, E::Error>
impl<T, E> TransactionOutboxExt<T::SideEffect, E::Error, <T::SideEffect as Pickle>::Error>
for PostgresTransaction<'_, T, TableOutbox<E>, E>
where
T: Aggregate<AggregateId = Uuid> + Send + Sync + Pickle + 'static,
Expand All @@ -119,7 +163,10 @@
{
async fn get_outbox_batch(
&mut self,
) -> Result<Vec<OutboxMessage<T::SideEffect>>, DbError<E::Error>> {
) -> Result<
Vec<OutboxMessage<T::SideEffect>>,
OutboxError<E::Error, <T::SideEffect as Pickle>::Error>,
> {
#[derive(sqlx::FromRow)]
struct OutboxRow {
message: Vec<u8>,
Expand All @@ -143,28 +190,27 @@
.encryption_provider()
.decrypt(cipher)
.await
.map_err(DbError::Encryption)?;
.map_err(OutboxError::Encryption)?;
if plain.len() != number_of_items {
return Err(DbError::EncrypytionProviderReturnedWrongNumberOfItems);
return Err(OutboxError::EncryptionProviderReturnedWrongNumberOfItems);
}
messages.append(&mut plain);
}

rows.into_iter()
.zip(messages.into_iter())

Check warning on line 201 in eventastic_outbox_postgres/src/outbox.rs

View workflow job for this annotation

GitHub Actions / Clippy Lint

explicit call to `.into_iter()` in function argument accepting `IntoIterator`
.map(|(row, message)| {
let msg =
T::SideEffect::unpickle(&message).context("Failed to unpickle side effect")?;
T::SideEffect::unpickle(&message).map_err(OutboxError::SideEffectPickling)?;
Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue))
})
.collect::<Result<Vec<_>, anyhow::Error>>()
.map_err(DbError::PicklingError)
.collect::<Result<Vec<_>, OutboxError<E::Error, <T::SideEffect as Pickle>::Error>>>()
}

async fn delete_outbox_item(
&mut self,
id: <T::SideEffect as SideEffect>::SideEffectId,
) -> Result<(), DbError<E::Error>> {
) -> Result<(), OutboxError<E::Error, <T::SideEffect as Pickle>::Error>> {
sqlx::query("DELETE FROM outbox WHERE id = $1")
.bind(id)
.execute(self.inner_mut().as_mut())
Expand All @@ -175,7 +221,7 @@
async fn update_outbox_item(
&mut self,
item: OutboxMessage<T::SideEffect>,
) -> Result<(), DbError<E::Error>> {
) -> Result<(), OutboxError<E::Error, <T::SideEffect as Pickle>::Error>> {
sqlx::query("UPDATE outbox SET retries = $2, requeue = $3 WHERE id = $1")
.bind(item.message.id())
.bind(i32::from(item.retries))
Expand Down Expand Up @@ -228,20 +274,21 @@
pub trait RepositoryOutboxExt<T, H, E>
where
T: Aggregate<AggregateId = Uuid> + Send + Sync + Pickle + 'static,
T::DomainEvent: Pickle + Send + Sync,
T::SideEffect: SideEffect + Pickle + Clone + Send + Sync + 'static,
<T::SideEffect as SideEffect>::SideEffectId: Clone + Send + 'static,
H: SideEffectHandler<SideEffect = T::SideEffect> + Send + Sync + 'static,
E: EncryptionProvider + Clone + Send + Sync + 'static,
for<'a> PostgresTransaction<'a, T, TableOutbox<E>, E>:
TransactionOutboxExt<T::SideEffect, E::Error>,
TransactionOutboxExt<T::SideEffect, E::Error, <T::SideEffect as Pickle>::Error>,
for<'sql> <T::SideEffect as SideEffect>::SideEffectId:
sqlx::Decode<'sql, Postgres> + sqlx::Type<Postgres> + sqlx::Encode<'sql, Postgres> + Unpin,
{
async fn start_outbox(
&self,
handler: H,
poll_interval: std::time::Duration,
) -> Result<(), DbError<E::Error>>;
) -> Result<(), OutboxError<E::Error, <T::SideEffect as Pickle>::Error>>;
}

#[async_trait]
Expand All @@ -255,15 +302,15 @@
H: SideEffectHandler<SideEffect = T::SideEffect> + Send + Sync + 'static,
E: EncryptionProvider + Clone + Send + Sync + 'static,
for<'a> PostgresTransaction<'a, T, TableOutbox<E>, E>:
TransactionOutboxExt<T::SideEffect, E::Error>,
TransactionOutboxExt<T::SideEffect, E::Error, <T::SideEffect as Pickle>::Error>,
for<'sql> <T::SideEffect as SideEffect>::SideEffectId:
sqlx::Decode<'sql, Postgres> + sqlx::Type<Postgres> + sqlx::Encode<'sql, Postgres> + Unpin,
{
async fn start_outbox(
&self,
handler: H,
poll_interval: std::time::Duration,
) -> Result<(), DbError<E::Error>> {
) -> Result<(), OutboxError<E::Error, <T::SideEffect as Pickle>::Error>> {
let handler = Arc::new(handler);
loop {
let deadline = std::time::Instant::now() + poll_interval;
Expand All @@ -276,7 +323,7 @@
async fn process_outbox_batch<T, H, E>(
repo: &PostgresRepository<T, TableOutbox<E>, E>,
handler: Arc<H>,
) -> Result<(), DbError<E::Error>>
) -> Result<(), OutboxError<E::Error, <T::SideEffect as Pickle>::Error>>
where
T: Aggregate<AggregateId = Uuid> + Send + Sync + Pickle + 'static,
T::SideEffect: SideEffect<SideEffectId = Uuid> + Pickle + Send + Sync,
Expand All @@ -285,7 +332,7 @@
T::ApplyError: Send + Sync,
E: EncryptionProvider + Clone + Send + Sync + 'static,
for<'a> PostgresTransaction<'a, T, TableOutbox<E>, E>:
TransactionOutboxExt<T::SideEffect, E::Error>,
TransactionOutboxExt<T::SideEffect, E::Error, <T::SideEffect as Pickle>::Error>,
for<'sql> <T::SideEffect as SideEffect>::SideEffectId:
sqlx::Decode<'sql, Postgres> + sqlx::Type<Postgres> + sqlx::Encode<'sql, Postgres> + Unpin,
{
Expand All @@ -308,5 +355,8 @@
}
}

tx.commit().await
tx.into_inner()
.commit()
.await
.map_err(OutboxError::Database)
}
9 changes: 6 additions & 3 deletions eventastic_postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@ categories = ["web-programming", "asynchronous"]
keywords = ["postgres", "postgresql", "database", "ddd", "event-sourcing"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
async-stream = { workspace = true }
chrono = { workspace = true }
eventastic = { path = "../eventastic", version = "0.5" }
futures = { workspace = true }
futures-util = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" }
uuid = { workspace = true }

[features]
default = []
serde = ["dep:serde", "dep:serde_json"]
Loading
Loading