From eb4b68333ef0e96e1d240b351acf4d91ec1a6996 Mon Sep 17 00:00:00 2001 From: Alex Wakefield Date: Wed, 23 Jul 2025 17:22:35 +0100 Subject: [PATCH] feat: add Pickle trait This allows the application to control serialization. Doing this can allow the application to choose the serialization format they want. It can also allow the application to choose whether to derive serialize on their aggregate or not. For example they may want a mapping to some other type to prevent serializing their aggregate and making it more annoying to change. --- eventastic_outbox_postgres/Cargo.toml | 3 +- eventastic_outbox_postgres/src/outbox.rs | 36 ++++++++------- eventastic_postgres/Cargo.toml | 12 ++--- .../migrations/20230610185630_init.sql | 6 +-- eventastic_postgres/src/common.rs | 27 +++++++----- eventastic_postgres/src/lib.rs | 36 +++++++-------- eventastic_postgres/src/pickle.rs | 29 ++++++++++++ eventastic_postgres/src/reader_impl.rs | 8 ++-- eventastic_postgres/src/repository.rs | 14 +++--- eventastic_postgres/src/side_effect.rs | 4 +- eventastic_postgres/src/table_registry.rs | 2 +- eventastic_postgres/src/transaction.rs | 44 +++++++++++-------- eventastic_postgres/tests/common/helpers.rs | 23 +++++----- 13 files changed, 143 insertions(+), 101 deletions(-) create mode 100644 eventastic_postgres/src/pickle.rs diff --git a/eventastic_outbox_postgres/Cargo.toml b/eventastic_outbox_postgres/Cargo.toml index bff5b98..95b7d1a 100644 --- a/eventastic_outbox_postgres/Cargo.toml +++ b/eventastic_outbox_postgres/Cargo.toml @@ -4,13 +4,12 @@ version = "0.5.0" edition = "2024" [dependencies] +anyhow = { workspace = true } eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" } async-trait = { workspace = true } -serde_json = { workspace = true } sqlx = { workspace = true } chrono = { workspace = true } uuid = { workspace = true } -serde = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } futures-util = { workspace = true } tokio = { workspace = true } diff --git a/eventastic_outbox_postgres/src/outbox.rs b/eventastic_outbox_postgres/src/outbox.rs index 7014984..b4e649f 100644 --- a/eventastic_outbox_postgres/src/outbox.rs +++ b/eventastic_outbox_postgres/src/outbox.rs @@ -1,9 +1,10 @@ +use anyhow::Context; use async_trait::async_trait; use chrono::{DateTime, Utc}; use eventastic::aggregate::SideEffect; -use eventastic_postgres::{DbError, PostgresRepository, PostgresTransaction, SideEffectStorage}; -use serde::Serialize; -use serde::de::DeserializeOwned; +use eventastic_postgres::{ + DbError, Pickle, PostgresRepository, PostgresTransaction, SideEffectStorage, +}; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; use std::sync::Arc; @@ -16,20 +17,23 @@ pub struct TableOutbox; #[async_trait] impl SideEffectStorage for TableOutbox { - async fn store_side_effects + Serialize + Send + Sync>( + async fn store_side_effects + Pickle + Send + Sync>( &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, ) -> Result<(), DbError> { let mut ids: Vec = Vec::with_capacity(items.len()); - let mut messages: Vec = Vec::with_capacity(items.len()); + let mut messages: Vec> = Vec::with_capacity(items.len()); let mut retries: Vec = Vec::with_capacity(items.len()); let mut requeues: Vec = Vec::with_capacity(items.len()); let mut created_ats: Vec> = Vec::with_capacity(items.len()); for side_effect in items { let id = *side_effect.id(); - let msg = serde_json::to_value(side_effect).map_err(DbError::SerializationError)?; + let msg = side_effect + .pickle() + .context("Failed to pickle side effect") + .map_err(DbError::PicklingError)?; ids.push(id); messages.push(msg); retries.push(0); @@ -39,7 +43,7 @@ impl SideEffectStorage for TableOutbox { sqlx::query( "INSERT INTO outbox(id, message, retries, requeue, created_at) - SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[]) + SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::boolean[], $5::timestamptz[]) ON CONFLICT (id) DO UPDATE SET message = excluded.message, retries = excluded.retries, @@ -61,7 +65,7 @@ impl SideEffectStorage for TableOutbox { #[async_trait] pub trait TransactionOutboxExt where - T: SideEffect + DeserializeOwned + Send + 'static, + T: SideEffect + Pickle + Send + 'static, T::SideEffectId: Clone + Send + 'static, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, @@ -76,7 +80,7 @@ where #[async_trait] impl TransactionOutboxExt for PostgresTransaction<'_, TableOutbox> where - T: SideEffect + DeserializeOwned + Send + 'static, + T: SideEffect + Pickle + Send + 'static, T::SideEffectId: Clone + Send + 'static, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> + sqlx::Type + sqlx::Encode<'sql, Postgres> + Unpin, @@ -84,7 +88,7 @@ where async fn get_outbox_batch(&mut self) -> Result>, DbError> { #[derive(sqlx::FromRow)] struct OutboxRow { - message: serde_json::Value, + message: Vec, retries: i32, requeue: bool, } @@ -99,11 +103,11 @@ where rows.into_iter() .map(|row| { - let msg = serde_json::from_value(row.message)?; + let msg = T::unpickle(&row.message).context("Failed to unpickle side effect")?; Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue)) }) - .collect::, serde_json::Error>>() - .map_err(DbError::SerializationError) + .collect::, anyhow::Error>>() + .map_err(DbError::PicklingError) } async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError> { @@ -171,7 +175,7 @@ pub trait RepositoryOutboxExt { poll_interval: std::time::Duration, ) -> Result<(), DbError> where - T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T: SideEffect + Pickle + Send + Sync + 'static, T::SideEffectId: Clone + Send + 'static, H: SideEffectHandler + Send + Sync, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> @@ -188,7 +192,7 @@ impl RepositoryOutboxExt for PostgresRepository { poll_interval: std::time::Duration, ) -> Result<(), DbError> where - T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T: SideEffect + Pickle + Send + Sync + 'static, T::SideEffectId: Clone + Send + 'static, H: SideEffectHandler + Send + Sync, for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres> @@ -210,7 +214,7 @@ async fn process_outbox_batch( handler: Arc, ) -> Result<(), DbError> where - T: SideEffect + DeserializeOwned + Send + Sync + 'static, + T: SideEffect + Pickle + Send + Sync + 'static, T::SideEffectId: Clone + Send + 'static, H: SideEffectHandler + Send + Sync, for<'a> PostgresTransaction<'a, TableOutbox>: TransactionOutboxExt, diff --git a/eventastic_postgres/Cargo.toml b/eventastic_postgres/Cargo.toml index 8f9396f..e95cfd9 100644 --- a/eventastic_postgres/Cargo.toml +++ b/eventastic_postgres/Cargo.toml @@ -10,18 +10,18 @@ categories = ["web-programming", "asynchronous"] keywords = ["postgres", "postgresql", "database", "ddd", "event-sourcing"] [dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } chrono = { workspace = true } eventastic = { path = "../eventastic", version = "0.5" } -sqlx = { workspace = true } -serde_json = { workspace = true } -tokio = { workspace = true } -async-trait = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } serde = { workspace = true } -anyhow = { workspace = true } +serde_json = { workspace = true } +sqlx = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } [dev-dependencies] -uuid = { workspace = true } eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" } +uuid = { workspace = true } diff --git a/eventastic_postgres/migrations/20230610185630_init.sql b/eventastic_postgres/migrations/20230610185630_init.sql index 106aa20..1c07ba2 100644 --- a/eventastic_postgres/migrations/20230610185630_init.sql +++ b/eventastic_postgres/migrations/20230610185630_init.sql @@ -2,7 +2,7 @@ CREATE TABLE if not exists events ( aggregate_id uuid NOT NULL, version bigint NOT NULL CHECK (version >= 0), event_id uuid NOT NULL, - event jsonb NOT NULL, + event bytea NOT NULL, created_at timestamptz NOT NULL, PRIMARY KEY (aggregate_id, version) ); @@ -11,7 +11,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS events_event_id ON events (event_id); CREATE TABLE if not exists snapshots ( aggregate_id uuid NOT NULL, - aggregate jsonb NOT NULL, + aggregate bytea NOT NULL, version bigint NOT NULL CHECK (version >= 0), snapshot_version bigint NOT NULL, created_at timestamptz NOT NULL, @@ -20,7 +20,7 @@ CREATE TABLE if not exists snapshots ( CREATE TABLE if not exists outbox ( id uuid PRIMARY KEY, - message jsonb NOT NULL, + message bytea NOT NULL, retries integer NOT NULL CHECK (retries >= 0), requeue boolean NOT NULL, created_at timestamptz NOT NULL diff --git a/eventastic_postgres/src/common.rs b/eventastic_postgres/src/common.rs index 146cecf..40726f5 100644 --- a/eventastic_postgres/src/common.rs +++ b/eventastic_postgres/src/common.rs @@ -4,11 +4,12 @@ //! duplication and ensure consistency. use crate::DbError; +use crate::pickle::Pickle; +use anyhow::Context; use eventastic::aggregate::Aggregate; use eventastic::event::{DomainEvent, EventStoreEvent}; use eventastic::repository::Snapshot; -use serde::de::DeserializeOwned; -use sqlx::types::{JsonValue, Uuid}; +use sqlx::types::Uuid; /// Internal representation of a database row containing event data. /// @@ -18,7 +19,7 @@ use sqlx::types::{JsonValue, Uuid}; pub(crate) struct PartialEventRow { pub event_id: Uuid, pub version: i64, - pub event: JsonValue, + pub event: Vec, } impl PartialEventRow { @@ -35,20 +36,21 @@ impl PartialEventRow { /// # Errors /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. - /// Returns [`DbError::SerializationError`] if the event JSON cannot be deserialized. + /// Returns [`DbError::PicklingError`] if the event JSON cannot be deserialized. pub fn to_event(row: PartialEventRow) -> Result, DbError> where - Evt: DomainEvent + DeserializeOwned, + Evt: DomainEvent + Pickle, { let row_version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; - serde_json::from_value::(row.event) + Evt::unpickle(&row.event) .map(|e| EventStoreEvent { id: row.event_id, event: e, version: row_version, }) - .map_err(DbError::SerializationError) + .context("Failed to unpickle event") + .map_err(DbError::PicklingError) } } @@ -58,7 +60,7 @@ impl PartialEventRow { /// before converting them to the full [`Snapshot`] type. #[derive(sqlx::FromRow)] pub(crate) struct PartialSnapshotRow { - pub aggregate: serde_json::Value, + pub aggregate: Vec, pub snapshot_version: i64, pub version: i64, } @@ -78,16 +80,17 @@ impl PartialSnapshotRow { /// /// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64. /// Returns [`DbError::InvalidSnapshotVersion`] if the snapshot version cannot be converted to u64. - /// Returns [`DbError::SerializationError`] if the aggregate JSON cannot be deserialized. + /// Returns [`DbError::PicklingError`] if the aggregate JSON cannot be deserialized. pub fn to_snapshot(row: PartialSnapshotRow) -> Result, DbError> where - T: Aggregate + DeserializeOwned, + T: Aggregate + Pickle, { let version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?; let snapshot_version = u64::try_from(row.snapshot_version).map_err(|_| DbError::InvalidSnapshotVersion)?; - let aggregate: T = - serde_json::from_value(row.aggregate).map_err(DbError::SerializationError)?; + let aggregate: T = T::unpickle(&row.aggregate) + .context("Failed to unpickle aggregate") + .map_err(DbError::PicklingError)?; Ok(Snapshot { aggregate, diff --git a/eventastic_postgres/src/lib.rs b/eventastic_postgres/src/lib.rs index 492af27..d3464b9 100644 --- a/eventastic_postgres/src/lib.rs +++ b/eventastic_postgres/src/lib.rs @@ -1,23 +1,25 @@ mod common; +mod pickle; mod reader_impl; mod repository; mod side_effect; mod table_registry; mod transaction; + +pub use pickle::Pickle; +pub use repository::PostgresRepository; +pub use side_effect::SideEffectStorage; +pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder}; +pub use transaction::PostgresTransaction; + use async_trait::async_trait; use eventastic::{ aggregate::{Aggregate, Context, SideEffect}, event::DomainEvent, repository::{Repository, RepositoryError}, }; -pub use repository::PostgresRepository; -use serde::{Serialize, de::DeserializeOwned}; -pub use side_effect::SideEffectStorage; use sqlx::types::Uuid; -pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder}; - use thiserror::Error; -pub use transaction::PostgresTransaction; /// Errors that can occur during PostgreSQL operations. #[derive(Error, Debug)] @@ -25,14 +27,14 @@ pub enum DbError { /// A database operation failed. #[error("DB Error {0}")] DbError(sqlx::Error), - /// Failed to serialize or deserialize data to/from JSON. - #[error("Serialization Error {0}")] - SerializationError(#[from] serde_json::Error), + /// Failed to pickle data. + #[error("Pickling Error {0}")] + PicklingError(anyhow::Error), /// An invalid version number was encountered (e.g., negative value where positive expected). #[error("Invalid Version Number")] InvalidVersionNumber, /// An invalid snapshot version number was encountered. - #[error("Invalid Snapshot Version number")] + #[error("Invalid Snapshot Version Number")] InvalidSnapshotVersion, /// A concurrent modification was detected (optimistic locking failure). #[error("Optimistic Concurrency Error")] @@ -62,10 +64,9 @@ impl From for DbError { #[async_trait] pub trait RootExt where - T: Aggregate + Serialize + DeserializeOwned + Send + Sync + 'static, - ::DomainEvent: - DomainEvent + Serialize + DeserializeOwned + Send + Sync, - ::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + Pickle + Send + Sync + 'static, + ::DomainEvent: DomainEvent + Pickle + Send + Sync, + ::SideEffect: SideEffect + Pickle + Send + Sync, ::ApplyError: Send + Sync, O: SideEffectStorage + Send + Sync, { @@ -111,10 +112,9 @@ where impl RootExt for T where - T: Aggregate + Serialize + DeserializeOwned + Send + Sync + 'static, - ::DomainEvent: - DomainEvent + Serialize + DeserializeOwned + Send + Sync, - ::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + Pickle + Send + Sync + 'static, + ::DomainEvent: DomainEvent + Pickle + Send + Sync, + ::SideEffect: SideEffect + Pickle + Send + Sync, ::ApplyError: Send + Sync, O: SideEffectStorage + Send + Sync, { diff --git a/eventastic_postgres/src/pickle.rs b/eventastic_postgres/src/pickle.rs new file mode 100644 index 0000000..e81f267 --- /dev/null +++ b/eventastic_postgres/src/pickle.rs @@ -0,0 +1,29 @@ +/// Preserve your data so it can be stored in the database. +/// +/// [`Pickle`] is implemented for types that implement [`serde::Serialize`] +/// and [`serde::de::DeserializeOwned`]. It uses [`serde_json`] to do the +/// serialization. +pub trait Pickle: Sized { + type Error: std::error::Error + Send + Sync + 'static; + + /// Convert to bytes for storage. + fn pickle(&self) -> Result, Self::Error>; + + /// Convert back to `Self` from bytes. + fn unpickle(bytes: &[u8]) -> Result; +} + +impl Pickle for T +where + T: serde::Serialize + serde::de::DeserializeOwned, +{ + type Error = serde_json::Error; + + fn pickle(&self) -> Result, Self::Error> { + serde_json::to_vec(self) + } + + fn unpickle(bytes: &[u8]) -> Result { + serde_json::from_slice(bytes) + } +} diff --git a/eventastic_postgres/src/reader_impl.rs b/eventastic_postgres/src/reader_impl.rs index 1a4087d..79db3da 100644 --- a/eventastic_postgres/src/reader_impl.rs +++ b/eventastic_postgres/src/reader_impl.rs @@ -6,13 +6,13 @@ use crate::DbError; use crate::common::{PartialEventRow, PartialSnapshotRow, utils}; +use crate::pickle::Pickle; use eventastic::aggregate::Aggregate; use eventastic::event::DomainEvent; use eventastic::event::EventStoreEvent; use eventastic::repository::Snapshot; use futures::stream; use futures_util::stream::StreamExt; -use serde::de::DeserializeOwned; use sqlx::types::Uuid; use sqlx::{Executor, query_as}; @@ -26,7 +26,7 @@ pub fn stream_from<'e, 'c: 'e, E, T>( where E: Executor<'c, Database = sqlx::Postgres> + 'e, T: Aggregate, - T::DomainEvent: DomainEvent + DeserializeOwned + Send + 'e, + T::DomainEvent: DomainEvent + Pickle + Send + 'e, { let Ok(version) = utils::version_to_i64(version) else { return stream::iter(vec![Err(DbError::InvalidVersionNumber)]).boxed(); @@ -59,7 +59,7 @@ pub async fn get_event<'c, E, T>( where E: Executor<'c, Database = sqlx::Postgres>, T: Aggregate, - T::DomainEvent: DomainEvent + DeserializeOwned + Send, + T::DomainEvent: DomainEvent + Pickle + Send, { query_as::<_, PartialEventRow>(query) .bind(aggregate_id) @@ -78,7 +78,7 @@ pub async fn get_snapshot<'c, E, T>( ) -> Result>, DbError> where E: Executor<'c, Database = sqlx::Postgres>, - T: Aggregate + DeserializeOwned, + T: Aggregate + Pickle, { let row = query_as::<_, PartialSnapshotRow>(query) .bind(id) diff --git a/eventastic_postgres/src/repository.rs b/eventastic_postgres/src/repository.rs index 8a28af8..f374f54 100644 --- a/eventastic_postgres/src/repository.rs +++ b/eventastic_postgres/src/repository.rs @@ -1,3 +1,4 @@ +use crate::pickle::Pickle; use crate::{DbError, PostgresTransaction, SideEffectStorage, TableRegistry, reader_impl}; use async_trait::async_trait; use eventastic::{ @@ -6,7 +7,6 @@ use eventastic::{ repository::{Repository, RepositoryError, RepositoryReader, Snapshot}, }; use futures::StreamExt; -use serde::{Serialize, de::DeserializeOwned}; use sqlx::{ Pool, Postgres, postgres::{PgConnectOptions, PgPoolOptions}, @@ -82,9 +82,9 @@ where #[async_trait] impl RepositoryReader for PostgresRepository where - T: Aggregate + DeserializeOwned + Serialize + Send + Sync + 'static, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + Pickle + Send + Sync + 'static, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, O: SideEffectStorage + Clone + Send + Sync, { @@ -146,9 +146,9 @@ where #[async_trait] impl Repository for PostgresRepository where - T: Aggregate + DeserializeOwned + Serialize + Send + Sync + 'static, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, - T::SideEffect: eventastic::aggregate::SideEffect + Serialize + Send + Sync, + T: Aggregate + Pickle + Send + Sync + 'static, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: eventastic::aggregate::SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, O: SideEffectStorage + Clone + Send + Sync, { diff --git a/eventastic_postgres/src/side_effect.rs b/eventastic_postgres/src/side_effect.rs index 95c6573..cd18fa4 100644 --- a/eventastic_postgres/src/side_effect.rs +++ b/eventastic_postgres/src/side_effect.rs @@ -1,7 +1,7 @@ use crate::DbError; +use crate::pickle::Pickle; use async_trait::async_trait; use eventastic::aggregate::SideEffect; -use serde::Serialize; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; @@ -25,7 +25,7 @@ pub trait SideEffectStorage: Send + Sync { /// # Errors /// /// Returns [`DbError`] if the storage operation fails. - async fn store_side_effects + Serialize + Send + Sync>( + async fn store_side_effects + Pickle + Send + Sync>( &self, transaction: &mut Transaction<'_, Postgres>, items: Vec, diff --git a/eventastic_postgres/src/table_registry.rs b/eventastic_postgres/src/table_registry.rs index a6c0846..85d084c 100644 --- a/eventastic_postgres/src/table_registry.rs +++ b/eventastic_postgres/src/table_registry.rs @@ -37,7 +37,7 @@ impl TableConfig { ), insert_events_query: format!( "INSERT INTO {} (event_id, version, aggregate_id, event, created_at) \ - SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::jsonb[], $5::timestamptz[]) \ + SELECT * FROM UNNEST($1::uuid[], $2::bigint[], $3::uuid[], $4::bytea[], $5::timestamptz[]) \ ON CONFLICT DO NOTHING returning event_id", &events ), diff --git a/eventastic_postgres/src/transaction.rs b/eventastic_postgres/src/transaction.rs index f5b5dce..9252cf5 100644 --- a/eventastic_postgres/src/transaction.rs +++ b/eventastic_postgres/src/transaction.rs @@ -1,5 +1,7 @@ use crate::common::utils; +use crate::pickle::Pickle; use crate::{DbError, SideEffectStorage, TableRegistry, reader_impl}; +use anyhow::Context as _; use async_trait::async_trait; use chrono::DateTime; use chrono::Utc; @@ -11,8 +13,6 @@ use eventastic::event::EventStoreEvent; use eventastic::repository::Snapshot; use eventastic::repository::{RepositoryError, RepositoryReader, RepositoryWriter}; use futures::StreamExt; -use serde::Serialize; -use serde::de::DeserializeOwned; use sqlx::Row; use sqlx::types::Uuid; use sqlx::{Postgres, Transaction}; @@ -66,9 +66,9 @@ where id: &Uuid, ) -> Result, RepositoryError> where - T: Aggregate + 'static + Send + Sync + Serialize + DeserializeOwned, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + 'static + Send + Sync + Pickle, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, { Context::load(self, id).await @@ -80,9 +80,9 @@ where aggregate: &mut Context, ) -> Result<(), SaveError> where - T: Aggregate + 'static + Send + Sync + Serialize + DeserializeOwned, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, + T: Aggregate + 'static + Send + Sync + Pickle, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, T::ApplyError: Send + Sync, { aggregate.save(self).await @@ -92,9 +92,9 @@ where #[async_trait] impl RepositoryReader for PostgresTransaction<'_, O> where - T: Aggregate + 'static + DeserializeOwned + Serialize + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + 'static + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, T::ApplyError: Send + Sync, O: SideEffectStorage, { @@ -156,9 +156,9 @@ where #[async_trait] impl RepositoryWriter for PostgresTransaction<'_, O> where - T: Aggregate + 'static + DeserializeOwned + Serialize + Send + Sync, - T::SideEffect: SideEffect + Serialize + Send + Sync, - T::DomainEvent: DomainEvent + Serialize + DeserializeOwned + Send + Sync, + T: Aggregate + 'static + Pickle + Send + Sync, + T::SideEffect: SideEffect + Pickle + Send + Sync, + T::DomainEvent: DomainEvent + Pickle + Send + Sync, T::ApplyError: Send + Sync, O: SideEffectStorage, { @@ -172,7 +172,7 @@ where Vec::with_capacity(events.len()); let mut versions_to_insert: Vec = Vec::with_capacity(events.len()); let mut aggregate_ids_to_insert: Vec = Vec::with_capacity(events.len()); - let mut events_to_insert: Vec = Vec::with_capacity(events.len()); + let mut events_to_insert: Vec> = Vec::with_capacity(events.len()); let mut created_ats_to_insert: Vec> = Vec::with_capacity(events.len()); for event in events { @@ -181,8 +181,11 @@ where let version = utils::version_to_i64(version)?; - let serialised_event = - serde_json::to_value(event.event).map_err(DbError::SerializationError)?; + let serialised_event = event + .event + .pickle() + .context("Failed to pickle event") + .map_err(DbError::PicklingError)?; event_ids_to_insert.push(event_id); versions_to_insert.push(version); @@ -214,8 +217,11 @@ where /// Stores a snapshot of the aggregate in the database async fn store_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Self::DbError> { let aggregated_id = *snapshot.aggregate.aggregate_id(); - let aggregate = - serde_json::to_value(snapshot.aggregate).map_err(DbError::SerializationError)?; + let aggregate = snapshot + .aggregate + .pickle() + .context("Failed to pickle aggregate") + .map_err(DbError::PicklingError)?; let upsert_query = self .tables diff --git a/eventastic_postgres/tests/common/helpers.rs b/eventastic_postgres/tests/common/helpers.rs index 52128cd..dc27c0e 100644 --- a/eventastic_postgres/tests/common/helpers.rs +++ b/eventastic_postgres/tests/common/helpers.rs @@ -2,7 +2,7 @@ use super::test_aggregate::{Account, AccountEvent}; use chrono::{DateTime, Utc}; use eventastic::aggregate::{Context, Root}; use eventastic_outbox_postgres::TableOutbox; -use eventastic_postgres::{PostgresRepository, TableRegistryBuilder}; +use eventastic_postgres::{Pickle, PostgresRepository, TableRegistryBuilder}; use sqlx::Row; use sqlx::{pool::PoolOptions, postgres::PgConnectOptions}; use std::str::FromStr; @@ -53,12 +53,12 @@ pub async fn get_account_snapshot(account_id: Uuid) -> Option { .expect("Failed to fetch snapshot"); row.map(|row| { - let aggregate: Result = row.try_get("aggregate"); + let aggregate_bytes: Result, _> = row.try_get("aggregate"); let version: Result = row.try_get("version"); let snapshot_version: Result = row.try_get("snapshot_version"); SavedSnapshot { - aggregate: serde_json::from_value(aggregate.unwrap()).unwrap(), + aggregate: Account::unpickle(&aggregate_bytes.unwrap()).unwrap(), version: version.unwrap(), snapshot_version: snapshot_version.unwrap(), } @@ -86,12 +86,12 @@ pub async fn get_account_snapshot_with_version( .expect("Failed to fetch snapshot"); row.map(|row| { - let aggregate: Result = row.try_get("aggregate"); + let aggregate_bytes: Result, _> = row.try_get("aggregate"); let version: Result = row.try_get("version"); let snapshot_version: Result = row.try_get("snapshot_version"); SavedSnapshot { - aggregate: serde_json::from_value(aggregate.unwrap()).unwrap(), + aggregate: Account::unpickle(&aggregate_bytes.unwrap()).unwrap(), version: version.unwrap(), snapshot_version: snapshot_version.unwrap(), } @@ -116,12 +116,12 @@ pub async fn get_all_account_snapshots(account_id: Uuid) -> Vec { rows.into_iter() .map(|row| { - let aggregate: Result = row.try_get("aggregate"); + let aggregate_bytes: Result, _> = row.try_get("aggregate"); let version: Result = row.try_get("version"); let snapshot_version: Result = row.try_get("snapshot_version"); SavedSnapshot { - aggregate: serde_json::from_value(aggregate.unwrap()).unwrap(), + aggregate: Account::unpickle(&aggregate_bytes.unwrap()).unwrap(), version: version.unwrap(), snapshot_version: snapshot_version.unwrap(), } @@ -158,7 +158,7 @@ pub async fn replace_account_snapshot(account_id: Uuid, snapshot: SavedSnapshot) let mut pg_transaction = transaction.into_inner(); let row = sqlx::query("UPDATE snapshots set aggregate = $1, snapshot_version = $2, version = $3 where aggregate_id = $4") - .bind(serde_json::to_value(&snapshot.aggregate).expect("Failed to serialize snapshot")) + .bind(snapshot.aggregate.pickle().expect("Failed to serialize snapshot")) .bind(snapshot.snapshot_version) .bind(snapshot.version) .bind(account_id) @@ -205,7 +205,7 @@ pub async fn insert_snapshot_with_version(account_id: Uuid, snapshot: SavedSnaps sqlx::query("INSERT INTO snapshots (aggregate_id, aggregate, version, snapshot_version, created_at) VALUES ($1, $2, $3, $4, NOW())") .bind(account_id) - .bind(serde_json::to_value(&snapshot.aggregate).expect("Failed to serialize snapshot")) + .bind(snapshot.aggregate.pickle().expect("Failed to serialize snapshot")) .bind(snapshot.version) .bind(version) .execute(&mut *transaction) @@ -369,7 +369,7 @@ pub async fn get_side_effect( .expect("Failed to query outbox table"); if let Some(row) = row { - let message_json: serde_json::Value = row + let message_bytes: Vec = row .try_get("message") .expect("Failed to get message from row"); let retries: i32 = row @@ -380,7 +380,8 @@ pub async fn get_side_effect( .expect("Failed to get requeue from row"); let side_effect: super::test_aggregate::SideEffects = - serde_json::from_value(message_json).expect("Failed to deserialize side effect"); + super::test_aggregate::SideEffects::unpickle(&message_bytes) + .expect("Failed to deserialize side effect"); Some((side_effect, retries, requeue)) } else {