pixels-spark is a Spark CDC and Delta Lake merge/import pipeline for Pixels workloads.
It provides two core capabilities:
- Read CDC records from the Pixels RPC service
- Merge those CDC records into Delta Lake tables using local benchmark table definitions
Additional features:

- Reuse the same benchmark table definitions for both CSV import and CDC merge
- Automatic schema loading from the Pixels metadata service
- Automatic primary-key discovery and in-process metadata caching
- Spark Structured Streaming source based on the Pixels RPC polling protocol
- Delta Lake `MERGE` pipeline for `INSERT`, `UPDATE`, `DELETE`, and `SNAPSHOT`
- Default `hard delete` behavior that keeps the target Delta schema aligned with the source schema
- Optional `soft delete` mode for audit-oriented workflows
- Packaged helper scripts for build, submit, validation, and benchmark runs
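The merge semantics listed above can be sketched independently of Spark: each CDC record carries an operation type, and the pipeline upserts or deletes by primary key. A minimal Python sketch, assuming a simplified event shape (`op`, `pk`, `row`) that is not the actual Pixels record format:

```python
# Minimal sketch of MERGE semantics for CDC events, keyed by primary key.
# The event shape (op / pk / row) is a simplification, not the Pixels wire format.

def apply_events(table, events):
    """Apply INSERT/UPDATE/DELETE/SNAPSHOT events to a pk -> row mapping."""
    for e in events:
        op = e["op"]
        if op == "SNAPSHOT":
            # A snapshot replaces the whole table with the snapshot rows.
            table = {r["pk"]: r["row"] for r in e["rows"]}
        elif op in ("INSERT", "UPDATE"):
            # MERGE upsert: insert when the key is absent, update when present.
            table[e["pk"]] = e["row"]
        elif op == "DELETE":
            # Hard delete: physically drop the matched row.
            table.pop(e["pk"], None)
    return table

events = [
    {"op": "INSERT", "pk": 1, "row": {"balance": 100}},
    {"op": "UPDATE", "pk": 1, "row": {"balance": 150}},
    {"op": "INSERT", "pk": 2, "row": {"balance": 30}},
    {"op": "DELETE", "pk": 2},
]
print(apply_events({}, events))  # {1: {'balance': 150}}
```

The real pipeline expresses the same upsert/delete logic as a Delta Lake `MERGE` inside a Structured Streaming micro-batch.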
Required:

- Linux or another Unix-like environment
- Java 17
- Maven 3.x
- A Spark 3.5.x distribution with `spark-submit`
- Access to a running Pixels RPC service
- Access to a running Pixels metadata service
Optional, depending on the experiment:
- MinIO or S3-compatible object storage
- Hive Metastore
- Trino
- Flink
Build the project:

```shell
mvn -DskipTests compile
```

Build a shaded deployment JAR:

```shell
./scripts/build-package.sh
```

The packaged artifact is `target/pixels-spark-0.1.jar`.
This JAR already includes the required Delta Lake runtime dependencies for the current packaging model. If you submit this artifact with `spark-submit`, you do not need an extra `--packages io.delta:...` argument.
Run `PixelsCustomerPullTest` against the RPC service:

```shell
mvn -q -DskipTests \
  -Dexec.mainClass=io.pixelsdb.spark.app.PixelsCustomerPullTest \
  -Dexec.args="localhost 9091 pixels_bench savingaccount 0" \
  org.codehaus.mojo:exec-maven-plugin:3.5.0:java
```

Run `PixelsSavingAccountStreamTest` as a streaming read check:

```shell
env JAVA_HOME=/path/to/java17 \
  SPARK_LOCAL_IP=127.0.0.1 \
  SPARK_LOCAL_HOSTNAME=localhost \
  MAVEN_OPTS='--add-opens=java.base/sun.nio.ch=ALL-UNNAMED' \
  mvn -q -DskipTests \
  -Dexec.classpathScope=compile \
  -Dexec.mainClass=io.pixelsdb.spark.app.PixelsSavingAccountStreamTest \
  -Dexec.args="localhost 9091 localhost 18888 pixels_bench savingaccount 0" \
  org.codehaus.mojo:exec-maven-plugin:3.5.0:java
```

Run `PixelsDeltaMergeApp` end to end:

```shell
env JAVA_HOME=/path/to/java17 \
  SPARK_LOCAL_IP=127.0.0.1 \
  SPARK_LOCAL_HOSTNAME=localhost \
  MAVEN_OPTS='--add-opens=java.base/sun.nio.ch=ALL-UNNAMED' \
  mvn -q -DskipTests \
  -Dexec.classpathScope=compile \
  -Dexec.mainClass=io.pixelsdb.spark.app.PixelsDeltaMergeApp \
  -Dexec.args="--spark-master local[1] \
    --database pixels_bench \
    --table savingaccount \
    --rpc-host localhost \
    --rpc-port 9091 \
    --metadata-host localhost \
    --metadata-port 18888 \
    --target-path /tmp/pixels-spark-savingaccount-delta \
    --checkpoint-location /tmp/pixels-spark-savingaccount-ckpt \
    --trigger-mode once" \
  org.codehaus.mojo:exec-maven-plugin:3.5.0:java
```

Preview the table:
```shell
./scripts/preview-delta-table.sh /tmp/pixels-spark-savingaccount-delta 5 local[1]
```

Check primary-key uniqueness:
```shell
./scripts/check-delta-primary-key.sh \
  localhost \
  18888 \
  pixels_bench \
  savingaccount \
  /tmp/pixels-spark-savingaccount-delta \
  local[1]
```

Expected validation rule: `row_count == distinct_pk_count`
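The rule asserts that no primary key appears more than once after the merge. A dependency-free sketch of the same check over plain rows (the real script reads the Delta table with Spark, and the key columns come from the metadata service):

```python
# Sketch of the primary-key uniqueness rule: row_count == distinct_pk_count.
# pk_columns would come from the Pixels metadata service; here it is hardcoded.

def check_primary_key(rows, pk_columns):
    row_count = len(rows)
    # Composite keys are handled by hashing the tuple of key column values.
    distinct_pk_count = len({tuple(r[c] for c in pk_columns) for r in rows})
    return row_count == distinct_pk_count

rows = [
    {"id": 1, "balance": 100},
    {"id": 2, "balance": 30},
]
print(check_primary_key(rows, ["id"]))                             # True
print(check_primary_key(rows + [{"id": 2, "balance": 99}], ["id"]))  # False
```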
Configuration precedence (highest priority first):

- Spark options or CLI arguments
- `PIXELS_SPARK_CONFIG` or `$PIXELS_HOME/etc/pixels-spark.properties`
- classpath `pixels-spark.properties`
- `ConfigFactory.Instance()` values from `PIXELS_CONFIG` or `$PIXELS_HOME/etc/pixels.properties`
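Layered precedence like this resolves each key from the first layer that defines it. A minimal sketch, assuming the layer contents shown are illustrative (the actual loading code lives in pixels-spark):

```python
# Sketch of layered config resolution: earlier layers win.
# Layer names mirror the README's precedence list; values are made up.

def resolve(key, layers):
    """Return the value from the first (highest-precedence) layer defining key."""
    for layer in layers:
        if key in layer:
            return layer[key]
    return None

cli_args        = {"pixels.spark.rpc.port": "9100"}
user_properties = {"pixels.spark.rpc.port": "9091",
                   "pixels.spark.rpc.host": "localhost"}
classpath_props = {"pixels.spark.metadata.port": "18888"}
pixels_defaults = {"pixels.spark.rpc.host": "127.0.0.1"}

layers = [cli_args, user_properties, classpath_props, pixels_defaults]
print(resolve("pixels.spark.rpc.port", layers))  # 9100 -- the CLI layer wins
print(resolve("pixels.spark.rpc.host", layers))  # localhost
```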
Minimal `pixels-spark.properties` example:

```properties
pixels.spark.rpc.host=localhost
pixels.spark.rpc.port=9091
pixels.spark.metadata.host=localhost
pixels.spark.metadata.port=18888
pixels.spark.delta.auto-create=true
pixels.spark.delta.delete.mode=hard
pixels.spark.delta.trigger.mode=once
pixels.spark.delta.trigger.interval=0 seconds
```

Delete mode options:
- `hard`: physically delete matched rows from the Delta table
- `soft`: add `_pixels_is_deleted` and `_pixels_deleted_at` columns and mark rows as deleted
- `ignore`: ignore delete events
Default behavior: `hard delete`, which keeps the target Delta schema aligned with the source schema.
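The three modes differ only in how a matched delete event is applied. A sketch of that difference on a single row (the `_pixels_*` column names come from the README; the timestamp format and everything else is illustrative):

```python
from datetime import datetime, timezone

# Sketch of the three delete modes applied to one matched row.
# Column names _pixels_is_deleted / _pixels_deleted_at come from the README;
# the rest of this example is illustrative.

def apply_delete(table, pk, mode):
    if mode == "hard":
        table.pop(pk, None)                      # physically remove the row
    elif mode == "soft":
        row = table.get(pk)
        if row is not None:                      # keep the row, mark it deleted
            row["_pixels_is_deleted"] = True
            row["_pixels_deleted_at"] = datetime.now(timezone.utc).isoformat()
    elif mode == "ignore":
        pass                                     # drop the delete event
    return table

print(apply_delete({1: {"balance": 100}}, 1, "hard"))    # {}
print(apply_delete({1: {"balance": 100}}, 1, "ignore"))  # {1: {'balance': 100}}
```

Note how `soft` is the only mode that widens the target schema, which is why it is opt-in.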
After packaging, you can submit the shaded JAR directly:

```shell
./scripts/run-delta-merge.sh \
  --database pixels_bench \
  --table savingaccount \
  --rpc-host localhost \
  --rpc-port 9091 \
  --metadata-host localhost \
  --metadata-port 18888 \
  --target-path /tmp/pixels-spark-savingaccount-delta \
  --checkpoint-location /tmp/pixels-spark-savingaccount-ckpt \
  --trigger-mode once
```

Bucket selection is automatic. By default, CDC pulls all source buckets defined by `node.bucket.num` in `$PIXELS_HOME/etc/pixels.properties`; you do not need to pass `--buckets`.
The helper scripts look for `spark-submit` in this order:

- `SPARK_SUBMIT_BIN`
- `$SPARK_HOME/bin/spark-submit`
- `spark-submit` from `PATH`
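The lookup above is plain environment-variable fallback. A Python sketch of the same resolution order (the real logic lives in the helper shell scripts):

```python
import os
import shutil

# Sketch of the documented lookup order for spark-submit:
# SPARK_SUBMIT_BIN, then $SPARK_HOME/bin/spark-submit, then PATH.

def find_spark_submit(env):
    if env.get("SPARK_SUBMIT_BIN"):
        return env["SPARK_SUBMIT_BIN"]
    if env.get("SPARK_HOME"):
        return os.path.join(env["SPARK_HOME"], "bin", "spark-submit")
    return shutil.which("spark-submit")  # None when not on PATH

print(find_spark_submit({"SPARK_SUBMIT_BIN": "/opt/spark/bin/spark-submit"}))
```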
Documentation in English:
- Local Startup Commands
- Delta Lake Import Quickstart
- Native Delta Lake Deployment
- Delta Lake Test Flow
The same guides are also available in Simplified Chinese (简体中文).
The current implementation has been validated for:
- Pixels schema loading from the metadata service
- Primary-key discovery from the metadata service
- Streaming source reads from the Pixels RPC service
- Delta merge with primary-key uniqueness validation
- Default `hard delete` behavior with source-schema-aligned target tables
- Optional `soft delete` table creation path
Known limitations:

- The streaming source still uses a synthetic local offset rather than a server-side cursor
- The current merge pipeline uses micro-batch semantics, not Flink-style native changelog tables
- Batch-level deduplication relies on Pixels transaction metadata fields
- `soft delete` is optional and changes the target table schema by design
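The batch-level deduplication mentioned above amounts to keeping only the newest event per primary key within a micro-batch. A sketch, assuming a single illustrative `ts` field in place of the actual Pixels transaction metadata fields:

```python
# Sketch of batch-level deduplication: keep the latest event per primary key,
# ordered by a transaction timestamp. The field name "ts" is illustrative;
# the real pipeline uses Pixels transaction metadata fields.

def dedupe_latest(events):
    latest = {}
    for e in events:
        cur = latest.get(e["pk"])
        # Later events with an equal or newer timestamp replace earlier ones.
        if cur is None or e["ts"] >= cur["ts"]:
            latest[e["pk"]] = e
    return list(latest.values())

batch = [
    {"pk": 1, "ts": 10, "op": "INSERT"},
    {"pk": 1, "ts": 12, "op": "UPDATE"},
    {"pk": 2, "ts": 11, "op": "INSERT"},
]
print(dedupe_latest(batch))
```

Without this step, two events for the same key in one micro-batch would make the `MERGE` match a target row more than once.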