FirePipe - S3 → Firebolt Ingestion (Demo + Snowpipe+)

FirePipe provides two deployment options:

  • Demo stack: minimal S3 event → Lambda → Firebolt staging table.
  • Snowpipe+ stack: production-focused features (routing, per-file logs, file-lock to avoid duplicates, and typed upsert for public.employee).

Both stacks deploy as separate AWS CDK stacks and can be used independently.

Customer Quick Start

A customer only needs to provide credentials, an S3 bucket, and a prefix; ingestion happens automatically when files land in S3.

  1. Configure (.env)
FIREBOLT_ACCOUNT=your_account
FIREBOLT_ENGINE=your_engine
FIREBOLT_DATABASE=your_db
# Choose ONE auth method
FIREBOLT_CLIENT_ID=...
FIREBOLT_CLIENT_SECRET=...
# or
FIREBOLT_USERNAME=...
FIREBOLT_PASSWORD=...

AWS_REGION=ap-south-1
S3_BUCKET=your-existing-bucket
S3_PLUS_PREFIX=plus/
SCHEMA_NAME=public
  2. Deploy Snowpipe+ (single command)
export AWS_REGION=ap-south-1 AWS_DEFAULT_REGION=ap-south-1
export S3_BUCKET=<your-bucket> S3_PLUS_PREFIX=plus/
cdk bootstrap
cdk deploy FirePipePlusStack --app "python3 snowpipe_plus/app_plus.py" --require-approval never
  3. Route an S3 prefix to a table (edit snowpipe_plus/routing_config.json)
{
  "routes": [
    { "prefix": "plus/yourtable/", "schema": "public", "table": "incremental_data", "transform": { "flatten": false } }
  ]
}
  4. Use it
  • Drop Parquet files under plus/yourtable/ in your bucket (a scripted upload is sketched after this list).
  • The Lambda ingests automatically. Verify in Firebolt:
    • Staging rows: SELECT COUNT(*) FROM public.incremental_data WHERE file_path='<your key>';
    • Load logs: SELECT * FROM public.load_log_plus WHERE file_path='<your key>' ORDER BY processed_timestamp DESC;
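
If you prefer to script step 4, here is a minimal boto3 sketch; the local file name data.parquet and the destination key are illustrative, not names the pipeline requires:

# Minimal sketch: upload a local Parquet file so Snowpipe+ picks it up.
# "data.parquet" and the key under plus/yourtable/ are illustrative names.
import boto3

s3 = boto3.client("s3", region_name="ap-south-1")
s3.upload_file("data.parquet", "your-existing-bucket",
               "plus/yourtable/data.parquet")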

Prerequisites

  • AWS account with permissions (S3, Lambda, CloudWatch, ECR, EventBridge, IAM)
  • Firebolt account (engine running), database, and credentials
  • Python 3.9+, AWS CLI, Node.js, and CDK (npm install -g aws-cdk)

Configure (.env)

Copy env.example to .env and set values (quotes not required):

# Firebolt
FIREBOLT_ACCOUNT=your_account
FIREBOLT_ENGINE=your_engine
FIREBOLT_DATABASE=your_database
# Choose ONE auth method
FIREBOLT_CLIENT_ID=...
FIREBOLT_CLIENT_SECRET=...
# or
FIREBOLT_USERNAME=...
FIREBOLT_PASSWORD=...

# AWS
AWS_REGION=ap-south-1
S3_BUCKET=your-existing-bucket

# Snowpipe+
S3_PLUS_PREFIX=plus/
SCHEMA_NAME=public
DEFAULT_TABLE_NAME=incremental_data
LOAD_LOG_TABLE=load_log_plus
USE_FIREBOLT_LOCK=true
PROCESSED_TABLE=processed_files_plus
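
How the stacks pick these values up isn't shown here; if you load them in your own helper scripts, a minimal python-dotenv sketch looks like this (an assumption — the CDK apps may read the environment differently):

# Sketch only: load .env before reading config in a helper script.
# Assumes the python-dotenv package; not necessarily what app_plus.py does.
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory
bucket = os.environ["S3_BUCKET"]
prefix = os.environ.get("S3_PLUS_PREFIX", "plus/")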

Deploy

Export region and bucket once in your shell:

export AWS_REGION=ap-south-1 AWS_DEFAULT_REGION=ap-south-1
export S3_BUCKET=<your-bucket>
  • Demo stack (simple ingestion to public.incremental_data):
cdk bootstrap
cdk deploy FirePipeStack --require-approval never
  • Snowpipe+ stack (routing, logs, file-lock, typed upsert):
cdk deploy FirePipePlusStack --app "python3 snowpipe_plus/app_plus.py" --require-approval never

Notes:

  • Both stacks use an existing S3 bucket (S3_BUCKET).
  • Snowpipe+ listens to S3 ObjectCreated events via EventBridge and then filters by prefix and routing inside the Lambda (sketched below).
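
The handler code itself isn't reproduced in this README; as a rough sketch of that filtering step, assuming the standard EventBridge "Object Created" payload shape:

# Rough sketch of the Lambda-side filter; not the actual handler from this repo.
# EventBridge S3 "Object Created" events carry bucket and key under "detail".
import os

def object_key(event):
    return event.get("detail", {}).get("object", {}).get("key")

def should_process(event):
    key = object_key(event)
    prefix = os.environ.get("S3_PLUS_PREFIX", "plus/")
    return key is not None and key.startswith(prefix)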

Customer Usage (no dummy data)

  • Ensure your Firebolt engine is running.
  • Place Parquet files in S3 under your desired prefixes.
  • The demo stack writes rows into public.incremental_data with metadata columns such as file_path (used in the verification queries below).
  • Snowpipe+ routes keys based on snowpipe_plus/routing_config.json and logs status to public.load_log_plus.
    • plus/incremental_data/ → public.incremental_data (staging only)
    • plus/employee/ → stages to public.incremental_data and upserts into public.employee by emp_id (delete+insert semantics to avoid duplicates; a sketch follows the table definition below).
  • Snowpipe+ also writes a lock row to public.processed_files_plus (file_path, etag) to avoid double-processing the same object version; a sketch of this check follows this list.
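
That check is not spelled out in this README; one plausible shape for it, keyed on (file_path, etag) as described above — the SQL and the DB-API-style cursor are assumptions, not the shipped code:

# Sketch of the duplicate guard described above; table and column names come
# from the text, the statements themselves are an assumption.
# `cursor` is assumed to be a DB-API cursor connected to Firebolt.
def acquire_file_lock(cursor, file_path, etag):
    cursor.execute(
        "SELECT 1 FROM public.processed_files_plus "
        "WHERE file_path = ? AND etag = ?",
        (file_path, etag),
    )
    if cursor.fetchone():
        return False  # same object version already seen: skip this event
    cursor.execute(
        "INSERT INTO public.processed_files_plus (file_path, etag) "
        "VALUES (?, ?)",
        (file_path, etag),
    )
    return True  # caller proceeds with ingestion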

Optional: Manual Test (Snowpipe+)

  • Create and upload a test Parquet file, force-invoke the Lambda, then view the logs.
# Uses cdk-outputs.json if you deployed with --outputs-file cdk-outputs.json;
# otherwise replace the fallback with your deployed function name.
FUNCTION=$(jq -r .FirePipePlusStack.PlusLambdaFunctionName cdk-outputs.json 2>/dev/null || echo "FirePipePlusStack-FirePipePlusLambda73DC29F5-eq0JABytMNG7")
# Export so the Python snippet below can read these via os.environ
export BUCKET=$S3_BUCKET
export KEY=plus/employee/emp_$(date +%s)_demo.parquet

python3 - << 'PY'
# Build a one-row employee Parquet file in memory and upload it to S3.
import os, io, boto3
import pyarrow as pa, pyarrow.parquet as pq

rows = [{"emp_id": 9001, "name": "Demo", "dept": "IT",
         "salary": 100000, "updated_at": "2025-10-13T13:00:00Z"}]
buf = io.BytesIO()
pq.write_table(pa.Table.from_pylist(rows), buf)

s3 = boto3.client("s3", region_name=os.environ.get("AWS_REGION", "ap-south-1"))
s3.put_object(Bucket=os.environ["BUCKET"], Key=os.environ["KEY"],
              Body=buf.getvalue(), ContentType="application/octet-stream")
print("Uploaded", os.environ["KEY"])
PY

aws lambda invoke --region $AWS_REGION --cli-binary-format raw-in-base64-out \
  --function-name $FUNCTION \
  --payload "{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"$BUCKET\"},\"object\":{\"key\":\"$KEY\"}}}]}" \
  /dev/stdout > /dev/null

aws logs tail /aws/lambda/$FUNCTION --since 2m --region $AWS_REGION

Verify in Firebolt (run in Firebolt SQL, replacing ${KEY} with the uploaded object key):

-- Staging rows for this file
SELECT COUNT(*) FROM public.incremental_data WHERE file_path='${KEY}';

-- Upserted target rows (employee example)
SELECT emp_id, name, dept, salary, updated_at
FROM public.employee
WHERE emp_id IN (9001)
ORDER BY emp_id;

-- Load logs
SELECT status, num_rows, file_path, processed_timestamp
FROM public.load_log_plus
WHERE file_path='${KEY}'
ORDER BY processed_timestamp DESC;

Employee Table (created automatically if missing)

CREATE TABLE IF NOT EXISTS public.employee (
  emp_id INT,
  name TEXT,
  dept TEXT,
  salary INT,
  updated_at TEXT
);
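
The delete+insert upsert referenced under Customer Usage could then look roughly like this; the statements are a sketch (it assumes the staged employee columns are queryable from public.incremental_data), not the exact SQL the Lambda issues:

# Sketch of the delete+insert upsert into public.employee keyed by emp_id.
# Assumes the staged rows for this file live in public.incremental_data;
# `cursor` is assumed to be a DB-API cursor connected to Firebolt.
def upsert_employees(cursor, file_path):
    # Delete target rows whose emp_id appears in the newly staged file...
    cursor.execute(
        "DELETE FROM public.employee WHERE emp_id IN ("
        "  SELECT emp_id FROM public.incremental_data WHERE file_path = ?)",
        (file_path,),
    )
    # ...then insert the staged rows, so each emp_id ends up exactly once.
    cursor.execute(
        "INSERT INTO public.employee (emp_id, name, dept, salary, updated_at) "
        "SELECT emp_id, name, dept, salary, updated_at "
        "FROM public.incremental_data WHERE file_path = ?",
        (file_path,),
    )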

Routing (Snowpipe+)

See snowpipe_plus/routing_config.json. Example routes:

{
  "routes": [
    { "prefix": "plus/incremental_data/", "schema": "public", "table": "incremental_data", "transform": { "flatten": false } },
    { "prefix": "plus/employee/", "schema": "public", "table": "incremental_data", "transform": { "flatten": false }, "upsert": { "table": "employee" } }
  ]
}
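
Route selection order isn't documented here; a plausible reading of the config, with first-match-wins as an explicit assumption:

# Sketch of mapping an object key to a route; first-match-wins is an assumption.
import json

def find_route(key, config_path="snowpipe_plus/routing_config.json"):
    with open(config_path) as f:
        routes = json.load(f)["routes"]
    for route in routes:
        if key.startswith(route["prefix"]):
            return route  # e.g. "plus/employee/x.parquet" -> the employee route
    return None  # unrouted keys are presumably skipped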

Security

  • Provide Firebolt credentials via environment variables (prefer Secrets Manager in production)
  • IAM policies scoped to required S3 and optional DynamoDB access
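
For the production recommendation above, a minimal sketch of fetching Firebolt credentials from Secrets Manager; the secret name firepipe/firebolt and its JSON layout are hypothetical, not part of this repo:

# Sketch: load Firebolt credentials from Secrets Manager instead of plain env vars.
# The secret id "firepipe/firebolt" and its JSON keys are hypothetical.
import json
import boto3

def load_firebolt_credentials(secret_id="firepipe/firebolt"):
    sm = boto3.client("secretsmanager")
    payload = sm.get_secret_value(SecretId=secret_id)["SecretString"]
    creds = json.loads(payload)
    return creds["FIREBOLT_CLIENT_ID"], creds["FIREBOLT_CLIENT_SECRET"]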

Publish to GitHub

git init
git add .
git commit -m "FirePipe demo + Snowpipe+ with dedup/upsert and docs"
# replace with your repo
git remote add origin https://github.com/<your-org>/firepipe.git
git branch -M main
git push -u origin main
