Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions eventastic_outbox_postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ version = "0.5.0"
edition = "2024"

[dependencies]
anyhow = { workspace = true }
eventastic_postgres = { version = "0.5", path = "../eventastic_postgres" }
async-trait = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
serde = { workspace = true }
eventastic = { path = "../eventastic", version = "0.5" }
futures-util = { workspace = true }
tokio = { workspace = true }
36 changes: 20 additions & 16 deletions eventastic_outbox_postgres/src/outbox.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use anyhow::Context;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use eventastic::aggregate::SideEffect;
use eventastic_postgres::{DbError, PostgresRepository, PostgresTransaction, SideEffectStorage};
use serde::Serialize;
use serde::de::DeserializeOwned;
use eventastic_postgres::{
DbError, Pickle, PostgresRepository, PostgresTransaction, SideEffectStorage,
};
use sqlx::types::Uuid;
use sqlx::{Postgres, Transaction};
use std::sync::Arc;
Expand All @@ -16,20 +17,23 @@ pub struct TableOutbox;

#[async_trait]
impl SideEffectStorage for TableOutbox {
async fn store_side_effects<T: SideEffect<SideEffectId = Uuid> + Serialize + Send + Sync>(
async fn store_side_effects<T: SideEffect<SideEffectId = Uuid> + Pickle + Send + Sync>(
&self,
transaction: &mut Transaction<'_, Postgres>,
items: Vec<T>,
) -> Result<(), DbError> {
let mut ids: Vec<Uuid> = Vec::with_capacity(items.len());
let mut messages: Vec<serde_json::Value> = Vec::with_capacity(items.len());
let mut messages: Vec<Vec<u8>> = Vec::with_capacity(items.len());
let mut retries: Vec<i32> = Vec::with_capacity(items.len());
let mut requeues: Vec<bool> = Vec::with_capacity(items.len());
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(items.len());

for side_effect in items {
let id = *side_effect.id();
let msg = serde_json::to_value(side_effect).map_err(DbError::SerializationError)?;
let msg = side_effect
.pickle()
.context("Failed to pickle side effect")
.map_err(DbError::PicklingError)?;
ids.push(id);
messages.push(msg);
retries.push(0);
Expand All @@ -39,7 +43,7 @@ impl SideEffectStorage for TableOutbox {

sqlx::query(
"INSERT INTO outbox(id, message, retries, requeue, created_at)
SELECT * FROM UNNEST($1::uuid[], $2::jsonb[], $3::int[], $4::boolean[], $5::timestamptz[])
SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::boolean[], $5::timestamptz[])
ON CONFLICT (id) DO UPDATE SET
message = excluded.message,
retries = excluded.retries,
Expand All @@ -61,7 +65,7 @@ impl SideEffectStorage for TableOutbox {
#[async_trait]
pub trait TransactionOutboxExt<T>
where
T: SideEffect + DeserializeOwned + Send + 'static,
T: SideEffect + Pickle + Send + 'static,
T::SideEffectId: Clone + Send + 'static,
for<'sql> T::SideEffectId:
sqlx::Decode<'sql, Postgres> + sqlx::Type<Postgres> + sqlx::Encode<'sql, Postgres> + Unpin,
Expand All @@ -76,15 +80,15 @@ where
#[async_trait]
impl<T> TransactionOutboxExt<T> for PostgresTransaction<'_, TableOutbox>
where
T: SideEffect + DeserializeOwned + Send + 'static,
T: SideEffect + Pickle + Send + 'static,
T::SideEffectId: Clone + Send + 'static,
for<'sql> T::SideEffectId:
sqlx::Decode<'sql, Postgres> + sqlx::Type<Postgres> + sqlx::Encode<'sql, Postgres> + Unpin,
{
async fn get_outbox_batch(&mut self) -> Result<Vec<OutboxMessage<T>>, DbError> {
#[derive(sqlx::FromRow)]
struct OutboxRow {
message: serde_json::Value,
message: Vec<u8>,
retries: i32,
requeue: bool,
}
Expand All @@ -99,11 +103,11 @@ where

rows.into_iter()
.map(|row| {
let msg = serde_json::from_value(row.message)?;
let msg = T::unpickle(&row.message).context("Failed to unpickle side effect")?;
Ok(OutboxMessage::new(msg, row.retries as u16, row.requeue))
})
.collect::<Result<Vec<_>, serde_json::Error>>()
.map_err(DbError::SerializationError)
.collect::<Result<Vec<_>, anyhow::Error>>()
.map_err(DbError::PicklingError)
}

async fn delete_outbox_item(&mut self, id: T::SideEffectId) -> Result<(), DbError> {
Expand Down Expand Up @@ -171,7 +175,7 @@ pub trait RepositoryOutboxExt {
poll_interval: std::time::Duration,
) -> Result<(), DbError>
where
T: SideEffect + DeserializeOwned + Send + Sync + 'static,
T: SideEffect + Pickle + Send + Sync + 'static,
T::SideEffectId: Clone + Send + 'static,
H: SideEffectHandler<SideEffect = T> + Send + Sync,
for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres>
Expand All @@ -188,7 +192,7 @@ impl RepositoryOutboxExt for PostgresRepository<TableOutbox> {
poll_interval: std::time::Duration,
) -> Result<(), DbError>
where
T: SideEffect + DeserializeOwned + Send + Sync + 'static,
T: SideEffect + Pickle + Send + Sync + 'static,
T::SideEffectId: Clone + Send + 'static,
H: SideEffectHandler<SideEffect = T> + Send + Sync,
for<'sql> T::SideEffectId: sqlx::Decode<'sql, Postgres>
Expand All @@ -210,7 +214,7 @@ async fn process_outbox_batch<T, H>(
handler: Arc<H>,
) -> Result<(), DbError>
where
T: SideEffect + DeserializeOwned + Send + Sync + 'static,
T: SideEffect + Pickle + Send + Sync + 'static,
T::SideEffectId: Clone + Send + 'static,
H: SideEffectHandler<SideEffect = T> + Send + Sync,
for<'a> PostgresTransaction<'a, TableOutbox>: TransactionOutboxExt<T>,
Expand Down
12 changes: 6 additions & 6 deletions eventastic_postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ categories = ["web-programming", "asynchronous"]
keywords = ["postgres", "postgresql", "database", "ddd", "event-sourcing"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
eventastic = { path = "../eventastic", version = "0.5" }
sqlx = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
serde = { workspace = true }
anyhow = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
uuid = { workspace = true }
eventastic_outbox_postgres = { path = "../eventastic_outbox_postgres" }
uuid = { workspace = true }
6 changes: 3 additions & 3 deletions eventastic_postgres/migrations/20230610185630_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CREATE TABLE if not exists events (
aggregate_id uuid NOT NULL,
version bigint NOT NULL CHECK (version >= 0),
event_id uuid NOT NULL,
event jsonb NOT NULL,
event bytea NOT NULL,
created_at timestamptz NOT NULL,
PRIMARY KEY (aggregate_id, version)
);
Expand All @@ -11,7 +11,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS events_event_id ON events (event_id);

CREATE TABLE if not exists snapshots (
aggregate_id uuid NOT NULL,
aggregate jsonb NOT NULL,
aggregate bytea NOT NULL,
version bigint NOT NULL CHECK (version >= 0),
snapshot_version bigint NOT NULL,
created_at timestamptz NOT NULL,
Expand All @@ -20,7 +20,7 @@ CREATE TABLE if not exists snapshots (

CREATE TABLE if not exists outbox (
id uuid PRIMARY KEY,
message jsonb NOT NULL,
message bytea NOT NULL,
retries integer NOT NULL CHECK (retries >= 0),
requeue boolean NOT NULL,
created_at timestamptz NOT NULL
Expand Down
27 changes: 15 additions & 12 deletions eventastic_postgres/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
//! duplication and ensure consistency.

use crate::DbError;
use crate::pickle::Pickle;
use anyhow::Context;
use eventastic::aggregate::Aggregate;
use eventastic::event::{DomainEvent, EventStoreEvent};
use eventastic::repository::Snapshot;
use serde::de::DeserializeOwned;
use sqlx::types::{JsonValue, Uuid};
use sqlx::types::Uuid;

/// Internal representation of a database row containing event data.
///
Expand All @@ -18,7 +19,7 @@ use sqlx::types::{JsonValue, Uuid};
pub(crate) struct PartialEventRow {
pub event_id: Uuid,
pub version: i64,
pub event: JsonValue,
pub event: Vec<u8>,
}

impl PartialEventRow {
Expand All @@ -35,20 +36,21 @@ impl PartialEventRow {
/// # Errors
///
/// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64.
/// Returns [`DbError::SerializationError`] if the event JSON cannot be deserialized.
/// Returns [`DbError::PicklingError`] if the event bytes cannot be unpickled.
pub fn to_event<Evt>(row: PartialEventRow) -> Result<EventStoreEvent<Evt>, DbError>
where
Evt: DomainEvent<EventId = Uuid> + DeserializeOwned,
Evt: DomainEvent<EventId = Uuid> + Pickle,
{
let row_version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?;

serde_json::from_value::<Evt>(row.event)
Evt::unpickle(&row.event)
.map(|e| EventStoreEvent {
id: row.event_id,
event: e,
version: row_version,
})
.map_err(DbError::SerializationError)
.context("Failed to unpickle event")
.map_err(DbError::PicklingError)
}
}

Expand All @@ -58,7 +60,7 @@ impl PartialEventRow {
/// before converting them to the full [`Snapshot`] type.
#[derive(sqlx::FromRow)]
pub(crate) struct PartialSnapshotRow {
pub aggregate: serde_json::Value,
pub aggregate: Vec<u8>,
pub snapshot_version: i64,
pub version: i64,
}
Expand All @@ -78,16 +80,17 @@ impl PartialSnapshotRow {
///
/// Returns [`DbError::InvalidVersionNumber`] if the version cannot be converted to u64.
/// Returns [`DbError::InvalidSnapshotVersion`] if the snapshot version cannot be converted to u64.
/// Returns [`DbError::SerializationError`] if the aggregate JSON cannot be deserialized.
/// Returns [`DbError::PicklingError`] if the aggregate bytes cannot be unpickled.
pub fn to_snapshot<T>(row: PartialSnapshotRow) -> Result<Snapshot<T>, DbError>
where
T: Aggregate + DeserializeOwned,
T: Aggregate + Pickle,
{
let version = u64::try_from(row.version).map_err(|_| DbError::InvalidVersionNumber)?;
let snapshot_version =
u64::try_from(row.snapshot_version).map_err(|_| DbError::InvalidSnapshotVersion)?;
let aggregate: T =
serde_json::from_value(row.aggregate).map_err(DbError::SerializationError)?;
let aggregate: T = T::unpickle(&row.aggregate)
.context("Failed to unpickle aggregate")
.map_err(DbError::PicklingError)?;

Ok(Snapshot {
aggregate,
Expand Down
36 changes: 18 additions & 18 deletions eventastic_postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,40 @@
mod common;
mod pickle;
mod reader_impl;
mod repository;
mod side_effect;
mod table_registry;
mod transaction;

pub use pickle::Pickle;
pub use repository::PostgresRepository;
pub use side_effect::SideEffectStorage;
pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder};
pub use transaction::PostgresTransaction;

use async_trait::async_trait;
use eventastic::{
aggregate::{Aggregate, Context, SideEffect},
event::DomainEvent,
repository::{Repository, RepositoryError},
};
pub use repository::PostgresRepository;
use serde::{Serialize, de::DeserializeOwned};
pub use side_effect::SideEffectStorage;
use sqlx::types::Uuid;
pub use table_registry::{TableConfig, TableRegistry, TableRegistryBuilder};

use thiserror::Error;
pub use transaction::PostgresTransaction;

/// Errors that can occur during PostgreSQL operations.
#[derive(Error, Debug)]
pub enum DbError {
/// A database operation failed.
#[error("DB Error {0}")]
DbError(sqlx::Error),
/// Failed to serialize or deserialize data to/from JSON.
#[error("Serialization Error {0}")]
SerializationError(#[from] serde_json::Error),
/// Failed to pickle data.
#[error("Pickling Error {0}")]
PicklingError(anyhow::Error),
/// An invalid version number was encountered (e.g., negative value where positive expected).
#[error("Invalid Version Number")]
InvalidVersionNumber,
/// An invalid snapshot version number was encountered.
#[error("Invalid Snapshot Version number")]
#[error("Invalid Snapshot Version Number")]
InvalidSnapshotVersion,
/// A concurrent modification was detected (optimistic locking failure).
#[error("Optimistic Concurrency Error")]
Expand Down Expand Up @@ -62,10 +64,9 @@ impl From<sqlx::Error> for DbError {
#[async_trait]
pub trait RootExt<T, O>
where
T: Aggregate<AggregateId = Uuid> + Serialize + DeserializeOwned + Send + Sync + 'static,
<T as Aggregate>::DomainEvent:
DomainEvent<EventId = Uuid> + Serialize + DeserializeOwned + Send + Sync,
<T as Aggregate>::SideEffect: SideEffect<SideEffectId = Uuid> + Serialize + Send + Sync,
T: Aggregate<AggregateId = Uuid> + Pickle + Send + Sync + 'static,
<T as Aggregate>::DomainEvent: DomainEvent<EventId = Uuid> + Pickle + Send + Sync,
<T as Aggregate>::SideEffect: SideEffect<SideEffectId = Uuid> + Pickle + Send + Sync,
<T as Aggregate>::ApplyError: Send + Sync,
O: SideEffectStorage + Send + Sync,
{
Expand Down Expand Up @@ -111,10 +112,9 @@ where

impl<T, O> RootExt<T, O> for T
where
T: Aggregate<AggregateId = Uuid> + Serialize + DeserializeOwned + Send + Sync + 'static,
<T as Aggregate>::DomainEvent:
DomainEvent<EventId = Uuid> + Serialize + DeserializeOwned + Send + Sync,
<T as Aggregate>::SideEffect: SideEffect<SideEffectId = Uuid> + Serialize + Send + Sync,
T: Aggregate<AggregateId = Uuid> + Pickle + Send + Sync + 'static,
<T as Aggregate>::DomainEvent: DomainEvent<EventId = Uuid> + Pickle + Send + Sync,
<T as Aggregate>::SideEffect: SideEffect<SideEffectId = Uuid> + Pickle + Send + Sync,
<T as Aggregate>::ApplyError: Send + Sync,
O: SideEffectStorage + Send + Sync,
{
Expand Down
29 changes: 29 additions & 0 deletions eventastic_postgres/src/pickle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/// Preserve your data so it can be stored in the database.
///
/// [`Pickle`] is implemented for types that implement [`serde::Serialize`]
/// and [`serde::de::DeserializeOwned`]. It uses [`serde_json`] to do the
/// serialization.
pub trait Pickle: Sized {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Convert to bytes for storage.
    fn pickle(&self) -> Result<Vec<u8>, Self::Error>;

    /// Convert back to `Self` from bytes.
    fn unpickle(bytes: &[u8]) -> Result<Self, Self::Error>;
}

/// Blanket implementation via [`serde_json`]: anything that is both
/// [`serde::Serialize`] and [`serde::de::DeserializeOwned`] round-trips
/// through a JSON byte buffer.
impl<T> Pickle for T
where
    T: serde::Serialize + serde::de::DeserializeOwned,
{
    type Error = serde_json::Error;

    /// Serialize `self` into its JSON byte representation.
    fn pickle(&self) -> Result<Vec<u8>, Self::Error> {
        let encoded = serde_json::to_vec(self)?;
        Ok(encoded)
    }

    /// Parse a previously pickled JSON byte buffer back into `Self`.
    fn unpickle(raw: &[u8]) -> Result<Self, Self::Error> {
        let decoded = serde_json::from_slice::<Self>(raw)?;
        Ok(decoded)
    }
}
Loading
Loading