-
Notifications
You must be signed in to change notification settings - Fork 37
Description
app.py
import os
import sqlite3
import uuid
import time
from functools import wraps
from flask import Flask, request, jsonify, g, abort
import stripe
# --- CONFIG ---
# Path of the SQLite database file (override with DB_PATH).
DATABASE = os.getenv("DB_PATH", "payouts.db")
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")  # use test key in dev
# Comma-separated admin keys. Entries are stripped and blank ones dropped so
# that an unset or sloppy value ("", "a,,b", "a, b") never yields an empty or
# whitespace-padded key.
ADMIN_KEYS = {
    key.strip()
    for key in os.getenv("ADMIN_KEYS", "").split(",")
    if key.strip()
}
# Payout limit in cents; the default 500000000 is $5,000,000.
DAILY_LIMIT_CENTS = int(os.getenv("DAILY_PAYOUT_LIMIT_CENTS", "500000000"))
MIN_APPROVALS = 2  # two-person approval

# Fail fast at import time instead of at the first Stripe call.
if not stripe.api_key:
    raise RuntimeError("STRIPE_SECRET_KEY must be set")

app = Flask(__name__)
# --- DB helpers ---
def get_db():
    """Return the SQLite connection cached on ``flask.g``.

    The connection is opened against ``DATABASE`` on first use and stored as
    ``g._database``; rows are returned as ``sqlite3.Row`` objects.
    """
    conn = getattr(g, "_database", None)
    if conn is not None:
        return conn
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    g._database = conn
    return conn
def init_db():
    """Create the ``payout_requests``, ``approvals`` and ``audit_log`` tables.

    Every statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this more
    than once is harmless. The change is committed before returning.
    """
    db = get_db()
    # UNIQUE (payout_id, admin_key) makes the database itself reject a second
    # approval of the same payout by the same admin, so the two-person rule
    # does not rest on a racy SELECT-then-INSERT in application code.
    # NOTE: because of IF NOT EXISTS the constraint only applies to newly
    # created databases; an existing approvals table is left as it is.
    db.executescript("""
        CREATE TABLE IF NOT EXISTS payout_requests (
            id TEXT PRIMARY KEY,
            created_at INTEGER,
            created_by TEXT,
            connected_account TEXT,
            amount_cents INTEGER,
            currency TEXT,
            note TEXT,
            status TEXT,
            transfer_id TEXT
        );
        CREATE TABLE IF NOT EXISTS approvals (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            payout_id TEXT,
            admin_key TEXT,
            approved_at INTEGER,
            UNIQUE (payout_id, admin_key)
        );
        CREATE TABLE IF NOT EXISTS audit_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            event TEXT,
            data TEXT,
            created_at INTEGER
        );
    """)
    db.commit()
def setup():
    """Create the database tables if they do not exist yet."""
    init_db()


# ``Flask.before_first_request`` was removed in Flask 2.3. Register the hook
# when the installed Flask still provides it; otherwise initialise eagerly
# inside an application context (init_db only issues CREATE ... IF NOT EXISTS,
# so running it at import time is safe).
if hasattr(app, "before_first_request"):
    app.before_first_request(setup)
else:
    with app.app_context():
        setup()
        # The teardown handler may not be registered yet at this point of the
        # module, so close the connection opened above explicitly.
        _setup_conn = g.pop("_database", None)
        if _setup_conn is not None:
            _setup_conn.close()
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``flask.g``, if one was opened.

    ``exception`` is supplied by Flask's teardown hook and is not used here.
    """
    conn = getattr(g, "_database", None)
    if conn is None:
        return
    conn.close()
def log_event(event, data=""):
    """Append one row to ``audit_log`` and commit it.

    ``event`` is the event name, ``data`` a free-form text payload; the row is
    stamped with the current Unix time in whole seconds.
    """
    conn = get_db()
    row = (event, data, int(time.time()))
    conn.execute(
        "INSERT INTO audit_log (event, data, created_at) VALUES (?, ?, ?)",
        row,
    )
    conn.commit()
# --- auth decorator ---
def require_admin(f):
    """Decorator: allow the wrapped view only for a known ``X-ADMIN-KEY``.

    Aborts with HTTP 403 when the header is missing, empty, or not a member of
    ``ADMIN_KEYS``. On success the key is stored as ``request.admin_key`` so
    the view can identify the caller.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        supplied = request.headers.get("X-ADMIN-KEY")
        authorised = bool(supplied) and supplied in ADMIN_KEYS
        if not authorised:
            abort(403)
        # Expose the caller's key to the view through the request object.
        request.admin_key = supplied
        return f(*args, **kwargs)

    return decorated
