Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tools/fastswap-miles/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,7 @@ go run ./tools/fastswap-miles/ \
- **Caught-up guard**: Miles are only processed after the indexer has caught up to the chain tip, avoiding excessive Barter API calls during historical backfill.
- **Graceful shutdown**: Catches SIGINT/SIGTERM, finishes current batch, then exits.
- **Idempotent**: Re-running from the same start block is safe — inserts use `INSERT INTO` with primary key dedup.
- **Fuel-submission idempotency** (three-layer defense built in response to the 2026-04-16 double-credit incident, in which an operator re-used the contract deployment block as `-start-block` on a pod restart, causing the indexer to re-walk all history):
1. `insertEvent` checks tx_hash existence before INSERT, skipping re-inserts. The `fastswap_miles` table uses StarRocks `PRIMARY KEY(tx_hash)` — an unconditional INSERT upserts the row and wipes every column not specified (including `processed` and `miles`). Without this check, any block rescan — whether from an explicit `-start-block` flag, a manual reset, or any other trigger — destroys already-processed rows and causes mass re-submission to Fuel. This is the primary fix: re-walking history is now idempotent.
2. The miles processing loops (`processMiles`, `processERC20Miles`) read the `miles` column on every pending row and skip the `submitToFuel` call when it is non-null. `miles` is only ever written after a row's outcome is settled (Fuel submitted, or a terminal no-credit path), so a non-null value means we must not submit again — even if `processed` was flipped back to false by a manual SQL reset. No new column is required; the existing `miles` column doubles as the marker.
3. `saveLastBlock` issues a single atomic INSERT (fastswap_miles_meta has PRIMARY KEY(k) so INSERT upserts). The prior DELETE-then-INSERT pattern could vanish the `last_block` row if the pod died between the two statements. Hardening rather than incident root cause, but worth fixing.
38 changes: 36 additions & 2 deletions tools/fastswap-miles/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,29 @@ func loadLastBlock(db *sql.DB) uint64 {
return v
}

// saveLastBlock persists the indexer's progress marker. The prior
// DELETE-then-INSERT implementation was not atomic: a pod crash, SIGTERM
// during rolling deploy, or transient DB error between the two statements
// could vanish the `last_block` row. On the next startup loadLastBlock would
// return 0, startBlock would fall back to the contract deployment block,
// and the indexer would re-walk all history — re-inserting every event and
// (before the insertEvent existence guard landed) wiping processed=true on
// every row, causing mass re-submission to Fuel. This was the underlying
// trigger for the 2026-04-16 double-credit incident.
//
// fastswap_miles_meta has PRIMARY KEY(k), so a plain INSERT is an atomic
// upsert under StarRocks PK semantics. The DELETE is unnecessary and unsafe.
func saveLastBlock(db *sql.DB, block uint64) {
_, _ = db.Exec(`DELETE FROM mevcommit_57173.fastswap_miles_meta WHERE k = 'last_block'`)
_, err := db.Exec(`INSERT INTO mevcommit_57173.fastswap_miles_meta (k, v) VALUES ('last_block', ?)`, fmt.Sprintf("%d", block))
if err != nil {
log.Printf("saveLastBlock: %v", err)
}
}

// markProcessed sets processed=true and populates the derived columns. The
// `miles` column doubles as our submitted-to-Fuel marker — once it's non-null,
// the pipeline must never re-submit (idempotency check lives in processMiles /
// processERC20Miles).
func markProcessed(db *sql.DB, txHash string, surplusEth, netProfitEth float64, miles int64, bidCost string) {
_, err := db.Exec(`
UPDATE mevcommit_57173.fastswap_miles
Expand All @@ -381,6 +396,19 @@ WHERE tx_hash = ?`,
}
}

// markProcessedFlagOnly flips only the `processed` column to true and touches
// nothing else. It's used by the ERC20 idempotency path: when miles is already
// recorded but processed got reset to false, we must NOT recompute the derived
// columns (surplus_eth/net_profit_eth/bid_cost depend on the sweep result,
// which we can't reproduce without re-sweeping). Just take the row out of the
// pending queue.
func markProcessedFlagOnly(db *sql.DB, txHash string) {
_, err := db.Exec(`UPDATE mevcommit_57173.fastswap_miles SET processed = true WHERE tx_hash = ?`, txHash)
if err != nil {
log.Printf("markProcessedFlagOnly %s: %v", txHash, err)
}
}

