diff --git a/conf/pixels-sink.it.local.properties b/conf/pixels-sink.it.local.properties
new file mode 100644
index 0000000..ed3e622
--- /dev/null
+++ b/conf/pixels-sink.it.local.properties
@@ -0,0 +1,60 @@
+# Source
+sink.datasource=storage
+sink.datasource.rate.limit=-1
+sink.datasource.rate.limit.type=guava
+sink.storage.loop=false
+
+# Sink
+sink.mode=retina
+sink.trans.mode=batch
+sink.commit.method=sync
+sink.commit.batch.size=50
+sink.commit.batch.worker=2
+sink.commit.batch.delay=50
+
+# Retina
+sink.retina.embedded=false
+sink.retina.mode=stream
+sink.retina.client=1
+sink.retina.log.queue=false
+sink.retina.rpc.limit=1000
+sink.retina.trans.limit=1000
+sink.retina.trans.request.batch=false
+sink.remote.host=localhost
+sink.remote.port=29422
+sink.timeout.ms=5000
+sink.flush.interval.ms=100
+sink.flush.batch.size=2
+sink.max.retries=3
+
+# Proto storage source
+sink.proto.dir=file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc
+sink.proto.data=ray-recovery-it
+sink.proto.maxRecords=100000
+
+# Recovery
+sink.recovery.enable=true
+sink.recovery.mode=bootstrap
+sink.recovery.bootstrap.force_overwrite=true
+sink.recovery.rocksdb.dir=/home/antio2/projects/pixels-sink/target/integration-tests/recovery-rocksdb
+sink.recovery.insert_as_update=true
+
+# Trino
+trino.url=jdbc:trino://localhost:18080/pixels/pixels_test
+trino.user=pixels
+trino.password=
+trino.parallel=1
+
+# Freshness / monitor
+sink.monitor.enable=false
+sink.monitor.report.enable=false
+sink.monitor.freshness.level=row
+sink.monitor.report.file=/tmp/pixels-sink-it-rate.csv
+sink.monitor.freshness.file=/tmp/pixels-sink-it-fresh.csv
+
+# Other
+sink.rpc.enable=false
+sink.rpc.mock.delay=0
+sink.registry.url=http://localhost:8080/apis/registry/v2
+transaction.topic.suffix=transaction
+transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
diff --git a/conf/pixels-sink.it.recovery-window.bootstrap.properties b/conf/pixels-sink.it.recovery-window.bootstrap.properties
new file mode 100644
index 0000000..3f6a9c0
--- /dev/null
+++ b/conf/pixels-sink.it.recovery-window.bootstrap.properties
@@ -0,0 +1,60 @@
+# Source
+sink.datasource=storage
+sink.datasource.rate.limit=-1
+sink.datasource.rate.limit.type=guava
+sink.storage.loop=false
+
+# Sink
+sink.mode=retina
+sink.trans.mode=batch
+sink.commit.method=sync
+sink.commit.batch.size=50
+sink.commit.batch.worker=2
+sink.commit.batch.delay=50
+
+# Retina
+sink.retina.embedded=false
+sink.retina.mode=stream
+sink.retina.client=1
+sink.retina.log.queue=false
+sink.retina.rpc.limit=1000
+sink.retina.trans.limit=1000
+sink.retina.trans.request.batch=false
+sink.remote.host=localhost
+sink.remote.port=29422
+sink.timeout.ms=5000
+sink.flush.interval.ms=100
+sink.flush.batch.size=2
+sink.max.retries=3
+
+# Proto storage source
+sink.proto.dir=file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc
+sink.proto.data=ray-recovery-window-it-v2
+sink.proto.maxRecords=100000
+
+# Recovery
+sink.recovery.enable=true
+sink.recovery.mode=bootstrap
+sink.recovery.bootstrap.force_overwrite=true
+sink.recovery.rocksdb.dir=/home/antio2/projects/pixels-sink/target/integration-tests/recovery-window-rocksdb-v2
+sink.recovery.insert_as_update=true
+
+# Trino
+trino.url=jdbc:trino://localhost:18080/pixels/pixels_test
+trino.user=pixels
+trino.password=
+trino.parallel=1
+
+# Freshness / monitor
+sink.monitor.enable=false
+sink.monitor.report.enable=false
+sink.monitor.freshness.level=row
+sink.monitor.report.file=/tmp/pixels-sink-it-rate.csv
+sink.monitor.freshness.file=/tmp/pixels-sink-it-fresh.csv
+
+# Other
+sink.rpc.enable=false
+sink.rpc.mock.delay=0
+sink.registry.url=http://localhost:8080/apis/registry/v2
+transaction.topic.suffix=transaction
+transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
diff --git a/conf/pixels-sink.it.recovery-window.recovery.properties b/conf/pixels-sink.it.recovery-window.recovery.properties
new file mode 100644
index 0000000..f77685c
--- /dev/null
+++ b/conf/pixels-sink.it.recovery-window.recovery.properties
@@ -0,0 +1,60 @@
+# Source
+sink.datasource=storage
+sink.datasource.rate.limit=-1
+sink.datasource.rate.limit.type=guava
+sink.storage.loop=false
+
+# Sink
+sink.mode=retina
+sink.trans.mode=batch
+sink.commit.method=sync
+sink.commit.batch.size=50
+sink.commit.batch.worker=2
+sink.commit.batch.delay=50
+
+# Retina
+sink.retina.embedded=false
+sink.retina.mode=stream
+sink.retina.client=1
+sink.retina.log.queue=false
+sink.retina.rpc.limit=1000
+sink.retina.trans.limit=1000
+sink.retina.trans.request.batch=false
+sink.remote.host=localhost
+sink.remote.port=29422
+sink.timeout.ms=5000
+sink.flush.interval.ms=100
+sink.flush.batch.size=2
+sink.max.retries=3
+
+# Proto storage source
+sink.proto.dir=file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc
+sink.proto.data=ray-recovery-window-it-v2
+sink.proto.maxRecords=100000
+
+# Recovery
+sink.recovery.enable=true
+sink.recovery.mode=recovery
+sink.recovery.bootstrap.force_overwrite=false
+sink.recovery.rocksdb.dir=/home/antio2/projects/pixels-sink/target/integration-tests/recovery-window-rocksdb-v2
+sink.recovery.insert_as_update=true
+
+# Trino
+trino.url=jdbc:trino://localhost:18080/pixels/pixels_test
+trino.user=pixels
+trino.password=
+trino.parallel=1
+
+# Freshness / monitor
+sink.monitor.enable=false
+sink.monitor.report.enable=false
+sink.monitor.freshness.level=row
+sink.monitor.report.file=/tmp/pixels-sink-it-rate.csv
+sink.monitor.freshness.file=/tmp/pixels-sink-it-fresh.csv
+
+# Other
+sink.rpc.enable=false
+sink.rpc.mock.delay=0
+sink.registry.url=http://localhost:8080/apis/registry/v2
+transaction.topic.suffix=transaction
+transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
diff --git a/conf/pixels-sink.it.recovery.properties b/conf/pixels-sink.it.recovery.properties
new file mode 100644
index 0000000..9359045
--- /dev/null
+++ b/conf/pixels-sink.it.recovery.properties
@@ -0,0 +1,60 @@
+# Source
+sink.datasource=storage
+sink.datasource.rate.limit=-1
+sink.datasource.rate.limit.type=guava
+sink.storage.loop=false
+
+# Sink
+sink.mode=retina
+sink.trans.mode=batch
+sink.commit.method=sync
+sink.commit.batch.size=50
+sink.commit.batch.worker=2
+sink.commit.batch.delay=50
+
+# Retina
+sink.retina.embedded=false
+sink.retina.mode=stream
+sink.retina.client=1
+sink.retina.log.queue=false
+sink.retina.rpc.limit=1000
+sink.retina.trans.limit=1000
+sink.retina.trans.request.batch=false
+sink.remote.host=localhost
+sink.remote.port=29422
+sink.timeout.ms=5000
+sink.flush.interval.ms=100
+sink.flush.batch.size=2
+sink.max.retries=3
+
+# Proto storage source
+sink.proto.dir=file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc
+sink.proto.data=ray-recovery-it
+sink.proto.maxRecords=100000
+
+# Recovery
+sink.recovery.enable=true
+sink.recovery.mode=recovery
+sink.recovery.bootstrap.force_overwrite=false
+sink.recovery.rocksdb.dir=/home/antio2/projects/pixels-sink/target/integration-tests/recovery-rocksdb
+sink.recovery.insert_as_update=true
+
+# Trino
+trino.url=jdbc:trino://localhost:18080/pixels/pixels_test
+trino.user=pixels
+trino.password=
+trino.parallel=1
+
+# Freshness / monitor
+sink.monitor.enable=false
+sink.monitor.report.enable=false
+sink.monitor.freshness.level=row
+sink.monitor.report.file=/tmp/pixels-sink-it-rate.csv
+sink.monitor.freshness.file=/tmp/pixels-sink-it-fresh.csv
+
+# Other
+sink.rpc.enable=false
+sink.rpc.mock.delay=0
+sink.registry.url=http://localhost:8080/apis/registry/v2
+transaction.topic.suffix=transaction
+transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
diff --git a/docs/configuration.md b/docs/configuration.md
index 845af65..59a2bfc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -91,6 +91,45 @@ Notes on `sink.trans.mode`:
| `sink.proto.maxRecords` | `100000` | Max records per file. |
| `sink.storage.loop` | `false` | Whether to loop over stored files. |
+### Recovery
+
+Recovery is currently implemented for:
+
+- `sink.datasource=storage`
+- `sink.mode=retina`
+- `sink.trans.mode=batch`
+
+The current recovery path is designed around the `TableCrossTxWriter` write path.
+
+| Key | Default | Notes |
+| --- | --- | --- |
+| `sink.recovery.enable` | `false` | Enable local sink recovery metadata and recovery-aware processing. |
+| `sink.recovery.mode` | `bootstrap` | Startup mode: `bootstrap` or `recovery`. |
+| `sink.recovery.bootstrap.force_overwrite` | `false` | Only valid in `bootstrap` mode. Clears existing recovery state before starting from the beginning. |
+| `sink.recovery.dir` | `./sink-recovery` | Reserved top-level recovery directory. Current implementation mainly uses `sink.recovery.rocksdb.dir`. |
+| `sink.recovery.rocksdb.dir` | `./sink-recovery/rocksdb` | RocksDB directory for transaction bindings, active transaction order, and commit markers. |
+| `sink.recovery.insert_as_update` | `true` | In recovery mode, rewrite `INSERT` as `UPDATE` with `before = after` to support idempotent replay. |
+| `sink.recovery.fail_on_corruption` | `true` | Reserved for stricter recovery-store corruption handling. Not fully wired yet. |
+
+### Notes on recovery mode
+
+- `bootstrap` starts reading from the beginning of the storage source.
+- `recovery` loads recovery metadata and replays from the earliest active transaction `beginOffset`.
+- In `bootstrap` mode, if recovery state already exists and `sink.recovery.bootstrap.force_overwrite=false`, startup fails.
+- In `bootstrap` mode, if `sink.recovery.bootstrap.force_overwrite=true`, the existing recovery store is cleared before startup.
+- `sink.recovery.bootstrap.force_overwrite` must not be used with `sink.recovery.mode=recovery`.
+- Recovery currently assumes that `TransServer` can return the original transaction context via `getTransContext(pixelsTransId)`.
+
+### Notes on recovery metadata
+
+The recovery store keeps:
+
+- `dataSourceTxId -> pixelsTransId / timestamp / lease / beginOffset / lastSafeOffset / state`
+- an ordered view of active transactions by `beginOffset`
+- commit markers used to suppress duplicate commits
+
+`lastSafeOffset` is advanced on successful `TableCrossTxWriter` flush or batch completion, not on every row event.
+
### Flink Sink
| Key | Default | Notes |
diff --git a/docs/failover.md b/docs/failover.md
new file mode 100644
index 0000000..962a8b8
--- /dev/null
+++ b/docs/failover.md
@@ -0,0 +1,259 @@
+# Sink Failover and Recovery
+
+This document describes the current failover and local recovery mechanism implemented in Pixels Sink.
+
+## Scope
+
+The current implementation is intentionally narrow.
+
+Recovery is implemented for:
+
+- `sink.datasource=storage`
+- `sink.mode=retina`
+- `sink.trans.mode=batch`
+
+The primary write path covered by recovery is the `TableCrossTxWriter` path.
+
+## Goals
+
+The recovery design preserves the original mapping:
+
+- `DataSourceTxId -> Pixels TransID`
+- `DataSourceTxId -> Pixels TS`
+
+After a sink restart, the same source transaction should continue to use the same Pixels transaction identity and timestamp, instead of allocating a new transaction.
+
+The recovery design also needs a timestamp-based replay boundary so that sink recovery and Retina-side recovery can converge on the same replay window.
+
+## Key Assumptions
+
+- The storage source is replayable.
+- The transaction server can restore the original transaction context with `getTransContext(pixelsTransId)`.
+- The transaction server exposes a safe checkpoint boundary through `GetSafeGcTimestamp`.
+- Recovery replay keeps the original Pixels timestamp.
+- During recovery, `INSERT` can be rewritten to `UPDATE` with `before = after` so that replay is idempotent on the Retina side.
+
+## Recovery Metadata
+
+The sink stores recovery metadata in a local RocksDB database.
+
+It keeps four kinds of state:
+
+- transaction binding:
+ `dataSourceTxId -> pixelsTransId, timestamp, lease, beginOffset, lastSafeOffset, state`
+- active transaction order:
+ `beginOffset + dataSourceTxId -> dataSourceTxId`
+- commit marker:
+ `dataSourceTxId -> committed`
+- timestamp secondary index:
+ `pixelsTs -> dataSourceTxId / pixelsTransId / sourceOffset`
+
+The timestamp secondary index is required because the replay checkpoint is expressed in Pixels timestamp space, not only in source offset space.
+
+## Startup Modes
+
+Two startup modes are supported.
+
+### `bootstrap`
+
+`bootstrap` means:
+
+- start from the beginning of the storage source
+- do not continue unfinished recovery state
+
+If recovery state already exists:
+
+- startup fails by default
+- or the operator can force cleanup with:
+ `sink.recovery.bootstrap.force_overwrite=true`
+
+### `recovery`
+
+`recovery` means:
+
+- load active recovery metadata from RocksDB
+- query `GetSafeGcTimestamp` from `TransServer`
+- use the local `pixelsTs` secondary index to seek the corresponding replay position
+- rebuild the replay window from that checkpoint and from the earliest unfinished transaction state
+
+## Recovery State Machine
+
+The local recovery state uses four values:
+
+- `ACTIVE`
+- `COMMITTING`
+- `COMMITTED`
+- `ABORTED`
+
+
+```mermaid
+stateDiagram-v2
+ [*] --> ACTIVE : begin transaction and persist binding
+ ACTIVE --> COMMITTING : END received
+ COMMITTING --> COMMITTED : commit succeeds and marker is persisted
+ ACTIVE --> ABORTED : rollback or explicit abort
+ COMMITTING --> ABORTED : commit path fails and transaction is abandoned
+```
+
+## Data Flow
+
+The storage source now carries a physical replay offset through the main pipeline:
+
+- `fileId`
+- `byteOffset`
+- `epoch`
+- `recordType`
+
+This offset is used for replay positioning. The runtime synthetic transaction id such as `txid_loopId` is still kept, but it is not the recovery source of truth.
+
+The recovery path now depends on both:
+
+- physical source offset, for local replay positioning
+- Pixels timestamp, for checkpoint alignment with `TransServer` and Retina-side recovery
+
+```mermaid
+flowchart LR
+ A[Storage files] --> B[Storage source]
+ B --> C[StorageSourceRecord payload + offset]
+ C --> D[Transaction provider]
+ C --> E[Row provider]
+ D --> F[RecoveryManager records BEGIN offset]
+ E --> G[RowChangeEvent carries sourceOffset]
+ G --> H[RetinaWriter / SinkContextManager]
+ H --> I[TableCrossTxWriter flush]
+ I --> J[update lastSafeOffset]
+ H --> K[record pixelsTs secondary index]
+ H --> L[TransactionProxy commit]
+ L --> M[persist COMMITTED marker]
+
+## Replay Segments
+
+The recovery event stream should be treated as three logical segments.
+
+### Segment 1: checkpoint-complete
+
+This segment is fully replay-safe from the perspective of `TransServer`.
+
+Boundary:
+
+- determined by `GetSafeGcTimestamp`
+- resolved to a source seek position through the local `pixelsTs` secondary index
+
+Properties:
+
+- if sink itself is recovering, this segment is normally before the local active replay window
+- if a Retina instance crashes, it may still reset sink replay back into this segment
+- rows in this segment may be re-read and re-sent, so replay must still be idempotent
+- this segment is not "strictly skipped"; it is only the checkpoint-complete part of the stream
+
+### Segment 2: active replay window
+
+This segment contains transactions around the recovery boundary.
+
+Properties:
+
+- it includes both committed and uncommitted transactions
+- row replay is required for both committed and uncommitted transactions
+- committed transactions must not send commit again to `TransServer`
+- uncommitted transactions must continue on the original `pixelsTransId / pixelsTs`
+
+This is the main local recovery segment.
+
+### Segment 3: unread tail
+
+This segment has never been read by the current sink process.
+
+Properties:
+
+- it starts after the current replay/recovery boundary
+- normal source consumption should keep advancing the unread-tail offset
+- this segment is ordinary forward processing, not historical replay
+```
+
+## Write and Recovery Behavior
+
+### BEGIN
+
+When a source `BEGIN(dataSourceTxId)` is processed:
+
+- the sink first checks whether a binding already exists
+- if it exists, the sink restores the original Pixels transaction context
+- otherwise, the sink allocates a new Pixels transaction and persists the binding immediately
+
+### Row replay
+
+Each row keeps the original source transaction id and the original Pixels timestamp.
+
+During recovery:
+
+- rows may be replayed from the checkpoint-complete segment if Retina-side recovery resets sink to an older boundary
+- rows in the active replay window are replayed for both committed and uncommitted transactions
+- `INSERT` may be rewritten to `UPDATE`
+- `before` is copied from `after`
+
+At the source-read level, committed and unfinished transactions can both be replayed again.
+The difference is in commit handling:
+
+- committed transactions still replay row writes, but must not commit again
+- unfinished transactions replay row writes and continue on the original Pixels transaction context
+
+This replay model, together with stable `Pixels TS` and optional `INSERT -> UPDATE` rewriting, is the main mechanism used to tolerate duplicate row replay.
+
+### Flush safety
+
+`lastSafeOffset` is advanced only after a `TableCrossTxWriter` flush or batch succeeds.
+
+This means:
+
+- `lastSafeOffset` is only a per-transaction flush marker
+- it is not the main global replay checkpoint
+- it is not required to be globally monotonic
+- a crash may still replay some already-written rows
+- replay correctness relies on idempotent semantics
+- the sink does not assume that every processed row is immediately durable
+
+### END and commit
+
+When `END(dataSourceTxId)` arrives:
+
+1. the local state changes to `COMMITTING`
+2. the sink sends the commit to `TransServer`
+3. after commit succeeds, the sink persists:
+ - `state = COMMITTED`
+ - commit marker
+ - removal from the active transaction order index
+
+If the sink crashes after commit succeeds but before the local marker is written, replay may still attempt the commit again. This is a known boundary and is one reason the commit marker is part of the durable recovery state.
+
+## Replay Strategy
+
+On recovery startup:
+
+1. open RocksDB
+2. query `GetSafeGcTimestamp` from `TransServer`
+3. use the local `pixelsTs` secondary index to seek the replay checkpoint in the source stream
+4. load unfinished transactions and find the earliest required replay window
+5. rebuild the three logical segments:
+ - checkpoint-complete
+ - active replay window
+ - unread tail
+6. replay rows for committed and unfinished transactions in the active replay window
+7. suppress duplicate commit for transactions that already have a commit marker
+8. continue unfinished transactions using the original Pixels transaction context
+
+## Known Limitations
+
+- Recovery is currently scoped to `storage + retina + batch`.
+- The implementation is centered on the `TableCrossTxWriter` path.
+- `ABORTED` exists in the state model but is not yet a full recovery path.
+- `pixelsTs -> replay position` secondary indexing is required for the final checkpoint-based design.
+- `lastSafeOffset` is persisted, but it is not sufficient to define the global replay checkpoint by itself.
+- `sink.recovery.fail_on_corruption` exists as a config key, but corruption handling is not fully wired yet.
+- `sink.recovery.dir` is currently reserved; the active implementation mainly uses `sink.recovery.rocksdb.dir`.
+
+## Recommended Operational Use
+
+- Use `bootstrap` for fresh runs or intentional full replays.
+- Use `recovery` only after an unclean shutdown that should continue existing transaction bindings.
+- Do not enable `sink.recovery.bootstrap.force_overwrite` unless you explicitly want to discard previous unfinished recovery state.
+- Keep the recovery RocksDB on stable local storage, not on temporary storage.
diff --git a/pom.xml b/pom.xml
index 2b965f2..6fb2b83 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,11 +59,6 @@
true
test
-
- org.apache.hive
- hive-jdbc
- 2.3.9
-
io.etcd
jetcd-core
@@ -155,6 +150,11 @@
guava
33.2.0-jre
+
+ org.rocksdb
+ rocksdbjni
+ ${dep.rocksdb.version}
+
org.projectlombok
@@ -333,6 +333,13 @@
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ true
+
+
-
\ No newline at end of file
+
diff --git a/src/main/java/io/pixelsdb/pixels/sink/PixelsSinkApp.java b/src/main/java/io/pixelsdb/pixels/sink/PixelsSinkApp.java
index 0453d6a..84ce5b1 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/PixelsSinkApp.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/PixelsSinkApp.java
@@ -31,6 +31,7 @@
import io.pixelsdb.pixels.sink.writer.PixelsSinkWriterFactory;
import io.pixelsdb.pixels.sink.writer.retina.SinkContextManager;
import io.pixelsdb.pixels.sink.writer.retina.TransactionProxy;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;
import org.slf4j.Logger;
@@ -76,6 +77,7 @@ public static void main(String[] args) throws IOException
throw new RuntimeException(e);
}
}
+ RecoveryManager.closeInstance();
}));
diff --git a/src/main/java/io/pixelsdb/pixels/sink/config/PixelsSinkConfig.java b/src/main/java/io/pixelsdb/pixels/sink/config/PixelsSinkConfig.java
index 4f75eeb..170c64f 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/config/PixelsSinkConfig.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/config/PixelsSinkConfig.java
@@ -196,6 +196,27 @@ public class PixelsSinkConfig
@ConfigKey(value = "sink.storage.loop", defaultValue = "false")
private boolean sinkStorageLoop;
+ @ConfigKey(value = "sink.recovery.enable", defaultValue = "false")
+ private boolean recoveryEnabled;
+
+ @ConfigKey(value = "sink.recovery.mode", defaultValue = "bootstrap")
+ private String recoveryMode;
+
+ @ConfigKey(value = "sink.recovery.bootstrap.force_overwrite", defaultValue = "false")
+ private boolean recoveryBootstrapForceOverwrite;
+
+ @ConfigKey(value = "sink.recovery.dir", defaultValue = "./sink-recovery")
+ private String recoveryDir;
+
+ @ConfigKey(value = "sink.recovery.rocksdb.dir", defaultValue = "./sink-recovery/rocksdb")
+ private String recoveryRocksdbDir;
+
+ @ConfigKey(value = "sink.recovery.insert_as_update", defaultValue = "true")
+ private boolean recoveryInsertAsUpdate;
+
+ @ConfigKey(value = "sink.recovery.fail_on_corruption", defaultValue = "true")
+ private boolean recoveryFailOnCorruption;
+
@ConfigKey(value = "sink.monitor.freshness.level", defaultValue = "row") // row or txn or embed
private String sinkMonitorFreshnessLevel;
@ConfigKey(value = "sink.monitor.freshness.embed.warmup", defaultValue = "10")
@@ -255,4 +276,4 @@ private void init()
this.enableSourceRateLimit = this.sourceRateLimit >= 0;
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/config/factory/PixelsSinkConfigFactory.java b/src/main/java/io/pixelsdb/pixels/sink/config/factory/PixelsSinkConfigFactory.java
index 56b3550..cecbbde 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/config/factory/PixelsSinkConfigFactory.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/config/factory/PixelsSinkConfigFactory.java
@@ -22,6 +22,7 @@
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import java.io.IOException;
@@ -63,6 +64,7 @@ public static PixelsSinkConfig getInstance()
public static synchronized void reset()
{
+ RecoveryManager.closeInstance();
instance = null;
configFilePath = null;
}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/event/RowChangeEvent.java b/src/main/java/io/pixelsdb/pixels/sink/event/RowChangeEvent.java
index b3588fd..c24eb79 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/event/RowChangeEvent.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/event/RowChangeEvent.java
@@ -30,6 +30,7 @@
import io.pixelsdb.pixels.sink.exception.SinkException;
import io.pixelsdb.pixels.sink.metadata.TableMetadata;
import io.pixelsdb.pixels.sink.metadata.TableMetadataRegistry;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
import io.pixelsdb.pixels.sink.util.MetricsFacade;
import io.prometheus.client.Summary;
import lombok.Getter;
@@ -75,6 +76,9 @@ public class RowChangeEvent
@Getter
private SchemaTableName schemaTableName;
+ @Setter
+ @Getter
+ private StorageSourceOffset sourceOffset;
public RowChangeEvent(SinkProto.RowRecord rowRecord) throws SinkException
{
diff --git a/src/main/java/io/pixelsdb/pixels/sink/provider/TableEventStorageLoopProvider.java b/src/main/java/io/pixelsdb/pixels/sink/provider/TableEventStorageLoopProvider.java
index b628876..1bac4b1 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/provider/TableEventStorageLoopProvider.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/provider/TableEventStorageLoopProvider.java
@@ -21,14 +21,15 @@
package io.pixelsdb.pixels.sink.provider;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.pixelsdb.pixels.core.utils.Pair;
import io.pixelsdb.pixels.sink.SinkProto;
import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
import io.pixelsdb.pixels.sink.event.RowChangeEvent;
import io.pixelsdb.pixels.sink.event.deserializer.RowChangeEventStructDeserializer;
import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceRecord;
import io.pixelsdb.pixels.sink.util.DataTransform;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import java.nio.ByteBuffer;
import java.util.logging.Logger;
@@ -57,10 +58,9 @@ protected TableEventStorageLoopProvider()
@Override
RowChangeEvent convertToTargetRecord(T record)
{
- Pair pairRecord = (Pair) record;
- ByteBuffer sourceRecord = pairRecord.getLeft();
+ StorageSourceRecord sourceStorageRecord = (StorageSourceRecord) record;
+ ByteBuffer sourceRecord = sourceStorageRecord.getPayload().duplicate();
sourceRecord.rewind();
- Integer loopId = pairRecord.getRight();
try
{
SinkProto.RowRecord rowRecord = SinkProto.RowRecord.parseFrom(sourceRecord);
@@ -78,9 +78,12 @@ RowChangeEvent convertToTargetRecord(T record)
SinkProto.TransactionInfo.Builder transactionBuilder = rowRecordBuilder.getTransactionBuilder();
String id = transactionBuilder.getId();
- transactionBuilder.setId(id + "_" + loopId);
+ transactionBuilder.setId(id + "_" + sourceStorageRecord.getOffset().getEpoch());
rowRecordBuilder.setTransaction(transactionBuilder);
- return RowChangeEventStructDeserializer.convertToRowChangeEvent(rowRecordBuilder.build());
+ RowChangeEvent event = RowChangeEventStructDeserializer.convertToRowChangeEvent(rowRecordBuilder.build());
+ event.setSourceOffset(sourceStorageRecord.getOffset());
+ RecoveryManager.getInstance().observeRowEvent(event);
+ return event;
} catch (InvalidProtocolBufferException | SinkException e)
{
LOGGER.warning(e.getMessage());
diff --git a/src/main/java/io/pixelsdb/pixels/sink/provider/TableProviderAndProcessorPipelineManager.java b/src/main/java/io/pixelsdb/pixels/sink/provider/TableProviderAndProcessorPipelineManager.java
index 128f70b..01c6169 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/provider/TableProviderAndProcessorPipelineManager.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/provider/TableProviderAndProcessorPipelineManager.java
@@ -22,8 +22,8 @@
import io.pixelsdb.pixels.common.metadata.SchemaTableName;
-import io.pixelsdb.pixels.core.utils.Pair;
import io.pixelsdb.pixels.sink.processor.TableProcessor;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceRecord;
import org.apache.kafka.connect.source.SourceRecord;
import java.nio.ByteBuffer;
@@ -68,7 +68,7 @@ public void routeRecord(Integer tableId, SOURCE_RECORD_T record)
private TableEventProvider createProvider(SOURCE_RECORD_T record)
{
Class> recordType = record.getClass();
- if (recordType == Pair.class)
+ if (recordType == StorageSourceRecord.class)
{
return new TableEventStorageLoopProvider<>();
}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/provider/TransactionEventStorageLoopProvider.java b/src/main/java/io/pixelsdb/pixels/sink/provider/TransactionEventStorageLoopProvider.java
index 956de39..8d0e0c3 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/provider/TransactionEventStorageLoopProvider.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/provider/TransactionEventStorageLoopProvider.java
@@ -21,8 +21,9 @@
package io.pixelsdb.pixels.sink.provider;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.pixelsdb.pixels.core.utils.Pair;
import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceRecord;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import java.nio.ByteBuffer;
@@ -31,14 +32,17 @@ public class TransactionEventStorageLoopProvider extends TransactionEventProv
@Override
SinkProto.TransactionMetadata convertToTargetRecord(T record)
{
- Pair buffer = (Pair) record;
+ StorageSourceRecord buffer = (StorageSourceRecord) record;
try
{
- SinkProto.TransactionMetadata tx = SinkProto.TransactionMetadata.parseFrom(buffer.getLeft());
- Integer loopId = buffer.getRight();
+ ByteBuffer payload = buffer.getPayload().duplicate();
+ payload.rewind();
+ SinkProto.TransactionMetadata tx = SinkProto.TransactionMetadata.parseFrom(payload);
SinkProto.TransactionMetadata.Builder builder = tx.toBuilder();
- builder.setId(builder.getId() + "_" + loopId);
- return builder.build();
+ builder.setId(builder.getId() + "_" + buffer.getOffset().getEpoch());
+ SinkProto.TransactionMetadata transactionMetadata = builder.build();
+ RecoveryManager.getInstance().observeTransactionMetadata(transactionMetadata, buffer.getOffset());
+ return transactionMetadata;
} catch (InvalidProtocolBufferException e)
{
throw new RuntimeException(e);
diff --git a/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractMemorySinkStorageSource.java b/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractMemorySinkStorageSource.java
index d3e0f7c..57a422f 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractMemorySinkStorageSource.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractMemorySinkStorageSource.java
@@ -23,7 +23,6 @@
import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
import io.pixelsdb.pixels.common.physical.Storage;
-import io.pixelsdb.pixels.core.utils.Pair;
import io.pixelsdb.pixels.sink.config.PixelsSinkConstants;
import io.pixelsdb.pixels.sink.provider.ProtoType;
import org.slf4j.Logger;
@@ -44,11 +43,17 @@ public abstract class AbstractMemorySinkStorageSource extends AbstractSinkStorag
// All preloaded records, order preserved
// key + value buffer
- private final List> preloadedRecords = new ArrayList<>();
+ private final List> preloadedRecords = new ArrayList<>();
@Override
public void start()
{
+ recoveryManager.initializeForSourceStart();
+ StorageSourceOffset replayStartOffset = recoveryManager.getReplayStartOffset();
+ if (replayStartOffset != null)
+ {
+ this.loopId = replayStartOffset.getEpoch();
+ }
this.running.set(true);
this.transactionProcessorThread.start();
this.transactionProviderThread.start();
@@ -57,21 +62,25 @@ public void start()
/* =====================================================
* 1. Initialization phase: preload all ByteBuffers
* ===================================================== */
- for (String file : files)
+ for (int fileId = 0; fileId < files.size(); fileId++)
{
+ String file = files.get(fileId);
Storage.Scheme scheme = Storage.Scheme.fromPath(file);
LOGGER.info("Preloading file {}", file);
PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(scheme, file);
readers.add(reader);
+ long offset = 0;
while (true)
{
int key;
int valueLen;
+ long recordOffset;
try
{
+ recordOffset = offset;
key = reader.readInt(ByteOrder.BIG_ENDIAN);
valueLen = reader.readInt(ByteOrder.BIG_ENDIAN);
} catch (IOException eof)
@@ -81,11 +90,17 @@ public void start()
}
// Synchronous read and copy to heap buffer
ByteBuffer valueBuffer = reader.readFully(valueLen);
- // Store into a single global array
- ByteBuffer cleanBuffer = valueBuffer.duplicate();
- cleanBuffer.rewind();
- cleanBuffer.limit(cleanBuffer.position() + valueLen);
- preloadedRecords.add(new Pair<>(key, cleanBuffer));
+ ByteBuffer duplicateValueBuffer = valueBuffer.duplicate();
+ duplicateValueBuffer.rewind();
+ ByteBuffer cleanBuffer = ByteBuffer.allocate(valueLen);
+ cleanBuffer.put(duplicateValueBuffer);
+ cleanBuffer.flip();
+ offset += Integer.BYTES * 2L + valueLen;
+ preloadedRecords.add(new StorageSourceRecord<>(
+ key,
+ cleanBuffer,
+ new StorageSourceOffset(fileId, recordOffset, 0, getProtoType(key))
+ ));
}
}
@@ -98,15 +113,31 @@ public void start()
* ===================================================== */
do
{
- for (Pair record : preloadedRecords)
+ for (StorageSourceRecord record : preloadedRecords)
{
- int key = record.getLeft();
- ByteBuffer src = record.getRight();
- ByteBuffer copy = ByteBuffer.allocate(src.remaining());
- copy.put(src.duplicate().rewind());
+ ByteBuffer src = record.getPayload();
+ StorageSourceOffset offset = record.getOffset();
+ if (replayStartOffset != null && offset.getEpoch() == 0)
+ {
+ StorageSourceOffset currentLoopOffset = new StorageSourceOffset(
+ offset.getFileId(),
+ offset.getByteOffset(),
+ loopId,
+ offset.getRecordType()
+ );
+ if (currentLoopOffset.compareTo(replayStartOffset) < 0)
+ {
+ continue;
+ }
+ }
+ int key = record.getSourceKey();
+ ByteBuffer duplicate = src.duplicate();
+ duplicate.rewind();
+ ByteBuffer copy = ByteBuffer.allocate(duplicate.remaining());
+ copy.put(duplicate);
copy.flip();
// Lazily create queue
- BlockingQueue, Integer>> queue =
+ BlockingQueue>> queue =
queueMap.computeIfAbsent(
key,
k -> new LinkedBlockingQueue<>(PixelsSinkConstants.MAX_QUEUE_SIZE)
@@ -129,10 +160,14 @@ public void start()
}
// Use completed future to keep consumer logic unchanged
- CompletableFuture future =
- CompletableFuture.completedFuture(copy);
-
- queue.put(new Pair<>(future, loopId));
+ CompletableFuture future = CompletableFuture.completedFuture(copy);
+ StorageSourceOffset currentOffset = new StorageSourceOffset(
+ offset.getFileId(),
+ offset.getByteOffset(),
+ loopId,
+ offset.getRecordType()
+ );
+ queue.put(new StorageSourceRecord<>(key, future, currentOffset));
}
++loopId;
} while (storageLoopEnabled && isRunning());
@@ -147,4 +182,5 @@ public void start()
clean();
}
}
+
}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractReaderSinkStorageSource.java b/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractReaderSinkStorageSource.java
index 8be58ea..630c3a6 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractReaderSinkStorageSource.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractReaderSinkStorageSource.java
@@ -23,7 +23,6 @@
import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
import io.pixelsdb.pixels.common.physical.Storage;
-import io.pixelsdb.pixels.core.utils.Pair;
import io.pixelsdb.pixels.sink.config.PixelsSinkConstants;
import io.pixelsdb.pixels.sink.provider.ProtoType;
import org.slf4j.Logger;
@@ -43,11 +42,18 @@ public abstract class AbstractReaderSinkStorageSource extends AbstractSinkStorag
@Override
public void start()
{
+ recoveryManager.initializeForSourceStart();
+ StorageSourceOffset replayStartOffset = recoveryManager.getReplayStartOffset();
+ if (replayStartOffset != null)
+ {
+ this.loopId = replayStartOffset.getEpoch();
+ }
this.running.set(true);
this.transactionProcessorThread.start();
this.transactionProviderThread.start();
- for (String file : files)
+ for (int fileId = 0; fileId < files.size(); fileId++)
{
+ String file = files.get(fileId);
Storage.Scheme scheme = Storage.Scheme.fromPath(file);
LOGGER.info("Start read from file {}", file);
PhysicalReader reader;
@@ -60,65 +66,76 @@ public void start()
}
readers.add(reader);
}
- do
+ try
{
- for (PhysicalReader reader : readers)
+ do
{
- LOGGER.info("Start Read {}", reader.getPath());
- long offset = 0;
- while (true)
+ for (int fileId = 0; fileId < readers.size(); fileId++)
{
- try
+ PhysicalReader reader = readers.get(fileId);
+ LOGGER.info("Start Read {}", reader.getPath());
+ long offset = 0;
+ while (true)
{
- int key, valueLen;
- reader.seek(offset);
try
{
- key = reader.readInt(ByteOrder.BIG_ENDIAN);
- valueLen = reader.readInt(ByteOrder.BIG_ENDIAN);
- } catch (IOException e)
- {
- // EOF
- break;
- }
-
- ProtoType protoType = getProtoType(key);
- offset += Integer.BYTES * 2;
- CompletableFuture valueFuture = reader.readAsync(offset, valueLen)
- .thenApply(this::copyToHeap)
- .thenApply(buf -> buf.order(ByteOrder.BIG_ENDIAN));
- // move offset for next record
- offset += valueLen;
-
+ int key;
+ int valueLen;
+ long recordOffset = offset;
+ reader.seek(offset);
+ try
+ {
+ key = reader.readInt(ByteOrder.BIG_ENDIAN);
+ valueLen = reader.readInt(ByteOrder.BIG_ENDIAN);
+ } catch (IOException e)
+ {
+ break;
+ }
- // Get or create queue
- BlockingQueue, Integer>> queue =
- queueMap.computeIfAbsent(key,
- k -> new LinkedBlockingQueue<>(PixelsSinkConstants.MAX_QUEUE_SIZE));
+ ProtoType protoType = getProtoType(key);
+ StorageSourceOffset currentOffset = new StorageSourceOffset(fileId, recordOffset, loopId, protoType);
+ offset += Integer.BYTES * 2L;
+ CompletableFuture valueFuture = reader.readAsync(offset, valueLen)
+ .thenApply(this::copyToHeap)
+ .thenApply(buf -> buf.order(ByteOrder.BIG_ENDIAN));
+ offset += valueLen;
+ if (replayStartOffset != null && currentOffset.compareTo(replayStartOffset) < 0)
+ {
+ continue;
+ }
- // Put future in queue
- if (protoType.equals(ProtoType.ROW))
+ BlockingQueue>> queue =
+ queueMap.computeIfAbsent(key,
+ k -> new LinkedBlockingQueue<>(PixelsSinkConstants.MAX_QUEUE_SIZE));
+ if (protoType.equals(ProtoType.ROW))
+ {
+ sourceRateLimiter.acquire(1);
+ }
+ queue.put(new StorageSourceRecord<>(key, valueFuture, currentOffset));
+ consumerThreads.computeIfAbsent(key, k ->
+ {
+ Thread t = new Thread(() -> consumeQueue(k, queue, protoType));
+ t.setName("consumer-" + key);
+ t.start();
+ return t;
+ });
+ } catch (InterruptedException e)
{
- sourceRateLimiter.acquire(1);
- }
- queue.put(new Pair<>(valueFuture, loopId));
- // Start consumer thread if not exists
- consumerThreads.computeIfAbsent(key, k ->
+ Thread.currentThread().interrupt();
+ return;
+ } catch (IOException e)
{
- Thread t = new Thread(() -> consumeQueue(k, queue, protoType));
- t.setName("consumer-" + key);
- t.start();
- return t;
- });
- } catch (IOException | InterruptedException e)
- {
- break;
+ LOGGER.warn("Failed to read source file {}", reader.getPath(), e);
+ break;
+ }
}
}
+ ++loopId;
}
- ++loopId;
- } while (storageLoopEnabled && isRunning());
-
- clean();
+ while (storageLoopEnabled && isRunning());
+ } finally
+ {
+ clean();
+ }
}
}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractSinkStorageSource.java b/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractSinkStorageSource.java
index f65ae60..26aa667 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractSinkStorageSource.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/source/storage/AbstractSinkStorageSource.java
@@ -21,7 +21,6 @@
package io.pixelsdb.pixels.sink.source.storage;
import io.pixelsdb.pixels.common.physical.PhysicalReader;
-import io.pixelsdb.pixels.core.utils.Pair;
import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
import io.pixelsdb.pixels.sink.metadata.TableMetadataRegistry;
@@ -34,6 +33,7 @@
import io.pixelsdb.pixels.sink.util.MetricsFacade;
import io.pixelsdb.pixels.sink.util.rateLimiter.FlushRateLimiter;
import io.pixelsdb.pixels.sink.util.rateLimiter.FlushRateLimiterFactory;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,18 +59,19 @@ public abstract class AbstractSinkStorageSource implements SinkSource
protected final List files;
protected final CompletableFuture POISON_PILL = new CompletableFuture<>();
protected final Map consumerThreads = new ConcurrentHashMap<>();
- protected final Map, Integer>>> queueMap = new ConcurrentHashMap<>();
+ protected final Map>>> queueMap = new ConcurrentHashMap<>();
protected final boolean storageLoopEnabled;
protected final FlushRateLimiter sourceRateLimiter;
private final TableMetadataRegistry tableMetadataRegistry = TableMetadataRegistry.Instance();
private final MetricsFacade metricsFacade = MetricsFacade.getInstance();
- private final TableProviderAndProcessorPipelineManager> tablePipelineManager = new TableProviderAndProcessorPipelineManager<>();
- protected TransactionEventStorageLoopProvider> transactionEventProvider;
+ private final TableProviderAndProcessorPipelineManager> tablePipelineManager = new TableProviderAndProcessorPipelineManager<>();
+ protected TransactionEventStorageLoopProvider> transactionEventProvider;
protected TransactionProcessor transactionProcessor;
protected Thread transactionProviderThread;
protected Thread transactionProcessorThread;
protected int loopId = 0;
protected List readers = new ArrayList<>();
+ protected final RecoveryManager recoveryManager;
protected AbstractSinkStorageSource()
{
@@ -87,6 +88,7 @@ protected AbstractSinkStorageSource()
this.transactionProcessor = new TransactionProcessor(transactionEventProvider);
this.transactionProcessorThread = new Thread(transactionProcessor, "debezium-processor");
this.sourceRateLimiter = FlushRateLimiterFactory.getNewInstance();
+ this.recoveryManager = RecoveryManager.getInstance();
}
abstract ProtoType getProtoType(int i);
@@ -97,7 +99,7 @@ protected void clean()
{
try
{
- q.put(new Pair<>(POISON_PILL, loopId));
+ q.put(new StorageSourceRecord<>(-1, POISON_PILL, new StorageSourceOffset(-1, -1, loopId, ProtoType.ROW)));
} catch (InterruptedException e)
{
Thread.currentThread().interrupt();
@@ -127,30 +129,30 @@ protected void clean()
}
}
- protected void handleTransactionSourceRecord(ByteBuffer record, Integer loopId)
+ protected void handleTransactionSourceRecord(StorageSourceRecord record)
{
- transactionEventProvider.putTransRawEvent(new Pair<>(record, loopId));
+ transactionEventProvider.putTransRawEvent(record);
}
- protected void consumeQueue(int key, BlockingQueue, Integer>> queue, ProtoType protoType)
+ protected void consumeQueue(int key, BlockingQueue>> queue, ProtoType protoType)
{
try
{
while (true)
{
- Pair, Integer> pair = queue.take();
- CompletableFuture value = pair.getLeft();
- int loopId = pair.getRight();
+ StorageSourceRecord> record = queue.take();
+ CompletableFuture value = record.getPayload();
if (value == POISON_PILL)
{
break;
}
ByteBuffer valueBuffer = value.get();
+ StorageSourceRecord decodedRecord = new StorageSourceRecord<>(record.getSourceKey(), valueBuffer, record.getOffset());
metricsFacade.recordDebeziumEvent();
switch (protoType)
{
- case ROW -> handleRowChangeSourceRecord(key, valueBuffer, loopId);
- case TRANS -> handleTransactionSourceRecord(valueBuffer, loopId);
+ case ROW -> handleRowChangeSourceRecord(record.getSourceKey(), decodedRecord);
+ case TRANS -> handleTransactionSourceRecord(decodedRecord);
}
}
} catch (InterruptedException e)
@@ -171,9 +173,9 @@ protected ByteBuffer copyToHeap(ByteBuffer directBuffer)
return heapBuffer;
}
- protected void handleRowChangeSourceRecord(int key, ByteBuffer dataBuffer, int loopId)
+ protected void handleRowChangeSourceRecord(int key, StorageSourceRecord record)
{
- tablePipelineManager.routeRecord(key, new Pair<>(dataBuffer, loopId));
+ tablePipelineManager.routeRecord(key, record);
}
@Override
diff --git a/src/main/java/io/pixelsdb/pixels/sink/source/storage/LegacySinkStorageSource.java b/src/main/java/io/pixelsdb/pixels/sink/source/storage/LegacySinkStorageSource.java
index a8d636b..f222f36 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/source/storage/LegacySinkStorageSource.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/source/storage/LegacySinkStorageSource.java
@@ -187,8 +187,8 @@ private void consumeQueue(SchemaTableName key, BlockingQueue handleRowChangeSourceRecord(0, valueBuffer, 0);
- case TRANS -> handleTransactionSourceRecord(valueBuffer, 0);
+ case ROW -> handleRowChangeSourceRecord(key, valueBuffer);
+ case TRANS -> transactionEventProvider.putTransRawEvent(valueBuffer);
}
}
} catch (InterruptedException e)
diff --git a/src/main/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceOffset.java b/src/main/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceOffset.java
new file mode 100644
index 0000000..ab2bb07
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceOffset.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.source.storage;
+
+import io.pixelsdb.pixels.sink.provider.ProtoType;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+@Getter
+@ToString
+@EqualsAndHashCode
+public class StorageSourceOffset implements Comparable, Serializable
+{
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ private final int fileId;
+ private final long byteOffset;
+ private final int epoch;
+ private final ProtoType recordType;
+
+ public StorageSourceOffset(int fileId, long byteOffset, int epoch, ProtoType recordType)
+ {
+ this.fileId = fileId;
+ this.byteOffset = byteOffset;
+ this.epoch = epoch;
+ this.recordType = recordType;
+ }
+
+ @Override
+ public int compareTo(StorageSourceOffset other)
+ {
+ int epochComparison = Integer.compare(this.epoch, other.epoch);
+ if (epochComparison != 0)
+ {
+ return epochComparison;
+ }
+ int fileComparison = Integer.compare(this.fileId, other.fileId);
+ if (fileComparison != 0)
+ {
+ return fileComparison;
+ }
+ return Long.compare(this.byteOffset, other.byteOffset);
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceRecord.java b/src/main/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceRecord.java
new file mode 100644
index 0000000..c8d7b42
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceRecord.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.source.storage;
+
+import lombok.Getter;
+
+@Getter
+public class StorageSourceRecord
+{
+ private final int sourceKey;
+ private final T payload;
+ private final StorageSourceOffset offset;
+
+ public StorageSourceRecord(int sourceKey, T payload, StorageSourceOffset offset)
+ {
+ this.sourceKey = sourceKey;
+ this.payload = payload;
+ this.offset = offset;
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/util/DataTransform.java b/src/main/java/io/pixelsdb/pixels/sink/util/DataTransform.java
index 92b656e..c7eaccc 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/util/DataTransform.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/util/DataTransform.java
@@ -23,6 +23,8 @@
import com.google.protobuf.ByteString;
import io.pixelsdb.pixels.retina.RetinaProto;
import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -180,4 +182,24 @@ public static String extractTableName(String topic)
String[] parts = topic.split("\\.");
return parts[parts.length - 1];
}
+
+ public static RowChangeEvent transformInsertToRecoveryUpdate(RowChangeEvent event) throws SinkException
+ {
+ if (event == null || !event.isInsert())
+ {
+ return event;
+ }
+ SinkProto.RowRecord.Builder builder = event.getRowRecord().toBuilder();
+ if (!builder.hasAfter())
+ {
+ return event;
+ }
+ builder.setOp(SinkProto.OperationType.UPDATE);
+ builder.setBefore(builder.getAfter());
+ RowChangeEvent transformed = new RowChangeEvent(builder.build(), event.getSchema());
+ transformed.setTimeStamp(event.getTimeStamp());
+ transformed.setSourceOffset(event.getSourceOffset());
+ transformed.initIndexKey();
+ return transformed;
+ }
}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaWriter.java
index eaf4f3b..1e059b4 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaWriter.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/RetinaWriter.java
@@ -27,6 +27,7 @@
import io.pixelsdb.pixels.sink.exception.SinkException;
import io.pixelsdb.pixels.sink.util.MetricsFacade;
import io.pixelsdb.pixels.sink.writer.PixelsSinkWriter;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,12 +47,14 @@ public class RetinaWriter implements PixelsSinkWriter
private final MetricsFacade metricsFacade = MetricsFacade.getInstance();
private final SinkContextManager sinkContextManager;
private final TransactionMode transactionMode;
+ private final RecoveryManager recoveryManager;
public RetinaWriter()
{
PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
this.sinkContextManager = SinkContextManager.getInstance();
this.transactionMode = config.getTransactionMode();
+ this.recoveryManager = RecoveryManager.getInstance();
}
@Override
@@ -88,6 +91,10 @@ public boolean writeRow(RowChangeEvent event)
{
return false;
}
+ if (!recoveryManager.shouldReplayRow(event))
+ {
+ return true;
+ }
metricsFacade.recordRowChange(event.getTable(), event.getOp());
event.startLatencyTimer();
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContextManager.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContextManager.java
index e74829e..c9d8eb4 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContextManager.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/SinkContextManager.java
@@ -29,6 +29,7 @@
import io.pixelsdb.pixels.sink.freshness.FreshnessClient;
import io.pixelsdb.pixels.sink.util.BlockingBoundedMap;
import io.pixelsdb.pixels.sink.util.DataTransform;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,7 @@ public class SinkContextManager
private final CommitMethod commitMethod;
private final String freshnessLevel;
private final RetinaBucketDispatcher retinaBucketDispatcher;
+ private final RecoveryManager recoveryManager;
private SinkContextManager()
{
@@ -64,6 +66,7 @@ private SinkContextManager()
this.freshnessLevel = config.getSinkMonitorFreshnessLevel();
this.retinaBucketDispatcher = new RetinaBucketDispatcher();
this.activeTxContexts = new BlockingBoundedMap<>(config.getRetinaTransLimit());
+ this.recoveryManager = RecoveryManager.getInstance();
}
public static SinkContextManager getInstance()
@@ -91,6 +94,7 @@ protected SinkContext getActiveTxContext(RowChangeEvent event, AtomicBoolean can
LOGGER.trace("Allocate new tx {}\torder:{}", sourceTxId, event.getTransaction().getTotalOrder());
SinkContext newSinkContext = new SinkContext(sourceTxId);
newSinkContext.bufferOrphanedEvent(event);
+ recoveryManager.observeRowEvent(event);
return newSinkContext;
} else
{
@@ -120,7 +124,9 @@ protected SinkContext getActiveTxContext(RowChangeEvent event, AtomicBoolean can
protected void startTransSync(String sourceTxId)
{
LOGGER.trace("Start trans {}", sourceTxId);
- TransContext pixelsTransContext = transactionProxy.getNewTransContext(sourceTxId);
+ TransContext pixelsTransContext = recoveryManager.restoreTransContext(sourceTxId, transactionProxy::getExistingTransContext)
+ .orElseGet(() -> transactionProxy.getNewTransContext(sourceTxId));
+ recoveryManager.recordNewBinding(sourceTxId, pixelsTransContext);
activeTxContexts.compute(
sourceTxId,
(k, oldCtx) ->
@@ -170,6 +176,10 @@ void processTxCommit(SinkProto.TransactionMetadata txEnd)
{
ctx.tableCounterLock.lock();
ctx.setEndTx(txEnd);
+ if (recoveryManager.shouldTrackCommitProgress(txId))
+ {
+ recoveryManager.markCommitting(txId);
+ }
long startTs = System.currentTimeMillis();
if (ctx.isCompleted())
{
@@ -185,6 +195,11 @@ void endTransaction(SinkContext ctx)
{
String txId = ctx.getSourceTxId();
removeSinkContext(txId);
+ if (recoveryManager.shouldSuppressCommit(txId))
+ {
+ LOGGER.trace("Skip duplicate commit for recovered transaction {}", txId);
+ return;
+ }
boolean failed = ctx.isFailed();
if (!failed)
{
@@ -244,6 +259,10 @@ protected void writeRowChangeEvent(SinkContext ctx, RowChangeEvent event) throws
{
event.setTimeStamp(ctx.getTimestamp());
}
+ if (ctx != null && recoveryManager.shouldRewriteInsert(ctx.getSourceTxId()))
+ {
+ event = DataTransform.transformInsertToRecoveryUpdate(event);
+ }
retinaBucketDispatcher.writeRowChangeEvent(event, ctx);
}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableCrossTxWriter.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableCrossTxWriter.java
index c73864b..d6f8cd4 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableCrossTxWriter.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TableCrossTxWriter.java
@@ -24,14 +24,18 @@
import io.pixelsdb.pixels.retina.RetinaProto;
import io.pixelsdb.pixels.sink.event.RowChangeEvent;
import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
@@ -48,12 +52,14 @@ public class TableCrossTxWriter extends TableWriter
private final Logger LOGGER = LoggerFactory.getLogger(TableCrossTxWriter.class);
private final int flushBatchSize;
private final InFlightControlManager inFlightControlManager;
+ private final RecoveryManager recoveryManager;
public TableCrossTxWriter(String t, int bucketId)
{
super(t, bucketId);
flushBatchSize = config.getFlushBatchSize();
inFlightControlManager = InFlightControlManager.getInstance();
+ recoveryManager = RecoveryManager.getInstance();
}
/**
@@ -68,11 +74,13 @@ public void flush(List batch)
List smallBatch = null;
List txIds = new ArrayList<>();
List fullTableName = new ArrayList<>();
+ Map safeOffsets = new HashMap<>();
List tableUpdateDataBuilderList = new LinkedList<>();
List tableUpdateCount = new ArrayList<>();
for (RowChangeEvent event : batch)
{
String currTxId = event.getTransaction().getId();
+ updateSafeOffset(safeOffsets, currTxId, event.getSourceOffset());
if (!currTxId.equals(txId))
{
if (smallBatch != null && !smallBatch.isEmpty())
@@ -98,7 +106,7 @@ public void flush(List batch)
RetinaProto.TableUpdateData.Builder builder = buildTableUpdateDataFromBatch(txId, smallBatch);
if (builder != null)
{
- tableUpdateDataBuilderList.add(buildTableUpdateDataFromBatch(txId, smallBatch));
+ tableUpdateDataBuilderList.add(builder);
tableUpdateCount.add(smallBatch.size());
}
}
@@ -137,6 +145,7 @@ public void flush(List batch)
{
metricsFacade.recordFreshness(txEndTime - txStartTime);
}
+ safeOffsets.forEach(recoveryManager::advanceLastSafeOffset);
updateCtxCounters(txIds, fullTableName, tableUpdateCount);
}
}
@@ -167,6 +176,10 @@ private void updateCtxCounters(List txIds, List fullTableName, L
metricsFacade.recordRowEvent(tableUpdateCount.get(i));
String writeTxId = txIds.get(i);
SinkContext sinkContext = SinkContextManager.getInstance().getSinkContext(writeTxId);
+ if (sinkContext == null)
+ {
+ continue;
+ }
try
{
@@ -226,6 +239,16 @@ protected RetinaProto.TableUpdateData.Builder buildTableUpdateDataFromBatch(Stri
return builder;
}
+ private void updateSafeOffset(Map safeOffsets, String txId, StorageSourceOffset offset)
+ {
+ if (offset == null)
+ {
+ return;
+ }
+ safeOffsets.merge(txId, offset, (current, candidate) ->
+ current.compareTo(candidate) >= 0 ? current : candidate);
+ }
+
@Override
protected boolean needFlush()
{
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionProxy.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionProxy.java
index 8128a7e..5a7e5c8 100644
--- a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionProxy.java
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/TransactionProxy.java
@@ -26,6 +26,7 @@
import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
import io.pixelsdb.pixels.sink.util.MetricsFacade;
+import io.pixelsdb.pixels.sink.writer.retina.recovery.RecoveryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ public class TransactionProxy
private final MetricsFacade metricsFacade = MetricsFacade.getInstance();
private final BlockingQueue toCommitTransContextQueue;
private final String freshnessLevel;
+ private final RecoveryManager recoveryManager;
private final int BATCH_SIZE;
private final int REQUEST_BATCH_SIZE;
private final boolean REQUEST_BATCH;
@@ -72,6 +74,7 @@ private TransactionProxy()
this.transService = TransService.Instance();
this.transContextQueue = new ConcurrentLinkedDeque<>();
this.toCommitTransContextQueue = new LinkedBlockingQueue<>();
+ this.recoveryManager = RecoveryManager.getInstance();
this.batchCommitExecutor = Executors.newFixedThreadPool(
WORKER_COUNT,
r ->
@@ -168,6 +171,17 @@ public TransContext getNewTransContext(String txId)
}
}
+ public TransContext getExistingTransContext(long transId)
+ {
+ try
+ {
+ return transService.getTransContext(transId);
+ } catch (TransException e)
+ {
+ throw new RuntimeException("Failed to restore transaction context " + transId, e);
+ }
+ }
+
public void commitTransAsync(SinkContext transContext)
{
toCommitTransContextQueue.add(transContext);
@@ -176,6 +190,7 @@ public void commitTransAsync(SinkContext transContext)
public void commitTransSync(SinkContext transContext)
{
commitTrans(transContext.getPixelsTransCtx());
+ recoveryManager.markCommitted(transContext.getSourceTxId());
metricsFacade.recordTransaction();
long txEndTime = System.currentTimeMillis();
@@ -220,12 +235,14 @@ private void batchCommitWorker()
batchContexts.clear();
batchTransIds.clear();
txStartTimes.clear();
+ List sinkContexts = new ArrayList<>(BATCH_SIZE);
SinkContext firstSinkContext = toCommitTransContextQueue.take();
TransContext transContext = firstSinkContext.getPixelsTransCtx();
batchContexts.add(transContext);
batchTransIds.add(transContext.getTransId());
txStartTimes.add(firstSinkContext.getStartTime());
+ sinkContexts.add(firstSinkContext);
long startTime = System.nanoTime();
while (batchContexts.size() < BATCH_SIZE)
@@ -246,9 +263,11 @@ private void batchCommitWorker()
batchContexts.add(transContext);
batchTransIds.add(transContext.getTransId());
txStartTimes.add(ctx.getStartTime());
+ sinkContexts.add(ctx);
}
transService.commitTransBatch(batchTransIds, false);
+ sinkContexts.forEach(ctx -> recoveryManager.markCommitted(ctx.getSourceTxId()));
metricsFacade.recordTransaction(batchTransIds.size());
long txEndTime = System.currentTimeMillis();
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/CommitMarker.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/CommitMarker.java
new file mode 100644
index 0000000..d63ea65
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/CommitMarker.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public class CommitMarker implements Serializable
+{
+ private static final long serialVersionUID = 1L;
+
+ private final String dataSourceTxId;
+ private final long pixelsTransId;
+ private final long committedAt;
+
+ public CommitMarker(String dataSourceTxId, long pixelsTransId, long committedAt)
+ {
+ this.dataSourceTxId = dataSourceTxId;
+ this.pixelsTransId = pixelsTransId;
+ this.committedAt = committedAt;
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryBinding.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryBinding.java
new file mode 100644
index 0000000..6850588
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryBinding.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+
+@Getter
+public class RecoveryBinding implements Serializable
+{
+ private static final long serialVersionUID = 1L;
+
+ private final String dataSourceTxId;
+ private final long pixelsTransId;
+ private final long timestamp;
+ private final long leaseStartMs;
+ private final long leasePeriodMs;
+ private final StorageSourceOffset beginOffset;
+ @Setter
+ private StorageSourceOffset lastSafeOffset;
+ @Setter
+ private RecoveryState state;
+
+ public RecoveryBinding(
+ String dataSourceTxId,
+ long pixelsTransId,
+ long timestamp,
+ long leaseStartMs,
+ long leasePeriodMs,
+ StorageSourceOffset beginOffset,
+ StorageSourceOffset lastSafeOffset,
+ RecoveryState state)
+ {
+ this.dataSourceTxId = dataSourceTxId;
+ this.pixelsTransId = pixelsTransId;
+ this.timestamp = timestamp;
+ this.leaseStartMs = leaseStartMs;
+ this.leasePeriodMs = leasePeriodMs;
+ this.beginOffset = beginOffset;
+ this.lastSafeOffset = lastSafeOffset;
+ this.state = state;
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryManager.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryManager.java
new file mode 100644
index 0000000..ec59904
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryManager.java
@@ -0,0 +1,385 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+import io.pixelsdb.pixels.common.transaction.TransContext;
+import io.pixelsdb.pixels.common.transaction.TransService;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
+import io.pixelsdb.pixels.sink.writer.PixelsSinkMode;
+import io.pixelsdb.pixels.sink.writer.retina.TransactionMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RecoveryManager
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(RecoveryManager.class);
+ private static volatile RecoveryManager instance;
+
+ private final PixelsSinkConfig config;
+ private final boolean enabled;
+ private final RecoveryMode mode;
+ private final RecoveryStore store;
+ private final TransService transService;
+ private final Set recoveredTransactionIds = ConcurrentHashMap.newKeySet();
+ private final ConcurrentHashMap pendingBeginOffsets = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap pendingRowOffsets = new ConcurrentHashMap<>();
+ private volatile boolean initializedForSourceStart = false;
+ private volatile StorageSourceOffset replayStartOffset;
+
+ private RecoveryManager()
+ {
+ this.config = PixelsSinkConfigFactory.getInstance();
+ this.enabled = config.isRecoveryEnabled()
+ && "storage".equalsIgnoreCase(config.getDataSource())
+ && config.getPixelsSinkMode() == PixelsSinkMode.RETINA
+ && config.getTransactionMode() == TransactionMode.BATCH;
+ this.mode = RecoveryMode.fromValue(config.getRecoveryMode());
+ this.store = enabled ? new RocksDbRecoveryStore() : null;
+ this.transService = enabled ? TransService.Instance() : null;
+ }
+
+ public static RecoveryManager getInstance()
+ {
+ if (instance == null)
+ {
+ synchronized (RecoveryManager.class)
+ {
+ if (instance == null)
+ {
+ instance = new RecoveryManager();
+ }
+ }
+ }
+ return instance;
+ }
+
+ public static void closeInstance()
+ {
+ if (instance != null)
+ {
+ try
+ {
+ instance.close();
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Failed to close recovery manager", e);
+ } finally
+ {
+ instance = null;
+ }
+ }
+ }
+
+ public boolean isEnabled()
+ {
+ return enabled;
+ }
+
+ public RecoveryMode getMode()
+ {
+ return mode;
+ }
+
+ public synchronized void initializeForSourceStart()
+ {
+ if (!enabled || initializedForSourceStart)
+ {
+ return;
+ }
+ if (mode == RecoveryMode.BOOTSTRAP)
+ {
+ if (store.hasRecoveryState())
+ {
+ if (config.isRecoveryBootstrapForceOverwrite())
+ {
+ LOGGER.warn("Recovery bootstrap force overwrite enabled; clearing previous recovery state");
+ store.reset();
+ } else
+ {
+ throw new IllegalStateException("Recovery state exists. Set sink.recovery.bootstrap.force_overwrite=true to discard it.");
+ }
+ }
+ replayStartOffset = null;
+ } else
+ {
+ if (config.isRecoveryBootstrapForceOverwrite())
+ {
+ throw new IllegalStateException("sink.recovery.bootstrap.force_overwrite can only be used in bootstrap mode");
+ }
+ List bindings = store.loadActiveBindings();
+ bindings.stream()
+ .map(RecoveryBinding::getDataSourceTxId)
+ .forEach(recoveredTransactionIds::add);
+ StorageSourceOffset earliestActiveOffset = bindings.stream()
+ .map(RecoveryBinding::getBeginOffset)
+ .min(Comparator.naturalOrder())
+ .orElse(null);
+ StorageSourceOffset checkpointOffset = loadCheckpointReplayOffset();
+ replayStartOffset = minOffset(earliestActiveOffset, checkpointOffset);
+ }
+ initializedForSourceStart = true;
+ }
+
+ public StorageSourceOffset getReplayStartOffset()
+ {
+ return replayStartOffset;
+ }
+
+ public void observeTransactionMetadata(SinkProto.TransactionMetadata transactionMetadata, StorageSourceOffset offset)
+ {
+ if (!enabled || transactionMetadata == null || offset == null)
+ {
+ return;
+ }
+ if (transactionMetadata.getStatus() == SinkProto.TransactionStatus.BEGIN)
+ {
+ pendingBeginOffsets.putIfAbsent(transactionMetadata.getId(), offset);
+ }
+ }
+
+ public void observeRowEvent(RowChangeEvent event)
+ {
+ if (!enabled || event == null || event.getSourceOffset() == null || event.getTransaction() == null)
+ {
+ return;
+ }
+ pendingRowOffsets.putIfAbsent(event.getTransaction().getId(), event.getSourceOffset());
+ }
+
+ public Optional getBinding(String dataSourceTxId)
+ {
+ if (!enabled)
+ {
+ return Optional.empty();
+ }
+ return store.getBinding(dataSourceTxId);
+ }
+
+ public RecoveryReplayRole resolveReplayRole(String dataSourceTxId)
+ {
+ if (!enabled || mode != RecoveryMode.RECOVERY || dataSourceTxId == null)
+ {
+ return RecoveryReplayRole.FORWARD_CONSUMPTION;
+ }
+ if (store.hasCommitMarker(dataSourceTxId))
+ {
+ return RecoveryReplayRole.CHECKPOINT_REPLAY;
+ }
+ if (recoveredTransactionIds.contains(dataSourceTxId))
+ {
+ return RecoveryReplayRole.ACTIVE_RECOVERY;
+ }
+ return RecoveryReplayRole.FORWARD_CONSUMPTION;
+ }
+
+ public boolean shouldSuppressCommit(String dataSourceTxId)
+ {
+ return resolveReplayRole(dataSourceTxId) == RecoveryReplayRole.CHECKPOINT_REPLAY;
+ }
+
+ @Deprecated
+ public boolean shouldSkipTransaction(String dataSourceTxId)
+ {
+ return shouldSuppressCommit(dataSourceTxId);
+ }
+
+ public boolean shouldReplayRow(RowChangeEvent event)
+ {
+ return event != null;
+ }
+
+ @Deprecated
+ public boolean shouldSkipRow(RowChangeEvent event)
+ {
+ return !shouldReplayRow(event);
+ }
+
+ public boolean shouldRewriteInsert(String dataSourceTxId)
+ {
+ return enabled
+ && mode == RecoveryMode.RECOVERY
+ && config.isRecoveryInsertAsUpdate()
+ && dataSourceTxId != null
+ && resolveReplayRole(dataSourceTxId) != RecoveryReplayRole.FORWARD_CONSUMPTION;
+ }
+
+ @Deprecated
+ public boolean shouldTransformInsert(String dataSourceTxId)
+ {
+ return shouldRewriteInsert(dataSourceTxId);
+ }
+
+ public boolean shouldTrackCommitProgress(String dataSourceTxId)
+ {
+ return !shouldSuppressCommit(dataSourceTxId);
+ }
+
+ public void recordNewBinding(String dataSourceTxId, TransContext transContext)
+ {
+ if (!enabled || transContext == null || dataSourceTxId == null || store.hasCommitMarker(dataSourceTxId) || store.getBinding(dataSourceTxId).isPresent())
+ {
+ return;
+ }
+ StorageSourceOffset beginOffset = pendingBeginOffsets.remove(dataSourceTxId);
+ if (beginOffset == null)
+ {
+ beginOffset = pendingRowOffsets.get(dataSourceTxId);
+ }
+ if (beginOffset == null)
+ {
+ LOGGER.warn("No source offset found for transaction {}, recovery metadata will not be recorded", dataSourceTxId);
+ return;
+ }
+ RecoveryBinding binding = new RecoveryBinding(
+ dataSourceTxId,
+ transContext.getTransId(),
+ transContext.getTimestamp(),
+ transContext.getLease().getStartMs(),
+ transContext.getLease().getPeriodMs(),
+ beginOffset,
+ null,
+ RecoveryState.ACTIVE
+ );
+ store.saveNewBinding(binding);
+ store.saveTimestampReplayIndex(new TimestampReplayIndexEntry(
+ transContext.getTimestamp(),
+ dataSourceTxId,
+ transContext.getTransId(),
+ beginOffset
+ ));
+ }
+
+ public void markCommitting(String dataSourceTxId)
+ {
+ if (!enabled)
+ {
+ return;
+ }
+ store.markCommitting(dataSourceTxId);
+ }
+
+ public void markCommitted(String dataSourceTxId)
+ {
+ if (!enabled)
+ {
+ return;
+ }
+ store.markCommitted(dataSourceTxId);
+ recoveredTransactionIds.remove(dataSourceTxId);
+ pendingBeginOffsets.remove(dataSourceTxId);
+ pendingRowOffsets.remove(dataSourceTxId);
+ }
+
+ public void advanceLastSafeOffset(String dataSourceTxId, StorageSourceOffset offset)
+ {
+ if (!enabled || dataSourceTxId == null || offset == null)
+ {
+ return;
+ }
+ store.advanceLastSafeOffset(dataSourceTxId, offset);
+ }
+
+ public Optional restoreTransContext(String dataSourceTxId, TransactionContextLoader loader)
+ {
+ if (!enabled)
+ {
+ return Optional.empty();
+ }
+ return store.getBinding(dataSourceTxId).map(binding ->
+ {
+ try
+ {
+ if (shouldSuppressCommit(dataSourceTxId))
+ {
+ return buildReplayOnlyTransContext(binding);
+ }
+ return loader.load(binding.getPixelsTransId());
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Failed to restore transaction context for " + dataSourceTxId, e);
+ }
+ });
+ }
+
+ private TransContext buildReplayOnlyTransContext(RecoveryBinding binding)
+ {
+ return new TransContext(
+ binding.getPixelsTransId(),
+ binding.getTimestamp(),
+ binding.getLeaseStartMs(),
+ binding.getLeasePeriodMs(),
+ false
+ );
+ }
+
+ public void close() throws IOException
+ {
+ if (store != null)
+ {
+ store.close();
+ }
+ }
+
+ private StorageSourceOffset loadCheckpointReplayOffset()
+ {
+ try
+ {
+ long safeGcTimestamp = transService.getSafeGcTimestamp();
+ return store.findReplayIndexAtOrBefore(safeGcTimestamp)
+ .map(TimestampReplayIndexEntry::getSourceOffset)
+ .orElse(null);
+ } catch (Exception e)
+ {
+ LOGGER.warn("Failed to resolve recovery checkpoint from safe gc timestamp", e);
+ return null;
+ }
+ }
+
+ private StorageSourceOffset minOffset(StorageSourceOffset left, StorageSourceOffset right)
+ {
+ if (left == null)
+ {
+ return right;
+ }
+ if (right == null)
+ {
+ return left;
+ }
+ return left.compareTo(right) <= 0 ? left : right;
+ }
+
+ @FunctionalInterface
+ public interface TransactionContextLoader
+ {
+ TransContext load(long transId) throws Exception;
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryMode.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryMode.java
new file mode 100644
index 0000000..fb671ed
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryMode.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+public enum RecoveryMode
+{
+ BOOTSTRAP,
+ RECOVERY;
+
+ public static RecoveryMode fromValue(String value)
+ {
+ for (RecoveryMode mode : values())
+ {
+ if (mode.name().equalsIgnoreCase(value))
+ {
+ return mode;
+ }
+ }
+ throw new IllegalArgumentException("Unknown recovery mode: " + value);
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryReplayRole.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryReplayRole.java
new file mode 100644
index 0000000..81dcbfd
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryReplayRole.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+public enum RecoveryReplayRole
+{
+ CHECKPOINT_REPLAY,
+ ACTIVE_RECOVERY,
+ FORWARD_CONSUMPTION
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryState.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryState.java
new file mode 100644
index 0000000..9c7b00a
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryState.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+public enum RecoveryState
+{
+ ACTIVE,
+ COMMITTING,
+ COMMITTED,
+ ABORTED
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryStore.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryStore.java
new file mode 100644
index 0000000..13b6843
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryStore.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public interface RecoveryStore extends Closeable
+{
+ boolean hasRecoveryState();
+
+ void reset();
+
+ Optional getBinding(String dataSourceTxId);
+
+ List loadActiveBindings();
+
+ boolean hasCommitMarker(String dataSourceTxId);
+
+ void saveNewBinding(RecoveryBinding binding);
+
+ void saveTimestampReplayIndex(TimestampReplayIndexEntry entry);
+
+ Optional findReplayIndexAtOrBefore(long pixelsTimestamp);
+
+ void advanceLastSafeOffset(String dataSourceTxId, StorageSourceOffset offset);
+
+ void markCommitting(String dataSourceTxId);
+
+ void markCommitted(String dataSourceTxId);
+
+ @Override
+ void close() throws IOException;
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RocksDbRecoveryStore.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RocksDbRecoveryStore.java
new file mode 100644
index 0000000..47213c9
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RocksDbRecoveryStore.java
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+import io.pixelsdb.pixels.sink.config.PixelsSinkConfig;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
+import org.rocksdb.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+
+public class RocksDbRecoveryStore implements RecoveryStore
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(RocksDbRecoveryStore.class);
+ private static final byte[] TXN_BINDING_CF = "txn_binding_cf".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] ACTIVE_TX_ORDER_CF = "active_tx_order_cf".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] COMMIT_MARKER_CF = "commit_marker_cf".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] TS_REPLAY_INDEX_CF = "ts_replay_index_cf".getBytes(StandardCharsets.UTF_8);
+
+ static
+ {
+ RocksDB.loadLibrary();
+ }
+
+ private final RocksDB db;
+ private final ColumnFamilyHandle txnBindingHandle;
+ private final ColumnFamilyHandle activeTxOrderHandle;
+ private final ColumnFamilyHandle commitMarkerHandle;
+ private final ColumnFamilyHandle tsReplayIndexHandle;
+ private final DBOptions dbOptions;
+ private final List handles;
+ private final WriteOptions writeOptions = new WriteOptions();
+
+ public RocksDbRecoveryStore()
+ {
+ PixelsSinkConfig config = PixelsSinkConfigFactory.getInstance();
+ try
+ {
+ Path dbPath = Path.of(config.getRecoveryRocksdbDir());
+ Files.createDirectories(dbPath);
+ List descriptors = List.of(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, new ColumnFamilyOptions()),
+ new ColumnFamilyDescriptor(TXN_BINDING_CF, new ColumnFamilyOptions()),
+ new ColumnFamilyDescriptor(ACTIVE_TX_ORDER_CF, new ColumnFamilyOptions()),
+ new ColumnFamilyDescriptor(COMMIT_MARKER_CF, new ColumnFamilyOptions()),
+ new ColumnFamilyDescriptor(TS_REPLAY_INDEX_CF, new ColumnFamilyOptions())
+ );
+ this.dbOptions = new DBOptions()
+ .setCreateIfMissing(true)
+ .setCreateMissingColumnFamilies(true);
+ this.handles = new ArrayList<>();
+ this.db = RocksDB.open(dbOptions, dbPath.toString(), descriptors, handles);
+ Map handleMap = new HashMap<>();
+ for (int i = 0; i < descriptors.size(); i++)
+ {
+ handleMap.put(new String(descriptors.get(i).getName(), StandardCharsets.UTF_8), handles.get(i));
+ }
+ this.txnBindingHandle = handleMap.get(new String(TXN_BINDING_CF, StandardCharsets.UTF_8));
+ this.activeTxOrderHandle = handleMap.get(new String(ACTIVE_TX_ORDER_CF, StandardCharsets.UTF_8));
+ this.commitMarkerHandle = handleMap.get(new String(COMMIT_MARKER_CF, StandardCharsets.UTF_8));
+ this.tsReplayIndexHandle = handleMap.get(new String(TS_REPLAY_INDEX_CF, StandardCharsets.UTF_8));
+ } catch (IOException | RocksDBException e)
+ {
+ throw new RuntimeException("Failed to initialize RocksDB recovery store", e);
+ }
+ }
+
+ @Override
+ public boolean hasRecoveryState()
+ {
+ return hasAnyKey(txnBindingHandle) || hasAnyKey(commitMarkerHandle) || hasAnyKey(activeTxOrderHandle)
+ || hasAnyKey(tsReplayIndexHandle);
+ }
+
+ @Override
+ public void reset()
+ {
+ clearColumnFamily(txnBindingHandle);
+ clearColumnFamily(activeTxOrderHandle);
+ clearColumnFamily(commitMarkerHandle);
+ clearColumnFamily(tsReplayIndexHandle);
+ }
+
+ @Override
+ public Optional getBinding(String dataSourceTxId)
+ {
+ try
+ {
+ byte[] raw = db.get(txnBindingHandle, key(dataSourceTxId));
+ if (raw == null)
+ {
+ return Optional.empty();
+ }
+ return Optional.of(deserialize(raw, RecoveryBinding.class));
+ } catch (RocksDBException e)
+ {
+ throw new RuntimeException("Failed to read recovery binding: " + dataSourceTxId, e);
+ }
+ }
+
+ @Override
+ public List loadActiveBindings()
+ {
+ List bindings = new ArrayList<>();
+ try (RocksIterator iterator = db.newIterator(activeTxOrderHandle))
+ {
+ for (iterator.seekToFirst(); iterator.isValid(); iterator.next())
+ {
+ String txId = new String(iterator.value(), StandardCharsets.UTF_8);
+ getBinding(txId)
+ .filter(binding -> binding.getState() != RecoveryState.COMMITTED)
+ .ifPresent(bindings::add);
+ }
+ }
+ return bindings;
+ }
+
+ @Override
+ public boolean hasCommitMarker(String dataSourceTxId)
+ {
+ try
+ {
+ return db.get(commitMarkerHandle, key(dataSourceTxId)) != null;
+ } catch (RocksDBException e)
+ {
+ throw new RuntimeException("Failed to query commit marker: " + dataSourceTxId, e);
+ }
+ }
+
+ @Override
+ public void saveNewBinding(RecoveryBinding binding)
+ {
+ try (WriteBatch batch = new WriteBatch())
+ {
+ batch.put(txnBindingHandle, key(binding.getDataSourceTxId()), serialize(binding));
+ batch.put(activeTxOrderHandle, activeOrderKey(binding.getBeginOffset(), binding.getDataSourceTxId()), key(binding.getDataSourceTxId()));
+ db.write(writeOptions, batch);
+ } catch (RocksDBException e)
+ {
+ throw new RuntimeException("Failed to save new recovery binding: " + binding.getDataSourceTxId(), e);
+ }
+ }
+
+ @Override
+ public void saveTimestampReplayIndex(TimestampReplayIndexEntry entry)
+ {
+ try
+ {
+ db.put(tsReplayIndexHandle, writeOptions, timestampKey(entry.getPixelsTimestamp()), serialize(entry));
+ } catch (RocksDBException e)
+ {
+ throw new RuntimeException("Failed to persist timestamp replay index: " + entry.getPixelsTimestamp(), e);
+ }
+ }
+
+ @Override
+ public Optional findReplayIndexAtOrBefore(long pixelsTimestamp)
+ {
+ try (RocksIterator iterator = db.newIterator(tsReplayIndexHandle))
+ {
+ byte[] seekKey = timestampKey(pixelsTimestamp);
+ iterator.seekForPrev(seekKey);
+ if (!iterator.isValid())
+ {
+ return Optional.empty();
+ }
+ return Optional.of(deserialize(iterator.value(), TimestampReplayIndexEntry.class));
+ }
+ }
+
+ @Override
+ public void advanceLastSafeOffset(String dataSourceTxId, StorageSourceOffset offset)
+ {
+ getBinding(dataSourceTxId).ifPresent(binding ->
+ {
+ binding.setLastSafeOffset(offset);
+ persistBinding(binding);
+ });
+ }
+
+ @Override
+ public void markCommitting(String dataSourceTxId)
+ {
+ getBinding(dataSourceTxId).ifPresent(binding ->
+ {
+ if (binding.getState() == RecoveryState.COMMITTED || hasCommitMarker(dataSourceTxId))
+ {
+ return;
+ }
+ binding.setState(RecoveryState.COMMITTING);
+ persistBinding(binding);
+ });
+ }
+
+ @Override
+ public void markCommitted(String dataSourceTxId)
+ {
+ getBinding(dataSourceTxId).ifPresent(binding ->
+ {
+ binding.setState(RecoveryState.COMMITTED);
+ CommitMarker marker = new CommitMarker(dataSourceTxId, binding.getPixelsTransId(), System.currentTimeMillis());
+ try (WriteBatch batch = new WriteBatch())
+ {
+ batch.put(txnBindingHandle, key(dataSourceTxId), serialize(binding));
+ batch.put(commitMarkerHandle, key(dataSourceTxId), serialize(marker));
+ batch.delete(activeTxOrderHandle, activeOrderKey(binding.getBeginOffset(), dataSourceTxId));
+ db.write(writeOptions, batch);
+ } catch (RocksDBException e)
+ {
+ throw new RuntimeException("Failed to mark committed: " + dataSourceTxId, e);
+ }
+ });
+ }
+
+ @Override
+ public void close()
+ {
+ writeOptions.close();
+ handles.forEach(ColumnFamilyHandle::close);
+ db.close();
+ dbOptions.close();
+ }
+
+ private void persistBinding(RecoveryBinding binding)
+ {
+ try
+ {
+ db.put(txnBindingHandle, writeOptions, key(binding.getDataSourceTxId()), serialize(binding));
+ } catch (RocksDBException e)
+ {
+ throw new RuntimeException("Failed to persist recovery binding: " + binding.getDataSourceTxId(), e);
+ }
+ }
+
+ private boolean hasAnyKey(ColumnFamilyHandle handle)
+ {
+ try (RocksIterator iterator = db.newIterator(handle))
+ {
+ iterator.seekToFirst();
+ return iterator.isValid();
+ }
+ }
+
+ private void clearColumnFamily(ColumnFamilyHandle handle)
+ {
+ List keys = new ArrayList<>();
+ try (RocksIterator iterator = db.newIterator(handle))
+ {
+ for (iterator.seekToFirst(); iterator.isValid(); iterator.next())
+ {
+ keys.add(Arrays.copyOf(iterator.key(), iterator.key().length));
+ }
+ }
+ try (WriteBatch batch = new WriteBatch())
+ {
+ for (byte[] key : keys)
+ {
+ batch.delete(handle, key);
+ }
+ db.write(writeOptions, batch);
+ } catch (RocksDBException e)
+ {
+ throw new RuntimeException("Failed to clear recovery column family", e);
+ }
+ }
+
+ private static byte[] key(String value)
+ {
+ return value.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static byte[] timestampKey(long pixelsTimestamp)
+ {
+ return ByteBuffer.allocate(Long.BYTES).putLong(pixelsTimestamp).array();
+ }
+
+ private static byte[] activeOrderKey(StorageSourceOffset offset, String dataSourceTxId)
+ {
+ byte[] txIdBytes = dataSourceTxId.getBytes(StandardCharsets.UTF_8);
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Long.BYTES + txIdBytes.length);
+ buffer.putInt(offset.getEpoch());
+ buffer.putInt(offset.getFileId());
+ buffer.putLong(offset.getByteOffset());
+ buffer.put(txIdBytes);
+ return buffer.array();
+ }
+
+ private static byte[] serialize(Serializable value)
+ {
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream))
+ {
+ objectOutputStream.writeObject(value);
+ objectOutputStream.flush();
+ return byteArrayOutputStream.toByteArray();
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Failed to serialize recovery metadata", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static T deserialize(byte[] value, Class type)
+ {
+ try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(value);
+ ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream))
+ {
+ Object object = objectInputStream.readObject();
+ return (T) type.cast(object);
+ } catch (IOException | ClassNotFoundException e)
+ {
+ throw new RuntimeException("Failed to deserialize recovery metadata", e);
+ }
+ }
+}
diff --git a/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/TimestampReplayIndexEntry.java b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/TimestampReplayIndexEntry.java
new file mode 100644
index 0000000..3fff505
--- /dev/null
+++ b/src/main/java/io/pixelsdb/pixels/sink/writer/retina/recovery/TimestampReplayIndexEntry.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TimestampReplayIndexEntry implements Serializable
+{
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ private long pixelsTimestamp;
+ private String dataSourceTxId;
+ private long pixelsTransId;
+ private StorageSourceOffset sourceOffset;
+}
diff --git a/src/test/java/io/pixelsdb/pixels/sink/integration/ProtoCdcLogIntegrationTest.java b/src/test/java/io/pixelsdb/pixels/sink/integration/ProtoCdcLogIntegrationTest.java
new file mode 100644
index 0000000..8fd04e2
--- /dev/null
+++ b/src/test/java/io/pixelsdb/pixels/sink/integration/ProtoCdcLogIntegrationTest.java
@@ -0,0 +1,465 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.integration;
+
+import com.google.protobuf.ByteString;
+import io.pixelsdb.pixels.common.exception.MetadataException;
+import io.pixelsdb.pixels.common.index.SinglePointIndex;
+import io.pixelsdb.pixels.common.metadata.MetadataService;
+import io.pixelsdb.pixels.common.metadata.domain.Column;
+import io.pixelsdb.pixels.common.metadata.domain.Layout;
+import io.pixelsdb.pixels.common.metadata.domain.Table;
+import io.pixelsdb.pixels.common.physical.Storage;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.exception.SinkException;
+import io.pixelsdb.pixels.sink.metadata.TableMetadataRegistry;
+import io.pixelsdb.pixels.sink.writer.proto.ProtoWriter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.TestAbortedException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ProtoCdcLogIntegrationTest
+{
+ private static final String TEST_SCHEMA = "pixels_sink_it";
+ private static final String TEST_TABLE = "recovery_account";
+ private static final String DATASET_NAME = "recovery-cdc-it";
+ private static final String TX_ID = "it-tx-1";
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ @AfterEach
+ void tearDown()
+ {
+ PixelsSinkConfigFactory.reset();
+ }
+
+ @Test
+ void shouldCreateSchemaAndGenerateLocalProtoCdcLog() throws Exception
+ {
+ Path outputRoot = Paths.get("target", "integration-tests", "proto-cdc");
+ recreateDirectory(outputRoot);
+ initializeProtoWriterConfig(outputRoot);
+
+ TableSpec tableSpec = resolveTableSpec();
+
+ try (ProtoWriter writer = new ProtoWriter())
+ {
+ writer.writeTrans(beginTransaction());
+ writer.writeRow(insertRow(tableSpec));
+ writer.writeRow(updateRow(tableSpec));
+ writer.writeRow(deleteRow(tableSpec));
+ writer.writeTrans(endTransaction(tableSpec, 3));
+ }
+
+ Path datasetDir = outputRoot.resolve(DATASET_NAME);
+ assertTrue(Files.isDirectory(datasetDir), "Proto dataset directory was not created");
+
+ List protoFiles;
+ try (var pathStream = Files.list(datasetDir))
+ {
+ protoFiles = pathStream
+ .filter(path -> path.getFileName().toString().endsWith(".proto"))
+ .sorted()
+ .toList();
+ }
+
+ assertFalse(protoFiles.isEmpty(), "No proto CDC files were generated");
+
+ int framedRecordCount = countFramedRecords(protoFiles.get(0));
+ assertEquals(5, framedRecordCount, "Expected BEGIN + 3 row events + END in the proto CDC file");
+ }
+
+ private void initializeProtoWriterConfig(Path outputRoot)
+ {
+ ConfigFactory configFactory = ConfigFactory.Instance();
+ configFactory.addProperty("sink.proto.dir", outputRoot.toUri().toString());
+ configFactory.addProperty("sink.proto.data", DATASET_NAME);
+ configFactory.addProperty("sink.proto.maxRecords", "100");
+ PixelsSinkConfigFactory.initialize(configFactory);
+ }
+
+ private TableSpec resolveTableSpec() throws MetadataException
+ {
+ MetadataService metadataService = MetadataService.Instance();
+ String configuredSchema = System.getProperty("pixels.it.schema");
+ String configuredTable = System.getProperty("pixels.it.table");
+ if (configuredSchema != null && configuredTable != null &&
+ metadataService.existTable(configuredSchema, configuredTable))
+ {
+ return buildTableSpecFromMetadata(configuredSchema, configuredTable, metadataService);
+ }
+
+ try
+ {
+ ensureSchemaAndTable(metadataService);
+ return TableSpec.minimalTestTable();
+ } catch (MetadataException createFailure)
+ {
+ if (metadataService.existTable("pixels_bench_sf1x", "savingaccount"))
+ {
+ return buildTableSpecFromMetadata("pixels_bench_sf1x", "savingaccount", metadataService);
+ }
+ if (metadataService.existTable("pixels_bench_sf10x", "savingaccount"))
+ {
+ return buildTableSpecFromMetadata("pixels_bench_sf10x", "savingaccount", metadataService);
+ }
+ throw new TestAbortedException(
+ "No reusable integration-test table found. Start with an existing table via "
+ + "-Dpixels.it.schema= -Dpixels.it.table=, "
+ + "or provide a metadata environment that allows createTable().",
+ createFailure
+ );
+ }
+ }
+
+ private void ensureSchemaAndTable(MetadataService metadataService) throws MetadataException
+ {
+ if (!metadataService.existSchema(TEST_SCHEMA))
+ {
+ metadataService.createSchema(TEST_SCHEMA);
+ }
+ if (!metadataService.existTable(TEST_SCHEMA, TEST_TABLE))
+ {
+ metadataService.createTable(
+ TEST_SCHEMA,
+ TEST_TABLE,
+ Storage.Scheme.file,
+ List.of("account_id"),
+ List.of(
+ column("account_id", "bigint"),
+ column("balance", "bigint"),
+ column("owner", "varchar(32)")
+ )
+ );
+ }
+
+ ensurePrimaryIndex(metadataService);
+ }
+
+ private void ensurePrimaryIndex(MetadataService metadataService) throws MetadataException
+ {
+ Table table = metadataService.getTable(TEST_SCHEMA, TEST_TABLE);
+ try
+ {
+ metadataService.getPrimaryIndex(table.getId());
+ return;
+ } catch (MetadataException ignored)
+ {
+ // Fall through and create the primary index.
+ }
+
+ List columns = metadataService.getColumns(TEST_SCHEMA, TEST_TABLE, false);
+ long accountIdColumnId = columns.stream()
+ .filter(column -> "account_id".equals(column.getName()))
+ .findFirst()
+ .orElseThrow(() -> new MetadataException("Missing account_id column in metadata"))
+ .getId();
+ Layout layout = metadataService.getLatestLayout(TEST_SCHEMA, TEST_TABLE);
+
+ io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex index =
+ new io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex();
+ index.setPrimary(true);
+ index.setUnique(true);
+ index.setIndexScheme(SinglePointIndex.Scheme.rocksdb);
+ index.setTableId(table.getId());
+ index.setSchemaVersionId(layout.getSchemaVersionId());
+ index.setKeyColumnsJson("{\"keyColumnIds\":[" + accountIdColumnId + "]}");
+ metadataService.createSinglePointIndex(index);
+ }
+
+ private Column column(String name, String type)
+ {
+ Column column = new Column();
+ column.setName(name);
+ column.setType(type);
+ return column;
+ }
+
+ private TableSpec buildTableSpecFromMetadata(String schemaName, String tableName, MetadataService metadataService)
+ throws MetadataException
+ {
+ List columns = metadataService.getColumns(schemaName, tableName, false);
+ List keyColumnNames;
+ try
+ {
+ keyColumnNames = TableMetadataRegistry.Instance().getKeyColumnsName(schemaName, tableName);
+ } catch (SinkException e)
+ {
+ throw new MetadataException("Failed to load primary key metadata for " + schemaName + "." + tableName, e);
+ }
+ List insertValues = new ArrayList<>(columns.size());
+ List updateAfterValues = new ArrayList<>(columns.size());
+ int updateColumnIndex = -1;
+
+ for (int i = 0; i < columns.size(); i++)
+ {
+ Column column = columns.get(i);
+ String value = sampleValue(column.getType(), i, false);
+ insertValues.add(value);
+ updateAfterValues.add(value);
+ if (updateColumnIndex < 0 && !keyColumnNames.contains(column.getName()) && isUpdatableType(column.getType()))
+ {
+ updateColumnIndex = i;
+ }
+ }
+
+ if (updateColumnIndex >= 0)
+ {
+ updateAfterValues.set(
+ updateColumnIndex,
+ sampleValue(columns.get(updateColumnIndex).getType(), updateColumnIndex, true)
+ );
+ }
+
+ return new TableSpec(
+ schemaName,
+ tableName,
+ insertValues,
+ new ArrayList<>(insertValues),
+ updateAfterValues,
+ new ArrayList<>(updateAfterValues)
+ );
+ }
+
+ private SinkProto.TransactionMetadata beginTransaction()
+ {
+ return SinkProto.TransactionMetadata.newBuilder()
+ .setId(TX_ID)
+ .setStatus(SinkProto.TransactionStatus.BEGIN)
+ .setTimestamp(System.currentTimeMillis())
+ .build();
+ }
+
+ private SinkProto.TransactionMetadata endTransaction(TableSpec tableSpec, int rowCount)
+ {
+ return SinkProto.TransactionMetadata.newBuilder()
+ .setId(TX_ID)
+ .setStatus(SinkProto.TransactionStatus.END)
+ .setTimestamp(System.currentTimeMillis())
+ .addDataCollections(
+ SinkProto.DataCollection.newBuilder()
+ .setDataCollection(tableSpec.schemaName + "." + tableSpec.tableName)
+ .setEventCount(rowCount)
+ .build()
+ )
+ .build();
+ }
+
+ private RowChangeEvent insertRow(TableSpec tableSpec) throws SinkException
+ {
+ return new RowChangeEvent(SinkProto.RowRecord.newBuilder()
+ .setSource(sourceInfo(tableSpec))
+ .setTransaction(transactionInfo())
+ .setOp(SinkProto.OperationType.INSERT)
+ .setAfter(buildRowValue(tableSpec.insertValues))
+ .build());
+ }
+
+ private RowChangeEvent updateRow(TableSpec tableSpec) throws SinkException
+ {
+ return new RowChangeEvent(SinkProto.RowRecord.newBuilder()
+ .setSource(sourceInfo(tableSpec))
+ .setTransaction(transactionInfo())
+ .setOp(SinkProto.OperationType.UPDATE)
+ .setBefore(buildRowValue(tableSpec.updateBeforeValues))
+ .setAfter(buildRowValue(tableSpec.updateAfterValues))
+ .build());
+ }
+
+ private RowChangeEvent deleteRow(TableSpec tableSpec) throws SinkException
+ {
+ return new RowChangeEvent(SinkProto.RowRecord.newBuilder()
+ .setSource(sourceInfo(tableSpec))
+ .setTransaction(transactionInfo())
+ .setOp(SinkProto.OperationType.DELETE)
+ .setBefore(buildRowValue(tableSpec.deleteBeforeValues))
+ .build());
+ }
+
+ private SinkProto.SourceInfo sourceInfo(TableSpec tableSpec)
+ {
+ return SinkProto.SourceInfo.newBuilder()
+ .setDb(tableSpec.schemaName)
+ .setTable(tableSpec.tableName)
+ .build();
+ }
+
+ private SinkProto.TransactionInfo transactionInfo()
+ {
+ return SinkProto.TransactionInfo.newBuilder()
+ .setId(TX_ID)
+ .build();
+ }
+
+ private SinkProto.ColumnValue columnValue(String value)
+ {
+ return SinkProto.ColumnValue.newBuilder()
+ .setValue(ByteString.copyFrom(value.getBytes(StandardCharsets.UTF_8)))
+ .build();
+ }
+
+ private SinkProto.RowValue buildRowValue(List values)
+ {
+ SinkProto.RowValue.Builder builder = SinkProto.RowValue.newBuilder();
+ for (String value : values)
+ {
+ builder.addValues(columnValue(value));
+ }
+ return builder.build();
+ }
+
+ private String sampleValue(String rawType, int columnIndex, boolean updated)
+ {
+ String type = rawType.toLowerCase();
+ if (type.contains("bigint") || type.contains("long"))
+ {
+ return updated ? Long.toString(2000L + columnIndex) : Long.toString(1000L + columnIndex);
+ }
+ if (type.contains("int") || type.contains("smallint") || type.contains("tinyint"))
+ {
+ return updated ? Integer.toString(200 + columnIndex) : Integer.toString(100 + columnIndex);
+ }
+ if (type.contains("float") || type.contains("double") || type.contains("decimal"))
+ {
+ return updated ? (2000 + columnIndex) + ".5" : (1000 + columnIndex) + ".0";
+ }
+ if (type.contains("bool"))
+ {
+ return updated ? "false" : "true";
+ }
+ if (type.contains("date") && !type.contains("timestamp"))
+ {
+ return LocalDate.of(2026, 4, updated ? 28 : 27).toString();
+ }
+ if (type.contains("timestamp") || type.contains("datetime"))
+ {
+ return LocalDateTime.of(2026, 4, updated ? 28 : 27, 20, 0, 0).format(TIMESTAMP_FORMATTER);
+ }
+ if (type.contains("time"))
+ {
+ return updated ? "20:05:00" : "20:00:00";
+ }
+ return updated ? "updated_" + columnIndex : "value_" + columnIndex;
+ }
+
+ private boolean isUpdatableType(String rawType)
+ {
+ String type = rawType.toLowerCase();
+ return !type.contains("binary") && !type.contains("blob") && !type.contains("vector");
+ }
+
+ private int countFramedRecords(Path protoFile) throws IOException
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(protoFile)).order(ByteOrder.BIG_ENDIAN);
+ int count = 0;
+ while (buffer.remaining() > 0)
+ {
+ int key = buffer.getInt();
+ int length = buffer.getInt();
+ assertTrue(length >= 0, "Invalid framed record length for key " + key);
+ assertTrue(buffer.remaining() >= length, "Truncated framed record for key " + key);
+ buffer.position(buffer.position() + length);
+ count++;
+ }
+ return count;
+ }
+
+ private void recreateDirectory(Path path) throws IOException
+ {
+ if (Files.exists(path))
+ {
+ try (var stream = Files.walk(path))
+ {
+ stream.sorted(Comparator.reverseOrder())
+ .forEach(current ->
+ {
+ try
+ {
+ Files.delete(current);
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+ Files.createDirectories(path);
+ }
+
+ private static final class TableSpec
+ {
+ private final String schemaName;
+ private final String tableName;
+ private final List insertValues;
+ private final List updateBeforeValues;
+ private final List updateAfterValues;
+ private final List deleteBeforeValues;
+
+ private TableSpec(
+ String schemaName,
+ String tableName,
+ List insertValues,
+ List updateBeforeValues,
+ List updateAfterValues,
+ List deleteBeforeValues)
+ {
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ this.insertValues = insertValues;
+ this.updateBeforeValues = updateBeforeValues;
+ this.updateAfterValues = updateAfterValues;
+ this.deleteBeforeValues = deleteBeforeValues;
+ }
+
+ private static TableSpec minimalTestTable()
+ {
+ return new TableSpec(
+ TEST_SCHEMA,
+ TEST_TABLE,
+ List.of("1001", "10000", "alice"),
+ List.of("1001", "10000", "alice"),
+ List.of("1001", "12000", "alice"),
+ List.of("1001", "12000", "alice")
+ );
+ }
+
+ }
+}
diff --git a/src/test/java/io/pixelsdb/pixels/sink/integration/RayProtoCdcLogGeneratorTest.java b/src/test/java/io/pixelsdb/pixels/sink/integration/RayProtoCdcLogGeneratorTest.java
new file mode 100644
index 0000000..680fecd
--- /dev/null
+++ b/src/test/java/io/pixelsdb/pixels/sink/integration/RayProtoCdcLogGeneratorTest.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.integration;
+
+import com.google.protobuf.ByteString;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.writer.proto.ProtoWriter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+class RayProtoCdcLogGeneratorTest
+{
+ private static final String SCHEMA = "pixels_test";
+ private static final String TABLE = "ray";
+
+ @AfterEach
+ void tearDown()
+ {
+ PixelsSinkConfigFactory.reset();
+ }
+
+ @Test
+ void shouldGenerateBaselineRayCdcLog() throws Exception
+ {
+ ConfigFactory configFactory = ConfigFactory.Instance();
+ configFactory.addProperty("sink.proto.dir", "file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc");
+ configFactory.addProperty("sink.proto.data", "ray-recovery-it");
+ configFactory.addProperty("sink.proto.maxRecords", "1000");
+ PixelsSinkConfigFactory.initialize(configFactory);
+
+ try (ProtoWriter writer = new ProtoWriter())
+ {
+ writer.writeTrans(txBegin("ray-tx-1"));
+ writer.writeRow(insert("ray-tx-1", 900001, 18, "2026-04-29 02:55:00"));
+ writer.writeTrans(txEnd("ray-tx-1", 1));
+
+ writer.writeTrans(txBegin("ray-tx-2"));
+ writer.writeRow(update("ray-tx-2", 900001, 18, 19, "2026-04-29 02:56:00"));
+ writer.writeTrans(txEnd("ray-tx-2", 1));
+ }
+ }
+
+ private SinkProto.TransactionMetadata txBegin(String txId)
+ {
+ return SinkProto.TransactionMetadata.newBuilder()
+ .setId(txId)
+ .setStatus(SinkProto.TransactionStatus.BEGIN)
+ .setTimestamp(System.currentTimeMillis())
+ .build();
+ }
+
+ private SinkProto.TransactionMetadata txEnd(String txId, int rowCount)
+ {
+ return SinkProto.TransactionMetadata.newBuilder()
+ .setId(txId)
+ .setStatus(SinkProto.TransactionStatus.END)
+ .setTimestamp(System.currentTimeMillis())
+ .addDataCollections(SinkProto.DataCollection.newBuilder()
+ .setDataCollection(SCHEMA + "." + TABLE)
+ .setEventCount(rowCount)
+ .build())
+ .build();
+ }
+
+ private RowChangeEvent insert(String txId, int id, int age, String freshnessTs) throws Exception
+ {
+ return new RowChangeEvent(SinkProto.RowRecord.newBuilder()
+ .setSource(source())
+ .setTransaction(transaction(txId))
+ .setOp(SinkProto.OperationType.INSERT)
+ .setAfter(rowValue(id, age, freshnessTs))
+ .build());
+ }
+
+ private RowChangeEvent update(String txId, int id, int beforeAge, int afterAge, String afterTs) throws Exception
+ {
+ return new RowChangeEvent(SinkProto.RowRecord.newBuilder()
+ .setSource(source())
+ .setTransaction(transaction(txId))
+ .setOp(SinkProto.OperationType.UPDATE)
+ .setBefore(rowValue(id, beforeAge, "2026-04-29 02:55:00"))
+ .setAfter(rowValue(id, afterAge, afterTs))
+ .build());
+ }
+
+ private SinkProto.SourceInfo source()
+ {
+ return SinkProto.SourceInfo.newBuilder()
+ .setDb(SCHEMA)
+ .setSchema(SCHEMA)
+ .setTable(TABLE)
+ .build();
+ }
+
+ private SinkProto.TransactionInfo transaction(String txId)
+ {
+ return SinkProto.TransactionInfo.newBuilder()
+ .setId(txId)
+ .build();
+ }
+
+ private SinkProto.RowValue rowValue(int id, int age, String freshnessTs)
+ {
+ return SinkProto.RowValue.newBuilder()
+ .addValues(intColumnValue(id))
+ .addValues(intColumnValue(age))
+ .addValues(timestampColumnValue(freshnessTs))
+ .build();
+ }
+
+ private SinkProto.ColumnValue intColumnValue(int value)
+ {
+ return SinkProto.ColumnValue.newBuilder()
+ .setValue(ByteString.copyFrom(ByteBuffer.allocate(Integer.BYTES).putInt(value).array()))
+ .build();
+ }
+
+ private SinkProto.ColumnValue timestampColumnValue(String value)
+ {
+ long epochMillis = LocalDateTime.parse(value.replace(" ", "T"))
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ return SinkProto.ColumnValue.newBuilder()
+ .setValue(ByteString.copyFrom(ByteBuffer.allocate(Long.BYTES).putLong(epochMillis).array()))
+ .build();
+ }
+}
diff --git a/src/test/java/io/pixelsdb/pixels/sink/integration/RayRecoveryWindowCdcLogGeneratorMain.java b/src/test/java/io/pixelsdb/pixels/sink/integration/RayRecoveryWindowCdcLogGeneratorMain.java
new file mode 100644
index 0000000..6ed2397
--- /dev/null
+++ b/src/test/java/io/pixelsdb/pixels/sink/integration/RayRecoveryWindowCdcLogGeneratorMain.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.integration;
+
+public final class RayRecoveryWindowCdcLogGeneratorMain
+{
+ private RayRecoveryWindowCdcLogGeneratorMain()
+ {
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ new RayRecoveryWindowCdcLogGeneratorTest().shouldGenerateLongRunningRecoveryWindowLog();
+ }
+}
diff --git a/src/test/java/io/pixelsdb/pixels/sink/integration/RayRecoveryWindowCdcLogGeneratorTest.java b/src/test/java/io/pixelsdb/pixels/sink/integration/RayRecoveryWindowCdcLogGeneratorTest.java
new file mode 100644
index 0000000..f96c2a7
--- /dev/null
+++ b/src/test/java/io/pixelsdb/pixels/sink/integration/RayRecoveryWindowCdcLogGeneratorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.integration;
+
+import com.google.protobuf.ByteString;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.event.RowChangeEvent;
+import io.pixelsdb.pixels.sink.writer.proto.ProtoWriter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+class RayRecoveryWindowCdcLogGeneratorTest
+{
+ private static final String TOPIC = "ray-recovery-window-it-v2";
+ private static final String SCHEMA = "pixels_test";
+ private static final String TABLE = "ray";
+ private static final String TX_ID = "ray-window-tx-1";
+ private static final int ROW_COUNT = 50;
+ private static final int START_ID = 910000;
+
+ @AfterEach
+ void tearDown()
+ {
+ PixelsSinkConfigFactory.reset();
+ }
+
+ @Test
+ void shouldGenerateLongRunningRecoveryWindowLog() throws Exception
+ {
+ ConfigFactory configFactory = ConfigFactory.Instance();
+ configFactory.addProperty("sink.proto.dir", "file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc");
+ configFactory.addProperty("sink.proto.data", TOPIC);
+ configFactory.addProperty("sink.proto.maxRecords", "1000");
+ PixelsSinkConfigFactory.initialize(configFactory);
+
+ try (ProtoWriter writer = new ProtoWriter())
+ {
+ writer.writeTrans(txBegin(TX_ID));
+ for (int i = 0; i < ROW_COUNT; i++)
+ {
+ int id = START_ID + i;
+ int age = 20 + (i % 10);
+ String ts = String.format("2026-04-29 03:%02d:%02d", i / 60, i % 60);
+ writer.writeRow(insert(TX_ID, id, age, ts));
+ }
+ writer.writeTrans(txEnd(TX_ID, ROW_COUNT));
+ }
+ }
+
+ private SinkProto.TransactionMetadata txBegin(String txId)
+ {
+ return SinkProto.TransactionMetadata.newBuilder()
+ .setId(txId)
+ .setStatus(SinkProto.TransactionStatus.BEGIN)
+ .setTimestamp(System.currentTimeMillis())
+ .build();
+ }
+
+ private SinkProto.TransactionMetadata txEnd(String txId, int rowCount)
+ {
+ return SinkProto.TransactionMetadata.newBuilder()
+ .setId(txId)
+ .setStatus(SinkProto.TransactionStatus.END)
+ .setTimestamp(System.currentTimeMillis())
+ .addDataCollections(SinkProto.DataCollection.newBuilder()
+ .setDataCollection(SCHEMA + "." + TABLE)
+ .setEventCount(rowCount)
+ .build())
+ .build();
+ }
+
+ private RowChangeEvent insert(String txId, int id, int age, String freshnessTs) throws Exception
+ {
+ return new RowChangeEvent(SinkProto.RowRecord.newBuilder()
+ .setSource(source())
+ .setTransaction(transaction(txId))
+ .setOp(SinkProto.OperationType.INSERT)
+ .setAfter(rowValue(id, age, freshnessTs))
+ .build());
+ }
+
+ private SinkProto.SourceInfo source()
+ {
+ return SinkProto.SourceInfo.newBuilder()
+ .setDb(SCHEMA)
+ .setSchema(SCHEMA)
+ .setTable(TABLE)
+ .build();
+ }
+
+ private SinkProto.TransactionInfo transaction(String txId)
+ {
+ return SinkProto.TransactionInfo.newBuilder()
+ .setId(txId)
+ .build();
+ }
+
+ private SinkProto.RowValue rowValue(int id, int age, String freshnessTs)
+ {
+ return SinkProto.RowValue.newBuilder()
+ .addValues(intColumnValue(id))
+ .addValues(intColumnValue(age))
+ .addValues(timestampColumnValue(freshnessTs))
+ .build();
+ }
+
+ private SinkProto.ColumnValue intColumnValue(int value)
+ {
+ return SinkProto.ColumnValue.newBuilder()
+ .setValue(ByteString.copyFrom(ByteBuffer.allocate(Integer.BYTES).putInt(value).array()))
+ .build();
+ }
+
+ private SinkProto.ColumnValue timestampColumnValue(String value)
+ {
+ long epochMillis = LocalDateTime.parse(value.replace(" ", "T"))
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ return SinkProto.ColumnValue.newBuilder()
+ .setValue(ByteString.copyFrom(ByteBuffer.allocate(Long.BYTES).putLong(epochMillis).array()))
+ .build();
+ }
+}
diff --git a/src/test/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceOffsetTest.java b/src/test/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceOffsetTest.java
new file mode 100644
index 0000000..31d1747
--- /dev/null
+++ b/src/test/java/io/pixelsdb/pixels/sink/source/storage/StorageSourceOffsetTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.source.storage;
+
+import io.pixelsdb.pixels.sink.provider.ProtoType;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class StorageSourceOffsetTest
+{
+ @Test
+ void shouldOrderOffsetsByEpochThenFileThenByteOffset()
+ {
+ StorageSourceOffset epoch1 = new StorageSourceOffset(0, 10L, 1, ProtoType.ROW);
+ StorageSourceOffset file0 = new StorageSourceOffset(0, 10L, 0, ProtoType.ROW);
+ StorageSourceOffset file1 = new StorageSourceOffset(1, 5L, 0, ProtoType.TRANS);
+ StorageSourceOffset laterByteOffset = new StorageSourceOffset(1, 20L, 0, ProtoType.ROW);
+
+ List offsets = new ArrayList<>(List.of(epoch1, laterByteOffset, file1, file0));
+ offsets.sort(StorageSourceOffset::compareTo);
+
+ assertEquals(file0, offsets.get(0));
+ assertEquals(file1, offsets.get(1));
+ assertEquals(laterByteOffset, offsets.get(2));
+ assertEquals(epoch1, offsets.get(3));
+ }
+
+ @Test
+ void shouldIgnoreRecordTypeWhenComparingSamePhysicalPosition()
+ {
+ StorageSourceOffset rowOffset = new StorageSourceOffset(2, 30L, 4, ProtoType.ROW);
+ StorageSourceOffset transOffset = new StorageSourceOffset(2, 30L, 4, ProtoType.TRANS);
+
+ assertEquals(0, rowOffset.compareTo(transOffset));
+ assertEquals(0, transOffset.compareTo(rowOffset));
+ assertTrue(rowOffset.equals(new StorageSourceOffset(2, 30L, 4, ProtoType.ROW)));
+ }
+}
diff --git a/src/test/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryManagerTest.java b/src/test/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryManagerTest.java
new file mode 100644
index 0000000..5e55c5b
--- /dev/null
+++ b/src/test/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RecoveryManagerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+import io.pixelsdb.pixels.common.transaction.TransContext;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.sink.SinkProto;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.provider.ProtoType;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class RecoveryManagerTest
+{
+ @TempDir
+ Path tempDir;
+
+ @AfterEach
+ void tearDown()
+ {
+ RecoveryManager.closeInstance();
+ PixelsSinkConfigFactory.reset();
+ }
+
+ @Test
+ void shouldRecordBindingAndRestoreExistingContext()
+ {
+ Path rocksdbDir = tempDir.resolve("restore-existing");
+ initializeRecoveryConfig(rocksdbDir, "recovery", false);
+
+ RecoveryManager manager = RecoveryManager.getInstance();
+ String txId = "tx-restore";
+ StorageSourceOffset beginOffset = new StorageSourceOffset(0, 10L, 0, ProtoType.TRANS);
+ manager.observeTransactionMetadata(beginTx(txId), beginOffset);
+
+ TransContext original = new TransContext(501L, 601L, 701L, 801L, false);
+ manager.recordNewBinding(txId, original);
+
+ RecoveryBinding binding = manager.getBinding(txId).orElseThrow();
+ assertEquals(original.getTransId(), binding.getPixelsTransId());
+ assertEquals(original.getTimestamp(), binding.getTimestamp());
+ assertEquals(beginOffset, binding.getBeginOffset());
+
+ TransContext restored = manager.restoreTransContext(txId, transId ->
+ {
+ assertEquals(original.getTransId(), transId);
+ return original;
+ }).orElseThrow();
+ assertSame(original, restored);
+ }
+
+ @Test
+ void shouldPickEarliestReplayStartOffsetInRecoveryMode()
+ {
+ Path rocksdbDir = tempDir.resolve("replay-start");
+ initializeRecoveryConfig(rocksdbDir, "recovery", false);
+
+ try (RocksDbRecoveryStore store = new RocksDbRecoveryStore())
+ {
+ store.saveNewBinding(new RecoveryBinding(
+ "tx-late",
+ 1001L,
+ 1101L,
+ 1201L,
+ 1301L,
+ new StorageSourceOffset(1, 200L, 1, ProtoType.TRANS),
+ null,
+ RecoveryState.ACTIVE
+ ));
+ store.saveNewBinding(new RecoveryBinding(
+ "tx-early",
+ 1002L,
+ 1102L,
+ 1202L,
+ 1302L,
+ new StorageSourceOffset(0, 100L, 0, ProtoType.TRANS),
+ null,
+ RecoveryState.ACTIVE
+ ));
+ }
+
+ RecoveryManager.closeInstance();
+ PixelsSinkConfigFactory.reset();
+ initializeRecoveryConfig(rocksdbDir, "recovery", false);
+
+ RecoveryManager manager = RecoveryManager.getInstance();
+ manager.initializeForSourceStart();
+
+ StorageSourceOffset replayStart = manager.getReplayStartOffset();
+ assertNotNull(replayStart);
+ assertEquals(new StorageSourceOffset(0, 100L, 0, ProtoType.TRANS), replayStart);
+ assertEquals(RecoveryReplayRole.ACTIVE_RECOVERY, manager.resolveReplayRole("tx-early"));
+ assertEquals(RecoveryReplayRole.ACTIVE_RECOVERY, manager.resolveReplayRole("tx-late"));
+ assertTrue(manager.shouldRewriteInsert("tx-early"));
+ assertTrue(manager.shouldRewriteInsert("tx-late"));
+ }
+
+ @Test
+ void shouldSkipDuplicateCommitButStillTransformCommittedReplay()
+ {
+ Path rocksdbDir = tempDir.resolve("commit-marker");
+ initializeRecoveryConfig(rocksdbDir, "recovery", false);
+
+ RecoveryManager manager = RecoveryManager.getInstance();
+ String txId = "tx-commit";
+ manager.observeTransactionMetadata(beginTx(txId), new StorageSourceOffset(0, 15L, 0, ProtoType.TRANS));
+ manager.recordNewBinding(txId, new TransContext(9001L, 9002L, 9003L, 9004L, false));
+
+ assertEquals(RecoveryReplayRole.FORWARD_CONSUMPTION, manager.resolveReplayRole(txId));
+ assertFalse(manager.shouldSuppressCommit(txId));
+ assertFalse(manager.shouldRewriteInsert(txId));
+
+ manager.markCommitted(txId);
+
+ assertEquals(RecoveryReplayRole.CHECKPOINT_REPLAY, manager.resolveReplayRole(txId));
+ assertTrue(manager.shouldSuppressCommit(txId));
+ assertTrue(manager.shouldRewriteInsert(txId));
+ assertTrue(manager.shouldTrackCommitProgress("fresh-tx"));
+ }
+
+ @Test
+ void shouldClearExistingStateInBootstrapForceOverwriteMode()
+ {
+ Path rocksdbDir = tempDir.resolve("bootstrap-overwrite");
+ initializeRecoveryConfig(rocksdbDir, "recovery", false);
+ try (RocksDbRecoveryStore store = new RocksDbRecoveryStore())
+ {
+ store.saveNewBinding(new RecoveryBinding(
+ "tx-bootstrap",
+ 2001L,
+ 2101L,
+ 2201L,
+ 2301L,
+ new StorageSourceOffset(0, 1L, 0, ProtoType.TRANS),
+ null,
+ RecoveryState.ACTIVE
+ ));
+ }
+
+ RecoveryManager.closeInstance();
+ PixelsSinkConfigFactory.reset();
+ initializeRecoveryConfig(rocksdbDir, "bootstrap", true);
+
+ RecoveryManager manager = RecoveryManager.getInstance();
+ manager.initializeForSourceStart();
+ assertNull(manager.getReplayStartOffset());
+ assertTrue(manager.getBinding("tx-bootstrap").isEmpty());
+
+ RecoveryManager.closeInstance();
+ try (RocksDbRecoveryStore store = new RocksDbRecoveryStore())
+ {
+ assertFalse(store.hasRecoveryState());
+ }
+ }
+
+ @Test
+ void shouldRejectForceOverwriteInRecoveryMode()
+ {
+ initializeRecoveryConfig(tempDir.resolve("invalid-config"), "recovery", true);
+
+ RecoveryManager manager = RecoveryManager.getInstance();
+ IllegalStateException exception = assertThrows(IllegalStateException.class, manager::initializeForSourceStart);
+ assertTrue(exception.getMessage().contains("force_overwrite"));
+ }
+
+ private void initializeRecoveryConfig(Path rocksdbDir, String mode, boolean forceOverwrite)
+ {
+ ConfigFactory configFactory = ConfigFactory.Instance();
+ configFactory.addProperty("sink.datasource", "storage");
+ configFactory.addProperty("sink.mode", "retina");
+ configFactory.addProperty("sink.trans.mode", "batch");
+ configFactory.addProperty("sink.recovery.enable", "true");
+ configFactory.addProperty("sink.recovery.mode", mode);
+ configFactory.addProperty("sink.recovery.bootstrap.force_overwrite", Boolean.toString(forceOverwrite));
+ configFactory.addProperty("sink.recovery.rocksdb.dir", rocksdbDir.toString());
+ configFactory.addProperty("sink.recovery.insert_as_update", "true");
+ PixelsSinkConfigFactory.initialize(configFactory);
+ }
+
+ private SinkProto.TransactionMetadata beginTx(String txId)
+ {
+ return SinkProto.TransactionMetadata.newBuilder()
+ .setId(txId)
+ .setStatus(SinkProto.TransactionStatus.BEGIN)
+ .build();
+ }
+}
diff --git a/src/test/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RocksDbRecoveryStoreTest.java b/src/test/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RocksDbRecoveryStoreTest.java
new file mode 100644
index 0000000..4e93073
--- /dev/null
+++ b/src/test/java/io/pixelsdb/pixels/sink/writer/retina/recovery/RocksDbRecoveryStoreTest.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.sink.writer.retina.recovery;
+
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.sink.config.factory.PixelsSinkConfigFactory;
+import io.pixelsdb.pixels.sink.provider.ProtoType;
+import io.pixelsdb.pixels.sink.source.storage.StorageSourceOffset;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class RocksDbRecoveryStoreTest
+{
+ @TempDir
+ Path tempDir;
+
+ @AfterEach
+ void tearDown()
+ {
+ RecoveryManager.closeInstance();
+ PixelsSinkConfigFactory.reset();
+ }
+
+ @Test
+ void shouldPersistBindingAdvanceOffsetAndCommit()
+ {
+ initializeRecoveryConfig(tempDir.resolve("rocksdb"), "recovery", false);
+ StorageSourceOffset beginOffset = new StorageSourceOffset(0, 128L, 0, ProtoType.TRANS);
+ StorageSourceOffset rowOffset = new StorageSourceOffset(0, 256L, 0, ProtoType.ROW);
+ StorageSourceOffset earlierRowOffset = new StorageSourceOffset(0, 192L, 0, ProtoType.ROW);
+ RecoveryBinding binding = new RecoveryBinding(
+ "tx-1",
+ 101L,
+ 10001L,
+ 20001L,
+ 30001L,
+ beginOffset,
+ null,
+ RecoveryState.ACTIVE
+ );
+
+ try (RocksDbRecoveryStore store = new RocksDbRecoveryStore())
+ {
+ assertFalse(store.hasRecoveryState());
+
+ store.saveNewBinding(binding);
+ assertTrue(store.hasRecoveryState());
+
+ RecoveryBinding stored = store.getBinding("tx-1").orElseThrow();
+ assertEquals(101L, stored.getPixelsTransId());
+ assertEquals(beginOffset, stored.getBeginOffset());
+ assertEquals(RecoveryState.ACTIVE, stored.getState());
+
+ List activeBindings = store.loadActiveBindings();
+ assertEquals(1, activeBindings.size());
+ assertEquals("tx-1", activeBindings.get(0).getDataSourceTxId());
+
+ store.saveTimestampReplayIndex(new TimestampReplayIndexEntry(10001L, "tx-1", 101L, beginOffset));
+ assertEquals(beginOffset, store.findReplayIndexAtOrBefore(10001L).orElseThrow().getSourceOffset());
+ assertEquals(beginOffset, store.findReplayIndexAtOrBefore(10002L).orElseThrow().getSourceOffset());
+
+ store.advanceLastSafeOffset("tx-1", rowOffset);
+ RecoveryBinding advanced = store.getBinding("tx-1").orElseThrow();
+ assertEquals(rowOffset, advanced.getLastSafeOffset());
+
+ store.advanceLastSafeOffset("tx-1", earlierRowOffset);
+ RecoveryBinding regressed = store.getBinding("tx-1").orElseThrow();
+ assertEquals(earlierRowOffset, regressed.getLastSafeOffset());
+
+ store.markCommitting("tx-1");
+ RecoveryBinding committing = store.getBinding("tx-1").orElseThrow();
+ assertEquals(RecoveryState.COMMITTING, committing.getState());
+
+ store.markCommitted("tx-1");
+ assertTrue(store.hasCommitMarker("tx-1"));
+ RecoveryBinding committed = store.getBinding("tx-1").orElseThrow();
+ assertEquals(RecoveryState.COMMITTED, committed.getState());
+ assertTrue(store.loadActiveBindings().isEmpty());
+ }
+ }
+
+ @Test
+ void shouldResetAllRecoveryState()
+ {
+ initializeRecoveryConfig(tempDir.resolve("rocksdb"), "bootstrap", false);
+ RecoveryBinding binding = new RecoveryBinding(
+ "tx-2",
+ 202L,
+ 20002L,
+ 21000L,
+ 31000L,
+ new StorageSourceOffset(1, 64L, 0, ProtoType.TRANS),
+ null,
+ RecoveryState.ACTIVE
+ );
+
+ try (RocksDbRecoveryStore store = new RocksDbRecoveryStore())
+ {
+ store.saveNewBinding(binding);
+ assertTrue(store.hasRecoveryState());
+
+ store.reset();
+ assertFalse(store.hasRecoveryState());
+ assertTrue(store.getBinding("tx-2").isEmpty());
+ assertFalse(store.hasCommitMarker("tx-2"));
+ }
+ }
+
+ private void initializeRecoveryConfig(Path rocksdbDir, String mode, boolean forceOverwrite)
+ {
+ ConfigFactory configFactory = ConfigFactory.Instance();
+ configFactory.addProperty("sink.datasource", "storage");
+ configFactory.addProperty("sink.mode", "retina");
+ configFactory.addProperty("sink.trans.mode", "batch");
+ configFactory.addProperty("sink.recovery.enable", "true");
+ configFactory.addProperty("sink.recovery.mode", mode);
+ configFactory.addProperty("sink.recovery.bootstrap.force_overwrite", Boolean.toString(forceOverwrite));
+ configFactory.addProperty("sink.recovery.rocksdb.dir", rocksdbDir.toString());
+ configFactory.addProperty("sink.recovery.insert_as_update", "true");
+ PixelsSinkConfigFactory.initialize(configFactory);
+ }
+}