# --- Helper: check platform balance available (simple) ---
def get_platform_available_balance_cents(currency="usd"):
    """Return the platform's available Stripe balance for one currency.

    Args:
        currency: Currency code to look up, compared case-insensitively.
            Defaults to ``"usd"``, which matches what existing callers get.

    Returns:
        The ``amount`` of the first ``available`` balance entry whose currency
        matches, as an int, or ``0`` when there is no entry for that currency.
        Amounts held in other currencies are never added together: summing
        them would mix units and overstate the funds usable for a payout.

    Raises:
        Whatever ``stripe.Balance.retrieve()`` raises.
    """
    wanted = currency.lower()
    balance = stripe.Balance.retrieve()
    for entry in balance.available:
        if entry.currency == wanted:
            return int(entry.amount)
    return 0
# --- Endpoints ---
@app.route("/payout/request", methods=["POST"])
@require_admin
def create_request():
    """
    Create a payout request.
    Body: {"connected_account": "acct_...", "amount_cents": 1000000, "currency": "usd", "note": "..."}

    Responses:
        201 {"payout_id": ..., "status": "pending"} on success.
        400 when the body is not a JSON object, a field has the wrong type,
            the connected account is missing, or the amount is not a positive
            whole number of cents.
        403 when this amount plus the non-failed requests created during the
            previous 24 hours would exceed DAILY_LIMIT_CENTS.
    """
    # silent=True: a missing or malformed JSON body becomes a plain 400 below.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify({"error": "missing params"}), 400

    connected = body.get("connected_account")
    raw_amount = body.get("amount_cents", 0)
    raw_currency = body.get("currency") or "usd"
    note = body.get("note", "")

    # Money must be a whole number of cents: reject booleans and fractional
    # floats instead of letting int() silently truncate them.
    fractional = isinstance(raw_amount, float) and not raw_amount.is_integer()
    if isinstance(raw_amount, bool) or fractional:
        return jsonify({"error": "amount_cents must be an integer"}), 400
    try:
        amount = int(raw_amount)
    except (TypeError, ValueError):
        return jsonify({"error": "amount_cents must be an integer"}), 400

    if not isinstance(raw_currency, str):
        return jsonify({"error": "currency must be a string"}), 400
    currency = raw_currency.lower()

    if note is not None and not isinstance(note, str):
        return jsonify({"error": "note must be a string"}), 400

    if not connected or not isinstance(connected, str) or amount <= 0:
        return jsonify({"error": "missing params"}), 400

    now = int(time.time())
    db = get_db()

    # Enforce the limit over a rolling 24-hour window, not only per request;
    # otherwise several requests just under the cap bypass it. Failed payouts
    # moved no money and are not counted.
    recent_total = db.execute(
        "SELECT COALESCE(SUM(amount_cents), 0) AS total FROM payout_requests "
        "WHERE created_at > ? AND status != ?",
        (now - 24 * 60 * 60, "failed"),
    ).fetchone()["total"]
    if amount + recent_total > DAILY_LIMIT_CENTS:
        return jsonify({"error": "amount exceeds configured daily limit"}), 403

    pid = str(uuid.uuid4())
    db.execute(
        "INSERT INTO payout_requests (id, created_at, created_by, connected_account, amount_cents, currency, note, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        (pid, now, request.admin_key, connected, amount, currency, note, "pending")
    )
    db.commit()
    log_event("request_created", f"{pid}|{request.admin_key}|{connected}|{amount}")
    return jsonify({"payout_id": pid, "status": "pending"}), 201