// -------------------- Barter API --------------------

type BarterResponse struct {
Expand Down Expand Up @@ -459,12 +487,18 @@ func submitToFuel(
txHash common.Hash,
miles *big.Int,
) error {
// dedup_id makes the submission idempotent on Fuul's side: if we ever send
// the same transaction twice (e.g. our service crashes between this call
// succeeding and markProcessed running), Fuul drops the duplicate instead
// of re-crediting the user. Belt-and-suspenders alongside our own
// miles-non-null idempotency check.
body := map[string]any{
"user": map[string]any{
"identifier_type": "evm_address",
"identifier": user.Hex(),
},
"name": "fast-swap-surplus",
"name": "fast-swap-surplus",
"dedup_id": txHash.Hex(),
"args": map[string]any{
"value": map[string]any{
"amount": miles.String(),
Expand Down
85 changes: 85 additions & 0 deletions tools/fastswap-miles/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,91 @@ import (
"math/big"
"net/http"
"net/http/httptest"
"regexp"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/ethereum/go-ethereum/common"
)

// TestMarkProcessed_WritesAllDerivedColumns verifies that markProcessed
// updates every derived column (surplus_eth, net_profit_eth, miles, bid_cost)
// and flips processed=true. Writing miles is what arms the idempotency check:
// once miles is non-null, processMiles / processERC20Miles will skip the
// submitToFuel call on any subsequent run (even if processed gets reset).
func TestMarkProcessed_WritesAllDerivedColumns(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer func() { _ = db.Close() }()

mock.ExpectExec(regexp.QuoteMeta(
"UPDATE mevcommit_57173.fastswap_miles\n"+
"SET surplus_eth = ?, net_profit_eth = ?, miles = ?, bid_cost = ?, processed = true\n"+
"WHERE tx_hash = ?",
)).WithArgs(0.01, 0.005, int64(7), "1000", "0xdead").
WillReturnResult(sqlmock.NewResult(0, 1))

markProcessed(db, "0xdead", 0.01, 0.005, 7, "1000")

if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("sqlmock expectations: %v", err)
}
}

// TestMarkProcessedFlagOnly_UpdatesOnlyProcessedColumn verifies that
// markProcessedFlagOnly (used in the ERC20 idempotency path when a row's
// miles are already recorded but processed got reset to false) issues an
// UPDATE that only mentions `processed = true` and leaves every other column
// untouched. Touching surplus_eth/net_profit_eth/bid_cost here would overwrite
// the values that were derived from the original sweep — values we cannot
// reproduce without re-sweeping.
func TestMarkProcessedFlagOnly_UpdatesOnlyProcessedColumn(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer func() { _ = db.Close() }()

mock.ExpectExec(regexp.QuoteMeta(
"UPDATE mevcommit_57173.fastswap_miles SET processed = true WHERE tx_hash = ?",
)).WithArgs("0xdead").WillReturnResult(sqlmock.NewResult(0, 1))

markProcessedFlagOnly(db, "0xdead")

if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("sqlmock expectations: %v", err)
}
}

// TestSaveLastBlock_IsAtomicInsert verifies that saveLastBlock issues a
// single INSERT (which the fastswap_miles_meta PRIMARY KEY table upserts
// atomically) and NOT the old non-atomic DELETE-then-INSERT pattern. The
// old pattern could leave last_block vanished if the pod was killed between
// the two statements — on next startup the indexer would fall back to the
// deployment block and re-scan all history, which combined with the
// insertEvent upsert bug caused the 2026-04-16 double-credit incident.
func TestSaveLastBlock_IsAtomicInsert(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer func() { _ = db.Close() }()

// Exactly one INSERT expected, no DELETE.
mock.ExpectExec(regexp.QuoteMeta(
"INSERT INTO mevcommit_57173.fastswap_miles_meta (k, v) VALUES ('last_block', ?)",
)).WithArgs("12345").WillReturnResult(sqlmock.NewResult(0, 1))

saveLastBlock(db, 12345)

if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("sqlmock expectations: %v", err)
}
}

func TestDecideBidCheckOutcome(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -240,6 +319,12 @@ func TestSubmitToFuel(t *testing.T) {
t.Errorf("expected name=fast-swap-surplus, got %v", req["name"])
}

// dedup_id must equal the tx hash so Fuul can drop duplicate submits.
txHashHex := common.HexToHash("0xabc").Hex()
if req["dedup_id"] != txHashHex {
t.Errorf("expected dedup_id=%s, got %v", txHashHex, req["dedup_id"])
}

args := req["args"].(map[string]any)
val := args["value"].(map[string]any)
if val["amount"] != "150" {
Expand Down
50 changes: 46 additions & 4 deletions tools/fastswap-miles/miles.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type ethRow struct {
gasCost sql.NullString
inputToken string
blockTS sql.NullTime
miles sql.NullInt64
}

type erc20Row struct {
Expand All @@ -107,6 +108,7 @@ type erc20Row struct {
gasCost sql.NullString
inputToken string
blockTS sql.NullTime
miles sql.NullInt64
}

type tokenBatch struct {
Expand All @@ -119,7 +121,7 @@ type tokenBatch struct {

func processMiles(ctx context.Context, cfg *serviceConfig) (int, error) {
rows, err := cfg.DB.QueryContext(ctx, `
SELECT tx_hash, user_address, surplus, gas_cost, input_token, block_timestamp
SELECT tx_hash, user_address, surplus, gas_cost, input_token, block_timestamp, miles
FROM mevcommit_57173.fastswap_miles
WHERE processed = false
AND swap_type = 'eth_weth'
Expand All @@ -133,7 +135,7 @@ WHERE processed = false
var pending []ethRow
for rows.Next() {
var r ethRow
if err := rows.Scan(&r.txHash, &r.user, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS); err != nil {
if err := rows.Scan(&r.txHash, &r.user, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.miles); err != nil {
return 0, err
}
pending = append(pending, r)
Expand Down Expand Up @@ -243,6 +245,22 @@ WHERE processed = false
continue
}

// Idempotency guard: if this row already has miles set (even if
// `processed` got flipped back to false by a reset SQL), do NOT
// re-submit to Fuel — just refresh the derived columns. miles is only
// ever written after a successful Fuel submission (or as 0 for the
// no-credit terminal paths), so a non-null value means we already
// settled this row's outcome.
if r.miles.Valid {
cfg.Logger.Info("tx already has miles recorded, skipping re-submission",
slog.String("tx", r.txHash), slog.String("user", r.user),
slog.Int64("recorded_miles", r.miles.Int64),
slog.Int64("recomputed_miles", miles.Int64()))
markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, r.miles.Int64, bidCostWei.String())
processed++
continue
}

err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey,
common.HexToAddress(r.user),
common.HexToHash(r.txHash),
Expand All @@ -268,7 +286,7 @@ func processERC20Miles(ctx context.Context, cfg *serviceConfig) (int, error) {
processed := 0

rows, err := cfg.DB.QueryContext(ctx, `
SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp
SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp, miles
FROM mevcommit_57173.fastswap_miles
WHERE processed = false
AND swap_type = 'erc20'
Expand All @@ -282,7 +300,7 @@ WHERE processed = false
var pending []erc20Row
for rows.Next() {
var r erc20Row
if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS); err != nil {
if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.miles); err != nil {
return processed, err
}
pending = append(pending, r)
Expand Down Expand Up @@ -331,6 +349,26 @@ WHERE processed = false
var readyBidCosts []*big.Int

for _, r := range batch.Txs {
// Already-settled guard runs BEFORE batch aggregation. If a row's
// surplus tokens were already swept on a prior run (miles is
// non-null), those tokens are no longer in the executor wallet.
// Including the row in readyTxs / readyTotalSum would make the new
// sweep quote for an amount the wallet can't supply, failing the
// batch or skewing pro-rata allocation for every other row. Skip
// entirely: just re-flip processed=true and preserve every other
// column (surplus_eth/net_profit_eth/bid_cost depend on the actual
// sweep result, which we can't recompute without re-sweeping).
if r.miles.Valid {
cfg.Logger.Info("erc20 tx already has miles recorded, excluding from sweep batch",
slog.String("tx", r.txHash), slog.String("user", r.user),
slog.Int64("recorded_miles", r.miles.Int64))
if !cfg.DryRun {
markProcessedFlagOnly(cfg.DB, r.txHash)
}
processed++
continue
}

userPaysGas := strings.EqualFold(r.inputToken, zeroAddr.Hex())
bidCostWei := getBidCost(erc20BidMap, r.txHash)
if bidCostWei.Sign() == 0 {
Expand Down Expand Up @@ -513,6 +551,10 @@ WHERE processed = false
continue
}

// Note: the miles-non-null idempotency check runs upstream, before
// batch aggregation — rows with miles already recorded are never
// in readyTxs here.

err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey,
common.HexToAddress(r.user),
common.HexToHash(r.txHash),
Expand Down
54 changes: 53 additions & 1 deletion tools/fastswap-miles/sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,23 @@ func insertEvent(
gasCost *big.Int,
swapType string,
) error {
// CRITICAL: the fastswap_miles table uses StarRocks `PRIMARY KEY(tx_hash)`
// model, which means an unconditional INSERT UPSERTS the entire row and
// resets every column we don't specify (processed → false, miles → NULL,
// surplus_eth → NULL, bid_cost → NULL, …).
//
// If the indexer rescans a block (pod restart with last_block reset, an
// explicit -start-block flag going backward, meta row lost during a
// deploy, etc.) an unconditional INSERT would clobber already-processed
// rows back to the pending state — and the miles pipeline would then
// re-submit each one to Fuel, double-crediting users.
//
// The 2026-04-16 double-credit incident was caused by exactly this: the
// Docker-support deploy restarted the pod, the indexer re-walked historical
// blocks, and every re-inserted event wiped `processed` on the existing
// row. 78 events ended up re-submitted to Fuel for a single test user
// alone; the protocol-wide overcount was much larger.
//
var tsVal interface{} = nil
if blockTS != nil {
tsVal = *blockTS
Expand All @@ -140,7 +157,42 @@ func insertEvent(
gcStr = gasCost.String()
}

_, err := db.Exec(`
// Fix: check for existence before inserting. The IntentExecuted event
// args themselves are immutable once L1-finalized, so the row's core
// fields (user, tokens, amounts, surplus) must never be replaced.
//
// For rows that already exist: run a COALESCE-only UPDATE that fills in
// gas_cost or block_timestamp IF they were previously NULL (which happens
// when indexBatch caught a transient receipt/header RPC failure on the
// first pass). This preserves every derived column (processed, miles,
// surplus_eth, net_profit_eth, bid_cost) — so a rescan can heal partial
// metadata without destroying pipeline state.
var exists bool
err := db.QueryRow(
`SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)`,
txHash,
).Scan(&exists)
if err != nil {
return fmt.Errorf("check existing row: %w", err)
}
if exists {
// Backfill only the two fields that can legitimately arrive NULL from
// transient RPC failures. COALESCE(col, newVal) keeps the existing
// value if it's non-NULL, and substitutes newVal otherwise. If newVal
// is also NULL (we still don't have fresh data), the column is
// unchanged — no-op.
_, err := db.Exec(`
UPDATE mevcommit_57173.fastswap_miles
SET gas_cost = COALESCE(gas_cost, ?),
block_timestamp = COALESCE(block_timestamp, ?)
WHERE tx_hash = ?`, gcStr, tsVal, txHash)
if err != nil {
return fmt.Errorf("backfill null metadata: %w", err)
}
return nil
}

_, err = db.Exec(`
INSERT INTO mevcommit_57173.fastswap_miles (
tx_hash, block_number, block_timestamp, user_address,
input_token, output_token, input_amount, user_amt_out,
Expand Down
Loading
Loading