diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml index bff5b98..95b7d1a 100644 --- a/eventastic_outbox_postgres/Cargo.toml +++ b/eventastic_outbox_postgres/Cargo.toml @@ -4,13 +4,12 @@ version = "0.5.0" edition = "2024" [dependencies] +anyhow = { workspace = true } eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" } async-trait = { workspace = true } -serde_json = { workspace = true } sqlx = { workspace = true } chrono = { workspace = true } uuid = { workspace = true } -serde = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } futures-util = { workspace = true } tokio = { workspace = true } diff --git a/eventastic_outbox_postgres/src/outbox.rs b/eventastic_outbox_postgres/src/outbox.rs index 7014984..b4e649f 100644 --- a/eventastic_outbox_postgres/src/outbox.rs +++ b/eventastic_outbox_postgres/src/outbox.rs @@ -1,9 +1,10 @@ +use anyhow::Context; use async_trait::async_trait; use chrono::{DateTime, Utc}; use eventastic::aggregate::SideEffect; -use eventastic_postgres::{DbError, PostgresRepository, PostgresTransaction, SideEffectStorage}; -use serde::Serialize; -use serde::de::DeserializeOwned; +use eventastic_postgres::{ + DbError, Pickle, PostgresRepository, PostgresTransaction, SideEffectStorage, +}; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; use std::sync::Arc; @@ -16,20 +17,23 @@ pub struct TableOutbox; #[async_trait] impl SideEffectStorage for TableOutbox { - async fn store_side_effects + Serialize + Send + Sync>( + async fn store_side_effects + Pickle + Send + Sync>( &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, ) -> Result<(), DbError> { let mut ids: Vec = Vec::with_capacity(items.len()); - let mut messages: Vec = Vec::with_capacity(items.len()); + let mut messages: Vec> = Vec::with_capacity(items.len()); let mut retries: Vec = Vec::with_capacity(items.len()); let mut requeues: Vec = Vec::with_capacity(items.len()); let mut 
created_ats: Vec> = Vec::with_capacity(items.len()); for side_effect in items { let id = *side_effect.id(); - let msg = serde_json::to_value(side_effect).map_err(DbError::SerializationError)?; + let msg = side_effect + .pickle() + .context("Failed to pickle side effect") + .map_err(DbError::PicklingError)?; ids.push(id); messages.push(msg); retries.push(0); @@ -39,7 +43,7 @@ impl SideEffectStorage for TableOutbox { sqlx::query( "INSERT INTO outbox(id, message, retries, requeue, created_at) - SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[]) + SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::boolean[], $5::timestamptz[]) ON CONFLICT (id) DO UPDATE SET message = excluded.message, retries = excluded.retries, @@ -61,7 +65,7 @@ impl SideEffectStorage for TableOutbox { #[async_trait] pub trait TransactionOutboxExt where - T: SideEffect + DeserializeOwned + Send + 'static, + T: SideEffect + Pickle + Send + 'static, T::SideEffectId: Clone + Send + 'static, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, @@ -76,7 +80,7 @@ where #[async_trait] impl TransactionOutboxExt for PostgresTransaction<'_, TableOutbox> where - T: SideEffect + DeserializeOwned + Send + 'static, + T: SideEffect + Pickle + Send + 'static, T::SideEffectId: Clone + Send + 'static, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, @@ -84,7 +88,7 @@ where async fn get_outbox_batch(&mut self) -> Result>, DbError> { #[derive(sqlx::FromRow)] struct OutboxRow { - message: serde_json::Value, + message: Vec, retries: i32, requeue: bool, } @@ -99,11 +103,11 @@ where rows.into_iter() .map(|row| { - let msg = serde_json::from_value(row.message)?; + let msg = T::unpickle(&row.message).context("Failed to unpickle side effect")?; Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) }) - .collect::, serde_json::Error>>() - 
.map_err(DbError::SerializationError) + .collect::, anyhow::Error>>() + .map_err(DbError::PicklingError) } async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError> { @@ -171,7 +175,7 @@ pub trait RepositoryOutboxExt { poll_interval: std::time::Duration, ) -> Result<(), DbError> where - T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T: SideEffect + Pickle + Send + Sync + 'static, T::SideEffectId: Clone + Send + 'static, H: SideEffectHandler + Send + Sync, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> @@ -188,7 +192,7 @@ impl RepositoryOutboxExt for PostgresRepository { poll_interval: std::time::Duration, ) -> Result<(), DbError> where - T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T: SideEffect + Pickle + Send + Sync + 'static, T::SideEffectId: Clone + Send + 'static, H: SideEffectHandler + Send + Sync, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> @@ -210,7 +214,7 @@ async fn process_outbox_batch( handler: Arc, ) -> Result<(), DbError> where - T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T: SideEffect + Pickle + Send + Sync + 'static, T::SideEffectId: Clone + Send + 'static, H: SideEffectHandler + Send + Sync, for<'a> PostgresTransaction<'a, TableOutbox>: TransactionOutboxExt, diff --git a/eventastic_postgres/Cargo.toml b/eventastic_postgres/Cargo.toml index 8f9396f..e95cfd9 100644 --- a/eventastic_postgres/Cargo.toml +++ b/eventastic_postgres/Cargo.toml @@ -10,18 +10,18 @@ categories = ["web-programming", "asynchronous"] keywords = ["postgres", "postgresql", "database", "ddd", "event-sourcing"] [dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } chrono = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } -sqlx = { workspace = true } -serde_json = { workspace = true } -tokio = { workspace = true } -async-trait = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } serde = { workspace = 
true } -anyhow = { workspace = true } +serde_json = { workspace = true } +sqlx = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } [dev-dependencies] -uuid = { workspace = true } eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" } +uuid = { workspace = true } diff --git a/eventastic_postgres/migrations/20230610185630_init.sql b/eventastic_postgres/migrations/20230610185630_init.sql index 106aa20..1c07ba2 100644 --- a/eventastic_postgres/migrations/20230610185630_init.sql +++ b/eventastic_postgres/migrations/20230610185630_init.sql @@ -2,7 +2,7 @@ CREATE TABLE if not exists events ( aggregate_id uuid NOT NULL, version bigint NOT NULL CHECK (version >= 0), event_id uuid NOT NULL, - event jsonb NOT NULL, + event bytea NOT NULL, created_at timestamptz NOT NULL, PRIMARY KEY (aggregate_id, version) ); @@ -11,7 +11,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS events_event_id ON events (event_id); CREATE TABLE if not exists snapshots ( aggregate_id uuid NOT NULL, - aggregate jsonb NOT NULL, + aggregate bytea NOT NULL, version bigint NOT NULL CHECK (version >= 0), snapshot_version bigint NOT NULL, created_at timestamptz NOT NULL, @@ -20,7 +20,7 @@ CREATE TABLE if not exists snapshots ( CREATE TABLE if not exists outbox ( id uuid PRIMARY KEY, - message jsonb NOT NULL, + message bytea NOT NULL, retries integer NOT NULL CHECK (retries >= 0), requeue boolean NOT NULL, created_at timestamptz NOT NULL diff --git a/eventastic_postgres/src/common.rs b/eventastic_postgres/src/common.rs index 146cecf..40726f5 100644 --- a/eventastic_postgres/src/common.rs +++ b/eventastic_postgres/src/common.rs @@ -4,11 +4,12 @@ //! duplication and ensure consistency. 
use crate::DbError; +use crate::pickle::Pickle; +use anyhow::Context; use eventastic::aggregate::Aggregate; use eventastic::event::{DomainEvent, EventStoreEvent}; use eventastic::repository::Snapshot; -use serde::de::DeserializeOwned; -use sqlx::types::{JsonValue, Uuid}; +use sqlx::types::Uuid; /// Internal representation of a database row containing event data. /// @@ -18,7 +19,7 @@ use sqlx::types::{JsonValue, Uuid}; pub(crate) struct PartialEventRow { pub event_id: Uuid, pub version: i64, - pub event: JsonValue, + pub event: Vec<u8>, } impl PartialEventRow { @@ -35,20 +36,21 @@ impl PartialEventRow { /// # Errors /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. - /// Returns [`DbError::SerializationError`] if the event JSON cannot be deserialized. + /// Returns [`DbError::PicklingError`] if the event bytes cannot be unpickled. pub fn to_event<Evt>(row: PartialEventRow) -> Result<EventStoreEvent<Evt>, DbError> where - Evt: DomainEvent + DeserializeOwned, + Evt: DomainEvent + Pickle, { let row_version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; - serde_json::from_value::<Evt>(row.event) + Evt::unpickle(&row.event) .map(|e| EventStoreEvent { id: row.event_id, event: e, version: row_version, }) - .map_err(DbError::SerializationError) + .context("Failed to unpickle event") + .map_err(DbError::PicklingError) } } @@ -58,7 +60,7 @@ impl PartialEventRow { /// before converting them to the full [`Snapshot`] type. #[derive(sqlx::FromRow)] pub(crate) struct PartialSnapshotRow { - pub aggregate: serde_json::Value, + pub aggregate: Vec<u8>, pub snapshot_version: i64, pub version: i64, } @@ -78,16 +80,17 @@ impl PartialSnapshotRow { /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. /// Returns [`DbError::InvalidSnapshotVersion`] if the snapshot version cannot be converted to u64. - /// Returns [`DbError::SerializationError`] if the aggregate JSON cannot be deserialized. 
+ /// Returns [`DbError::PicklingError`] if the aggregate bytes cannot be unpickled. pub fn to_snapshot<T>(row: PartialSnapshotRow) -> Result<Snapshot<T>, DbError> where - T: Aggregate + DeserializeOwned, + T: Aggregate + Pickle, { let version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; let snapshot_version = u64::try_from(row.snapshot_version).map_err(|_| DbError::InvalidSnapshotVersion)?; - let aggregate: T = - serde_json::from_value(row.aggregate).map_err(DbError::SerializationError)?; + let aggregate: T = T::unpickle(&row.aggregate) + .context("Failed to unpickle aggregate") + .map_err(DbError::PicklingError)?; Ok(Snapshot { aggregate, diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 492af27..d3464b9 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,23 +1,25 @@ mod common; +mod pickle; mod reader_impl; mod repository; mod side_effect; mod table_registry; mod transaction; + +pub use pickle::Pickle; +pub use repository::PostgresRepository; +pub use side_effect::SideEffectStorage; +pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder}; +pub use transaction::PostgresTransaction; + use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, event::DomainEvent, repository::{Repository, RepositoryError}, }; -pub use repository::PostgresRepository; -use serde::{Serialize, de::DeserializeOwned}; -pub use side_effect::SideEffectStorage; use sqlx::types::Uuid; -pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder}; - use thiserror::Error; -pub use transaction::PostgresTransaction; /// Errors that can occur during PostgreSQL operations. #[derive(Error, Debug)] @@ -25,14 +27,14 @@ pub enum DbError { /// A database operation failed. #[error("DB Error {0}")] DbError(sqlx::Error), - /// Failed to serialize or deserialize data to/from JSON. 
- #[error("Serialization Error {0}")] - SerializationError(#[from] serde_json::Error), + /// Failed to pickle data. + #[error("Pickling Error {0}")] + PicklingError(anyhow::Error), /// An invalid version number was encountered (e.g., negative value where positive expected). #[error("Invalid Version Number")] InvalidVersionNumber, /// An invalid snapshot version number was encountered. - #[error("Invalid Snapshot Version number")] + #[error("Invalid Snapshot Version Number")] InvalidSnapshotVersion, /// A concurrent modification was detected (optimistic locking failure). #[error("Optimistic Concurrency Error")] @@ -62,10 +64,9 @@ impl From for DbError { #[async_trait] pub trait RootExt where - T: Aggregate + Serialize + DeserializeOwned + Send + Sync + 'static, - ::DomainEvent: - DomainEvent + Serialize + DeserializeOwned + Send + Sync, - ::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + Pickle + Send + Sync + 'static, + ::DomainEvent: DomainEvent + Pickle + Send + Sync, + ::SideEffect: SideEffect + Pickle + Send + Sync, ::ApplyError: Send + Sync, O: SideEffectStorage + Send + Sync, { @@ -111,10 +112,9 @@ where impl RootExt for T where - T: Aggregate + Serialize + DeserializeOwned + Send + Sync + 'static, - ::DomainEvent: - DomainEvent + Serialize + DeserializeOwned + Send + Sync, - ::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + Pickle + Send + Sync + 'static, + ::DomainEvent: DomainEvent + Pickle + Send + Sync, + ::SideEffect: SideEffect + Pickle + Send + Sync, ::ApplyError: Send + Sync, O: SideEffectStorage + Send + Sync, { diff --git a/eventastic_postgres/src/pickle.rs b/eventastic_postgres/src/pickle.rs new file mode 100644 index 0000000..e81f267 --- /dev/null +++ b/eventastic_postgres/src/pickle.rs @@ -0,0 +1,29 @@ +/// Preserve your data so it can be stored in the database. +/// +/// [`Pickle`] is implemented for types that implement [`serde::Serialize`] +/// and [`serde::de::DeserializeOwned`]. 
It uses [`serde_json`] to do the +/// serialization. +pub trait Pickle: Sized { + type Error: std::error::Error + Send + Sync + 'static; + + /// Convert to bytes for storage. + fn pickle(&self) -> Result, Self::Error>; + + /// Convert back to `Self` from bytes. + fn unpickle(bytes: &[u8]) -> Result; +} + +impl Pickle for T +where + T: serde::Serialize + serde::de::DeserializeOwned, +{ + type Error = serde_json::Error; + + fn pickle(&self) -> Result, Self::Error> { + serde_json::to_vec(self) + } + + fn unpickle(bytes: &[u8]) -> Result { + serde_json::from_slice(bytes) + } +} diff --git a/eventastic_postgres/src/reader_impl.rs b/eventastic_postgres/src/reader_impl.rs index 1a4087d..79db3da 100644 --- a/eventastic_postgres/src/reader_impl.rs +++ b/eventastic_postgres/src/reader_impl.rs @@ -6,13 +6,13 @@ use crate::DbError; use crate::common::{PartialEventRow, PartialSnapshotRow, utils}; +use crate::pickle::Pickle; use eventastic::aggregate::Aggregate; use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; use eventastic::repository::Snapshot; use futures::stream; use futures_util::stream::StreamExt; -use serde::de::DeserializeOwned; use sqlx::types::Uuid; use sqlx::{Executor, query_as}; @@ -26,7 +26,7 @@ pub fn stream_from<'e, 'c: 'e, E, T>( where E: Executor<'c, Database = sqlx::Postgres> + 'e, T: Aggregate, - T::DomainEvent: DomainEvent + DeserializeOwned + Send + 'e, + T::DomainEvent: DomainEvent + Pickle + Send + 'e, { let Ok(version) = utils::version_to_i64(version) else { return stream::iter(vec![Err(DbError::InvalidVersionNumber)]).boxed(); @@ -59,7 +59,7 @@ pub async fn get_event<'c, E, T>( where E: Executor<'c, Database = sqlx::Postgres>, T: Aggregate, - T::DomainEvent: DomainEvent + DeserializeOwned + Send, + T::DomainEvent: DomainEvent + Pickle + Send, { query_as::<_, PartialEventRow>(query) .bind(aggregate_id) @@ -78,7 +78,7 @@ pub async fn get_snapshot<'c, E, T>( ) -> Result>, DbError> where E: Executor<'c, Database = sqlx::Postgres>, 
- T: Aggregate + DeserializeOwned, + T: Aggregate + Pickle, { let row = query_as::<_, PartialSnapshotRow>(query) .bind(id) diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index 8a28af8..f374f54 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -1,3 +1,4 @@ +use crate::pickle::Pickle; use crate::{DbError, PostgresTransaction, SideEffectStorage, TableRegistry, reader_impl}; use async_trait::async_trait; use eventastic::{ @@ -6,7 +7,6 @@ use eventastic::{ repository::{Repository, RepositoryError, RepositoryReader, Snapshot}, }; use futures::StreamExt; -use serde::{Serialize, de::DeserializeOwned}; use sqlx::{ Pool, Postgres, postgres::{PgConnectOptions, PgPoolOptions}, @@ -82,9 +82,9 @@ where #[async_trait] impl RepositoryReader for PostgresRepository where - T: Aggregate + DeserializeOwned + Serialize + Send + Sync + 'static, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + Pickle + Send + Sync + 'static, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, O: SideEffectStorage + Clone + Send + Sync, { @@ -146,9 +146,9 @@ where #[async_trait] impl Repository for PostgresRepository where - T: Aggregate + DeserializeOwned + Serialize + Send + Sync + 'static, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, - T::SideEffect: eventastic::aggregate::SideEffect + Serialize + Send + Sync, + T: Aggregate + Pickle + Send + Sync + 'static, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: eventastic::aggregate::SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, O: SideEffectStorage + Clone + Send + Sync, { diff --git a/eventastic_postgres/src/side_effect.rs b/eventastic_postgres/src/side_effect.rs index 95c6573..cd18fa4 100644 --- 
a/eventastic_postgres/src/side_effect.rs +++ b/eventastic_postgres/src/side_effect.rs @@ -1,7 +1,7 @@ use crate::DbError; +use crate::pickle::Pickle; use async_trait::async_trait; use eventastic::aggregate::SideEffect; -use serde::Serialize; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; @@ -25,7 +25,7 @@ pub trait SideEffectStorage: Send + Sync { /// # Errors /// /// Returns [`DbError`] if the storage operation fails. - async fn store_side_effects + Serialize + Send + Sync>( + async fn store_side_effects + Pickle + Send + Sync>( &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, diff --git a/eventastic_postgres/src/table_registry.rs b/eventastic_postgres/src/table_registry.rs index a6c0846..85d084c 100644 --- a/eventastic_postgres/src/table_registry.rs +++ b/eventastic_postgres/src/table_registry.rs @@ -37,7 +37,7 @@ impl TableConfig { ), insert_events_query: format!( "INSERT INTO {} (event_id, version, aggregate_id, event, created_at) \ - SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::jsonb[], $5::timestamptz[]) \ + SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::bytea[], $5::timestamptz[]) \ ON CONFLICT DO NOTHING returning event_id", &events ), diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index f5b5dce..9252cf5 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,5 +1,7 @@ use crate::common::utils; +use crate::pickle::Pickle; use crate::{DbError, SideEffectStorage, TableRegistry, reader_impl}; +use anyhow::Context as _; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; @@ -11,8 +13,6 @@ use eventastic::event::EventStoreEvent; use eventastic::repository::Snapshot; use eventastic::repository::{RepositoryError, RepositoryReader, RepositoryWriter}; use futures::StreamExt; -use serde::Serialize; -use serde::de::DeserializeOwned; use sqlx::Row; use sqlx::types::Uuid; use sqlx::{Postgres, 
Transaction}; @@ -66,9 +66,9 @@ where id: &Uuid, ) -> Result, RepositoryError> where - T: Aggregate + 'static + Send + Sync + Serialize + DeserializeOwned, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + 'static + Send + Sync + Pickle, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, { Context::load(self, id).await @@ -80,9 +80,9 @@ where aggregate: &mut Context, ) -> Result<(), SaveError> where - T: Aggregate + 'static + Send + Sync + Serialize + DeserializeOwned, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + 'static + Send + Sync + Pickle, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, { aggregate.save(self).await @@ -92,9 +92,9 @@ where #[async_trait] impl RepositoryReader for PostgresTransaction<'_, O> where - T: Aggregate + 'static + DeserializeOwned + Serialize + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + 'static + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, T::ApplyError: Send + Sync, O: SideEffectStorage, { @@ -156,9 +156,9 @@ where #[async_trait] impl RepositoryWriter for PostgresTransaction<'_, O> where - T: Aggregate + 'static + DeserializeOwned + Serialize + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + 'static + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, T::ApplyError: Send + Sync, O: 
SideEffectStorage, { @@ -172,7 +172,7 @@ where Vec::with_capacity(events.len()); let mut versions_to_insert: Vec = Vec::with_capacity(events.len()); let mut aggregate_ids_to_insert: Vec = Vec::with_capacity(events.len()); - let mut events_to_insert: Vec = Vec::with_capacity(events.len()); + let mut events_to_insert: Vec> = Vec::with_capacity(events.len()); let mut created_ats_to_insert: Vec> = Vec::with_capacity(events.len()); for event in events { @@ -181,8 +181,11 @@ where let version = utils::version_to_i64(version)?; - let serialised_event = - serde_json::to_value(event.event).map_err(DbError::SerializationError)?; + let serialised_event = event + .event + .pickle() + .context("Failed to pickle event") + .map_err(DbError::PicklingError)?; event_ids_to_insert.push(event_id); versions_to_insert.push(version); @@ -214,8 +217,11 @@ where /// Stores a snapshot of the aggregate in the database async fn store_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Self::DbError> { let aggregated_id = *snapshot.aggregate.aggregate_id(); - let aggregate = - serde_json::to_value(snapshot.aggregate).map_err(DbError::SerializationError)?; + let aggregate = snapshot + .aggregate + .pickle() + .context("Failed to pickle aggregate") + .map_err(DbError::PicklingError)?; let upsert_query = self .tables diff --git a/eventastic_postgres/tests/common/helpers.rs b/eventastic_postgres/tests/common/helpers.rs index 52128cd..dc27c0e 100644 --- a/eventastic_postgres/tests/common/helpers.rs +++ b/eventastic_postgres/tests/common/helpers.rs @@ -2,7 +2,7 @@ use super::test_aggregate::{Account, AccountEvent}; use chrono::{DateTime, Utc}; use eventastic::aggregate::{Context, Root}; use eventastic_outbox_postgres::TableOutbox; -use eventastic_postgres::{PostgresRepository, TableRegistryBuilder}; +use eventastic_postgres::{Pickle, PostgresRepository, TableRegistryBuilder}; use sqlx::Row; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; use std::str::FromStr; @@ -53,12 +53,12 @@ pub 
async fn get_account_snapshot(account_id: Uuid) -> Option { .expect("Failed to fetch snapshot"); row.map(|row| { - let aggregate: Result = row.try_get("aggregate"); + let aggregate_bytes: Result, _> = row.try_get("aggregate"); let version: Result = row.try_get("version"); let snapshot_version: Result = row.try_get("snapshot_version"); SavedSnapshot { - aggregate: serde_json::from_value(aggregate.unwrap()).unwrap(), + aggregate: Account::unpickle(&aggregate_bytes.unwrap()).unwrap(), version: version.unwrap(), snapshot_version: snapshot_version.unwrap(), } @@ -86,12 +86,12 @@ pub async fn get_account_snapshot_with_version( .expect("Failed to fetch snapshot"); row.map(|row| { - let aggregate: Result = row.try_get("aggregate"); + let aggregate_bytes: Result, _> = row.try_get("aggregate"); let version: Result = row.try_get("version"); let snapshot_version: Result = row.try_get("snapshot_version"); SavedSnapshot { - aggregate: serde_json::from_value(aggregate.unwrap()).unwrap(), + aggregate: Account::unpickle(&aggregate_bytes.unwrap()).unwrap(), version: version.unwrap(), snapshot_version: snapshot_version.unwrap(), } @@ -116,12 +116,12 @@ pub async fn get_all_account_snapshots(account_id: Uuid) -> Vec { rows.into_iter() .map(|row| { - let aggregate: Result = row.try_get("aggregate"); + let aggregate_bytes: Result, _> = row.try_get("aggregate"); let version: Result = row.try_get("version"); let snapshot_version: Result = row.try_get("snapshot_version"); SavedSnapshot { - aggregate: serde_json::from_value(aggregate.unwrap()).unwrap(), + aggregate: Account::unpickle(&aggregate_bytes.unwrap()).unwrap(), version: version.unwrap(), snapshot_version: snapshot_version.unwrap(), } @@ -158,7 +158,7 @@ pub async fn replace_account_snapshot(account_id: Uuid, snapshot: SavedSnapshot) let mut pg_transaction = transaction.into_inner(); let row = sqlx::query("UPDATE snapshots set aggregate = $1, snapshot_version = $2, version = $3 where aggregate_id = $4") - 
.bind(serde_json::to_value(&snapshot.aggregate).expect("Failed to serialize snapshot")) + .bind(snapshot.aggregate.pickle().expect("Failed to serialize snapshot")) .bind(snapshot.snapshot_version) .bind(snapshot.version) .bind(account_id) @@ -205,7 +205,7 @@ pub async fn insert_snapshot_with_version(account_id: Uuid, snapshot: SavedSnaps sqlx::query("INSERT INTO snapshots (aggregate_id, aggregate, version, snapshot_version, created_at) VALUES ($1, $2, $3, $4, NOW())") .bind(account_id) - .bind(serde_json::to_value(&snapshot.aggregate).expect("Failed to serialize snapshot")) + .bind(snapshot.aggregate.pickle().expect("Failed to serialize snapshot")) .bind(snapshot.version) .bind(version) .execute(&mut *transaction) @@ -369,7 +369,7 @@ pub async fn get_side_effect( .expect("Failed to query outbox table"); if let Some(row) = row { - let message_json: serde_json::Value = row + let message_bytes: Vec = row .try_get("message") .expect("Failed to get message from row"); let retries: i32 = row @@ -380,7 +380,8 @@ pub async fn get_side_effect( .expect("Failed to get requeue from row"); let side_effect: super::test_aggregate::SideEffects = - serde_json::from_value(message_json).expect("Failed to deserialize side effect"); + super::test_aggregate::SideEffects::unpickle(&message_bytes) + .expect("Failed to deserialize side effect"); Some((side_effect, retries, requeue)) } else {