@app.route("/payout/<payout_id>/approve", methods=["POST"])
@require_admin
def approve_request(payout_id):
    """
    Admin approves a payout. Requires MIN_APPROVALS unique admin approvals to execute.

    Flow:
        1. 404 if the payout does not exist, 400 if it is not ``pending`` or
           this admin has already approved it.
        2. Record the approval and write an audit entry.
        3. While fewer than MIN_APPROVALS distinct admins have approved,
           return ``{"status": "pending", "approvals": <count>}``.
        4. Otherwise check the platform balance (500 if the check itself
           fails, 400 if the balance is too low) and create the Stripe
           transfer. On success the payout is marked ``executed``; on a
           Stripe error it is marked ``failed`` and 500 is returned.
    """
    db = get_db()

    payout = db.execute(
        "SELECT * FROM payout_requests WHERE id = ?", (payout_id,)
    ).fetchone()
    if not payout:
        return jsonify({"error": "not found"}), 404
    if payout["status"] != "pending":
        return jsonify({"error": f"cannot approve in status {payout['status']}"}), 400

    already = db.execute(
        "SELECT * FROM approvals WHERE payout_id = ? AND admin_key = ?",
        (payout_id, request.admin_key),
    ).fetchone()
    if already:
        return jsonify({"error": "already approved by this admin"}), 400

    now = int(time.time())
    try:
        db.execute(
            "INSERT INTO approvals (payout_id, admin_key, approved_at) VALUES (?, ?, ?)",
            (payout_id, request.admin_key, now),
        )
        db.commit()
    except sqlite3.IntegrityError:
        # Only raised when the approvals table carries a uniqueness constraint
        # and a concurrent request from the same admin won the race.
        db.rollback()
        return jsonify({"error": "already approved by this admin"}), 400
    log_event("approval_added", f"{payout_id}|{request.admin_key}")

    # Count distinct admins so duplicate rows can never satisfy the
    # two-person rule on their own.
    cnt = db.execute(
        "SELECT COUNT(DISTINCT admin_key) AS c FROM approvals WHERE payout_id = ?",
        (payout_id,),
    ).fetchone()["c"]
    if cnt < MIN_APPROVALS:
        return jsonify({"status": "pending", "approvals": cnt})

    # Enough approvals: check the balance, then execute the transfer.
    # NOTE(review): the helper is called without a currency, so a non-USD
    # payout is compared against whatever balance the helper reports by
    # default - confirm this is intended.
    amount = int(payout["amount_cents"])
    try:
        available = get_platform_available_balance_cents()
    except Exception as e:  # boundary: any failure here must not move money
        log_event("balance_check_failed", str(e))
        return jsonify({"error": "balance check failed"}), 500
    if amount > available:
        log_event("insufficient_balance", f"{payout_id}|needed:{amount}|available:{available}")
        return jsonify({"error": "insufficient platform balance"}), 400

    try:
        transfer = stripe.Transfer.create(
            amount=amount,
            currency=payout["currency"],
            destination=payout["connected_account"],
            description=f"Payout {payout_id}",
            # Two approvals arriving at the same time can both reach this
            # point; a per-payout idempotency key makes Stripe create the
            # transfer only once.
            idempotency_key=f"payout-{payout_id}",
        )
    except stripe.error.StripeError as e:
        err = str(e.user_message or e)
        log_event("transfer_failed", f"{payout_id}|{err}")
        # Both placeholders must be bound: the status and the payout id.
        db.execute(
            "UPDATE payout_requests SET status = ? WHERE id = ?",
            ("failed", payout_id),
        )
        db.commit()
        return jsonify({"error": "transfer failed", "details": err}), 500

    db.execute(
        "UPDATE payout_requests SET status = ?, transfer_id = ? WHERE id = ?",
        ("executed", transfer.id, payout_id),
    )
    db.commit()
    log_event("transfer_executed", f"{payout_id}|{transfer.id}")
    return jsonify({"status": "executed", "transfer_id": transfer.id})
@app.route("/payout/<payout_id>/status", methods=["GET"])
@require_admin
def get_status(payout_id):
    """Return one payout request together with its approvals.

    Responds 404 with ``{"error": "not found"}`` for an unknown id.

    The ``created_by`` and ``admin_key`` columns hold the admins' actual API
    keys. Returning them verbatim would let any admin read another admin's
    credential and approve with it, so each key is replaced by a short SHA-256
    fingerprint: stable per key, but not usable as a credential.
    """
    import hashlib

    def fingerprint(key):
        # Keep NULLs as null; otherwise "key_" + first 12 hex chars of SHA-256.
        if key is None:
            return None
        digest = hashlib.sha256(str(key).encode("utf-8")).hexdigest()
        return "key_" + digest[:12]

    db = get_db()
    row = db.execute(
        "SELECT * FROM payout_requests WHERE id = ?", (payout_id,)
    ).fetchone()
    if not row:
        return jsonify({"error": "not found"}), 404

    payout = dict(row)
    payout["created_by"] = fingerprint(payout.get("created_by"))

    approval_rows = db.execute(
        "SELECT admin_key, approved_at FROM approvals WHERE payout_id = ?",
        (payout_id,),
    ).fetchall()
    approvals = [
        {"admin_key": fingerprint(a["admin_key"]), "approved_at": a["approved_at"]}
        for a in approval_rows
    ]
    return jsonify({"payout": payout, "approvals": approvals})
@app.route("/audit/logs", methods=["GET"])
@require_admin
def get_logs():
    """Return the 200 most recent audit-log rows, newest first.

    Audit ``data`` values are ``|``-separated fields, and some of them are
    admin API keys. Any field equal to a currently configured admin key is
    replaced by a short SHA-256 fingerprint so the endpoint does not hand out
    credentials. Keys no longer present in ``ADMIN_KEYS`` cannot be recognised
    and are returned unchanged.
    """
    import hashlib

    # Map each configured (non-empty) key to its fingerprint once.
    masks = {
        key: "key_" + hashlib.sha256(key.encode("utf-8")).hexdigest()[:12]
        for key in ADMIN_KEYS
        if key
    }

    def redact(text):
        # Whole-field comparison: never rewrites a substring of another value.
        if not isinstance(text, str):
            return text
        return "|".join(masks.get(field, field) for field in text.split("|"))

    db = get_db()
    rows = db.execute(
        "SELECT * FROM audit_log ORDER BY created_at DESC LIMIT 200"
    ).fetchall()

    entries = []
    for row in rows:
        entry = dict(row)
        entry["data"] = redact(entry.get("data"))
        entries.append(entry)
    return jsonify(entries)
# Run the development server only when the file is executed as a script.
# The guard must compare the ``__name__`` dunder with "__main__"; a bare
# ``name`` is an undefined variable.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "5000")), debug=False)