60 changes: 60 additions & 0 deletions conf/pixels-sink.it.local.properties
@@ -0,0 +1,60 @@
# Source
sink.datasource=storage
sink.datasource.rate.limit=-1
sink.datasource.rate.limit.type=guava
sink.storage.loop=false

# Sink
sink.mode=retina
sink.trans.mode=batch
sink.commit.method=sync
sink.commit.batch.size=50
sink.commit.batch.worker=2
sink.commit.batch.delay=50

# Retina
sink.retina.embedded=false
sink.retina.mode=stream
sink.retina.client=1
sink.retina.log.queue=false
sink.retina.rpc.limit=1000
sink.retina.trans.limit=1000
sink.retina.trans.request.batch=false
sink.remote.host=localhost
sink.remote.port=29422
sink.timeout.ms=5000
sink.flush.interval.ms=100
sink.flush.batch.size=2
sink.max.retries=3

# Proto storage source
sink.proto.dir=file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc
sink.proto.data=ray-recovery-it
sink.proto.maxRecords=100000

# Recovery
sink.recovery.enable=true
sink.recovery.mode=bootstrap
sink.recovery.bootstrap.force_overwrite=true
sink.recovery.rocksdb.dir=/home/antio2/projects/pixels-sink/target/integration-tests/recovery-rocksdb
sink.recovery.insert_as_update=true

# Trino
trino.url=jdbc:trino://localhost:18080/pixels/pixels_test
trino.user=pixels
trino.password=
trino.parallel=1

# Freshness / monitor
sink.monitor.enable=false
sink.monitor.report.enable=false
sink.monitor.freshness.level=row
sink.monitor.report.file=/tmp/pixels-sink-it-rate.csv
sink.monitor.freshness.file=/tmp/pixels-sink-it-fresh.csv

# Other
sink.rpc.enable=false
sink.rpc.mock.delay=0
sink.registry.url=http://localhost:8080/apis/registry/v2
transaction.topic.suffix=transaction
transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
60 changes: 60 additions & 0 deletions conf/pixels-sink.it.recovery-window.bootstrap.properties
@@ -0,0 +1,60 @@
# Source
sink.datasource=storage
sink.datasource.rate.limit=-1
sink.datasource.rate.limit.type=guava
sink.storage.loop=false

# Sink
sink.mode=retina
sink.trans.mode=batch
sink.commit.method=sync
sink.commit.batch.size=50
sink.commit.batch.worker=2
sink.commit.batch.delay=50

# Retina
sink.retina.embedded=false
sink.retina.mode=stream
sink.retina.client=1
sink.retina.log.queue=false
sink.retina.rpc.limit=1000
sink.retina.trans.limit=1000
sink.retina.trans.request.batch=false
sink.remote.host=localhost
sink.remote.port=29422
sink.timeout.ms=5000
sink.flush.interval.ms=100
sink.flush.batch.size=2
sink.max.retries=3

# Proto storage source
sink.proto.dir=file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc
sink.proto.data=ray-recovery-window-it-v2
sink.proto.maxRecords=100000

# Recovery
sink.recovery.enable=true
sink.recovery.mode=bootstrap
sink.recovery.bootstrap.force_overwrite=true
sink.recovery.rocksdb.dir=/home/antio2/projects/pixels-sink/target/integration-tests/recovery-window-rocksdb-v2
sink.recovery.insert_as_update=true

# Trino
trino.url=jdbc:trino://localhost:18080/pixels/pixels_test
trino.user=pixels
trino.password=
trino.parallel=1

# Freshness / monitor
sink.monitor.enable=false
sink.monitor.report.enable=false
sink.monitor.freshness.level=row
sink.monitor.report.file=/tmp/pixels-sink-it-rate.csv
sink.monitor.freshness.file=/tmp/pixels-sink-it-fresh.csv

# Other
sink.rpc.enable=false
sink.rpc.mock.delay=0
sink.registry.url=http://localhost:8080/apis/registry/v2
transaction.topic.suffix=transaction
transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
60 changes: 60 additions & 0 deletions conf/pixels-sink.it.recovery-window.recovery.properties
@@ -0,0 +1,60 @@
# Source
sink.datasource=storage
sink.datasource.rate.limit=-1
sink.datasource.rate.limit.type=guava
sink.storage.loop=false

# Sink
sink.mode=retina
sink.trans.mode=batch
sink.commit.method=sync
sink.commit.batch.size=50
sink.commit.batch.worker=2
sink.commit.batch.delay=50

# Retina
sink.retina.embedded=false
sink.retina.mode=stream
sink.retina.client=1
sink.retina.log.queue=false
sink.retina.rpc.limit=1000
sink.retina.trans.limit=1000
sink.retina.trans.request.batch=false
sink.remote.host=localhost
sink.remote.port=29422
sink.timeout.ms=5000
sink.flush.interval.ms=100
sink.flush.batch.size=2
sink.max.retries=3

# Proto storage source
sink.proto.dir=file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc
sink.proto.data=ray-recovery-window-it-v2
sink.proto.maxRecords=100000

# Recovery
sink.recovery.enable=true
sink.recovery.mode=recovery
sink.recovery.bootstrap.force_overwrite=false
sink.recovery.rocksdb.dir=/home/antio2/projects/pixels-sink/target/integration-tests/recovery-window-rocksdb-v2
sink.recovery.insert_as_update=true

# Trino
trino.url=jdbc:trino://localhost:18080/pixels/pixels_test
trino.user=pixels
trino.password=
trino.parallel=1

# Freshness / monitor
sink.monitor.enable=false
sink.monitor.report.enable=false
sink.monitor.freshness.level=row
sink.monitor.report.file=/tmp/pixels-sink-it-rate.csv
sink.monitor.freshness.file=/tmp/pixels-sink-it-fresh.csv

# Other
sink.rpc.enable=false
sink.rpc.mock.delay=0
sink.registry.url=http://localhost:8080/apis/registry/v2
transaction.topic.suffix=transaction
transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
60 changes: 60 additions & 0 deletions conf/pixels-sink.it.recovery.properties
@@ -0,0 +1,60 @@
# Source
sink.datasource=storage
sink.datasource.rate.limit=-1
sink.datasource.rate.limit.type=guava
sink.storage.loop=false

# Sink
sink.mode=retina
sink.trans.mode=batch
sink.commit.method=sync
sink.commit.batch.size=50
sink.commit.batch.worker=2
sink.commit.batch.delay=50

# Retina
sink.retina.embedded=false
sink.retina.mode=stream
sink.retina.client=1
sink.retina.log.queue=false
sink.retina.rpc.limit=1000
sink.retina.trans.limit=1000
sink.retina.trans.request.batch=false
sink.remote.host=localhost
sink.remote.port=29422
sink.timeout.ms=5000
sink.flush.interval.ms=100
sink.flush.batch.size=2
sink.max.retries=3

# Proto storage source
sink.proto.dir=file:///home/antio2/projects/pixels-sink/target/integration-tests/proto-cdc
sink.proto.data=ray-recovery-it
sink.proto.maxRecords=100000

# Recovery
sink.recovery.enable=true
sink.recovery.mode=recovery
sink.recovery.bootstrap.force_overwrite=false
sink.recovery.rocksdb.dir=/home/antio2/projects/pixels-sink/target/integration-tests/recovery-rocksdb
sink.recovery.insert_as_update=true

# Trino
trino.url=jdbc:trino://localhost:18080/pixels/pixels_test
trino.user=pixels
trino.password=
trino.parallel=1

# Freshness / monitor
sink.monitor.enable=false
sink.monitor.report.enable=false
sink.monitor.freshness.level=row
sink.monitor.report.file=/tmp/pixels-sink-it-rate.csv
sink.monitor.freshness.file=/tmp/pixels-sink-it-fresh.csv

# Other
sink.rpc.enable=false
sink.rpc.mock.delay=0
sink.registry.url=http://localhost:8080/apis/registry/v2
transaction.topic.suffix=transaction
transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
39 changes: 39 additions & 0 deletions docs/configuration.md
@@ -91,6 +91,45 @@ Notes on `sink.trans.mode`:
| `sink.proto.maxRecords` | `100000` | Max records per file. |
| `sink.storage.loop` | `false` | Whether to loop over stored files. |

### Recovery

Recovery is currently implemented for:

- `sink.datasource=storage`
- `sink.mode=retina`
- `sink.trans.mode=batch`

The current recovery path is designed around the `TableCrossTxWriter` write path.

| Key | Default | Notes |
| --- | --- | --- |
| `sink.recovery.enable` | `false` | Enable local sink recovery metadata and recovery-aware processing. |
| `sink.recovery.mode` | `bootstrap` | Startup mode: `bootstrap` or `recovery`. |
| `sink.recovery.bootstrap.force_overwrite` | `false` | Only valid in `bootstrap` mode. Clears existing recovery state before starting from the beginning. |
| `sink.recovery.dir` | `./sink-recovery` | Reserved top-level recovery directory. The current implementation mainly uses `sink.recovery.rocksdb.dir`. |
| `sink.recovery.rocksdb.dir` | `./sink-recovery/rocksdb` | RocksDB directory for transaction bindings, active transaction order, and commit markers. |
| `sink.recovery.insert_as_update` | `true` | In recovery mode, rewrite `INSERT` as `UPDATE` with `before = after` to support idempotent replay. |
| `sink.recovery.fail_on_corruption` | `true` | Reserved for stricter recovery-store corruption handling. Not fully wired yet. |
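
The `sink.recovery.insert_as_update` rewrite described in the table can be sketched roughly as follows. This is a minimal illustration, not the sink's actual code: `InsertAsUpdateSketch`, `RowChange`, and `Op` are hypothetical stand-ins for the project's row-change types, and the two boolean parameters are assumed to come from `sink.recovery.mode` and `sink.recovery.insert_as_update`.

```java
import java.util.Map;

// Hypothetical stand-ins for the sink's row-change types; not the project's real classes.
final class InsertAsUpdateSketch {
    enum Op { INSERT, UPDATE, DELETE }

    static final class RowChange {
        final Op op;
        final Map<String, Object> before; // null for INSERT
        final Map<String, Object> after;  // null for DELETE

        RowChange(Op op, Map<String, Object> before, Map<String, Object> after) {
            this.op = op;
            this.before = before;
            this.after = after;
        }
    }

    /**
     * In recovery mode with the flag set, turns an INSERT into an UPDATE whose
     * before image equals its after image. All other events pass through unchanged.
     */
    static RowChange rewrite(RowChange change, boolean recoveryMode, boolean insertAsUpdate) {
        if (!recoveryMode || !insertAsUpdate || change.op != Op.INSERT) {
            return change;
        }
        return new RowChange(Op.UPDATE, change.after, change.after);
    }
}
```

Because the rewritten event carries the same row as both `before` and `after`, replaying it a second time describes the same final row state, which is what makes the replay idempotent.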

### Notes on recovery mode

- `bootstrap` starts reading from the beginning of the storage source.
- `recovery` loads recovery metadata and replays from the `beginOffset` of the earliest active transaction.
- In `bootstrap` mode, if recovery state already exists and `sink.recovery.bootstrap.force_overwrite=false`, startup fails.
- In `bootstrap` mode, if `sink.recovery.bootstrap.force_overwrite=true`, the existing recovery store is cleared before startup.
- `sink.recovery.bootstrap.force_overwrite` must not be used with `sink.recovery.mode=recovery`.
- Recovery currently assumes that `TransServer` can return the original transaction context via `getTransContext(pixelsTransId)`.
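
The startup rules above can be summarized as a small decision function. The sketch below is illustrative only: `RecoveryStartupSketch`, `Mode`, and `Action` are hypothetical names, and it assumes that an invalid combination is rejected with an exception. The notes do not say what happens in `recovery` mode when the store is empty, so that case is not modeled.

```java
// Hypothetical decision helper; names and exception types are assumptions.
final class RecoveryStartupSketch {
    enum Mode { BOOTSTRAP, RECOVERY }
    enum Action { START_FROM_BEGINNING, CLEAR_STORE_THEN_START, REPLAY_FROM_STORE }

    /**
     * @param mode           value of sink.recovery.mode
     * @param forceOverwrite value of sink.recovery.bootstrap.force_overwrite
     * @param stateExists    whether the recovery store already holds state
     */
    static Action decide(Mode mode, boolean forceOverwrite, boolean stateExists) {
        if (mode == Mode.RECOVERY) {
            if (forceOverwrite) {
                // force_overwrite is only valid in bootstrap mode.
                throw new IllegalArgumentException(
                        "sink.recovery.bootstrap.force_overwrite must not be used with recovery mode");
            }
            return Action.REPLAY_FROM_STORE;
        }
        if (!stateExists) {
            return Action.START_FROM_BEGINNING;
        }
        if (!forceOverwrite) {
            // Existing state without force_overwrite: startup fails.
            throw new IllegalStateException(
                    "recovery state already exists; set force_overwrite=true to clear it");
        }
        return Action.CLEAR_STORE_THEN_START;
    }
}
```

Under these assumptions, the `bootstrap` integration-test configurations in this change (with `force_overwrite=true`) always start from a clean store, while the matching `recovery` configurations reuse the same RocksDB directory and replay from it.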

### Notes on recovery metadata

The recovery store keeps:

- `dataSourceTxId -> pixelsTransId / timestamp / lease / beginOffset / lastSafeOffset / state`
- an ordered view of active transactions by `beginOffset`
- commit markers used to suppress duplicate commits

`lastSafeOffset` is advanced on successful `TableCrossTxWriter` flush or batch completion, not on every row event.
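
To make the three pieces of recovery metadata concrete, here is a simplified in-memory model. It is a sketch under stated assumptions, not the RocksDB-backed implementation: `RecoveryStoreSketch` and its method names are hypothetical, the `timestamp`, `lease`, and `state` fields are omitted for brevity, and `lastSafeOffset` is assumed to start at `beginOffset`.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeSet;

// In-memory illustration only; the real store persists this data in RocksDB.
final class RecoveryStoreSketch {
    static final class TxBinding {
        final String dataSourceTxId;
        final long pixelsTransId;
        final long beginOffset;
        long lastSafeOffset;

        TxBinding(String dataSourceTxId, long pixelsTransId, long beginOffset) {
            this.dataSourceTxId = dataSourceTxId;
            this.pixelsTransId = pixelsTransId;
            this.beginOffset = beginOffset;
            this.lastSafeOffset = beginOffset; // assumption: nothing flushed yet
        }
    }

    // dataSourceTxId -> binding
    private final Map<String, TxBinding> bindings = new HashMap<>();
    // Ordered view of active transactions by beginOffset (ties broken by id).
    private final TreeSet<TxBinding> activeByBeginOffset = new TreeSet<>(
            Comparator.comparingLong((TxBinding b) -> b.beginOffset)
                    .thenComparing(b -> b.dataSourceTxId));
    // Commit markers used to suppress duplicate commits.
    private final Set<String> commitMarkers = new HashSet<>();

    void begin(String dataSourceTxId, long pixelsTransId, long beginOffset) {
        if (commitMarkers.contains(dataSourceTxId)) {
            return; // already committed; ignore replayed begin
        }
        TxBinding binding = new TxBinding(dataSourceTxId, pixelsTransId, beginOffset);
        if (bindings.putIfAbsent(dataSourceTxId, binding) == null) {
            activeByBeginOffset.add(binding);
        }
    }

    /** Called after a successful flush or batch completion, not per row event. */
    void onFlush(String dataSourceTxId, long offset) {
        TxBinding binding = bindings.get(dataSourceTxId);
        if (binding != null) {
            binding.lastSafeOffset = Math.max(binding.lastSafeOffset, offset);
        }
    }

    long lastSafeOffset(String dataSourceTxId) {
        return bindings.get(dataSourceTxId).lastSafeOffset;
    }

    /** Returns false if a commit marker already exists for this transaction. */
    boolean commit(String dataSourceTxId) {
        if (!commitMarkers.add(dataSourceTxId)) {
            return false;
        }
        TxBinding binding = bindings.remove(dataSourceTxId);
        if (binding != null) {
            activeByBeginOffset.remove(binding);
        }
        return true;
    }

    /** Earliest beginOffset among active transactions, if any remain. */
    OptionalLong replayStartOffset() {
        return activeByBeginOffset.isEmpty()
                ? OptionalLong.empty()
                : OptionalLong.of(activeByBeginOffset.first().beginOffset);
    }
}
```

Keeping the active transactions ordered by `beginOffset` means the replay start point in `recovery` mode is just the first element of that view, and the commit markers let a replayed commit be recognized and skipped.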

### Flink Sink

| Key | Default | Notes |