diff --git a/tools/fastswap-miles/README.md b/tools/fastswap-miles/README.md index e383f3df5..df5d243d5 100644 --- a/tools/fastswap-miles/README.md +++ b/tools/fastswap-miles/README.md @@ -174,3 +174,7 @@ go run ./tools/fastswap-miles/ \ - **Caught-up guard**: Miles are only processed after the indexer has caught up to the chain tip, avoiding excessive Barter API calls during historical backfill. - **Graceful shutdown**: Catches SIGINT/SIGTERM, finishes current batch, then exits. - **Idempotent**: Re-running from the same start block is safe — inserts use `INSERT INTO` with primary key dedup. +- **Fuel-submission idempotency** (three-layer defense built in response to the 2026-04-16 double-credit incident, in which an operator re-used the contract deployment block as `-start-block` on a pod restart, causing the indexer to re-walk all history): + 1. `insertEvent` checks tx_hash existence before INSERT, skipping re-inserts. The `fastswap_miles` table uses StarRocks `PRIMARY KEY(tx_hash)` — an unconditional INSERT upserts the row and wipes every column not specified (including `processed` and `miles`). Without this check, any block rescan — whether from an explicit `-start-block` flag, a manual reset, or any other trigger — destroys already-processed rows and causes mass re-submission to Fuel. This is the primary fix: re-walking history is now idempotent. + 2. The miles processing loops (`processMiles`, `processERC20Miles`) read the `miles` column on every pending row and skip the `submitToFuel` call when it is non-null. `miles` is only ever written after a row's outcome is settled (Fuel submitted, or a terminal no-credit path), so a non-null value means we must not submit again — even if `processed` was flipped back to false by a manual SQL reset. No new column is required; the existing `miles` column doubles as the marker. + 3. `saveLastBlock` issues a single atomic INSERT (fastswap_miles_meta has PRIMARY KEY(k) so INSERT upserts). The prior DELETE-then-INSERT pattern could vanish the `last_block` row if the pod died between the two statements. Hardening rather than incident root cause, but worth fixing. diff --git a/tools/fastswap-miles/main.go b/tools/fastswap-miles/main.go index ed1e511a4..0a9d15e57 100644 --- a/tools/fastswap-miles/main.go +++ b/tools/fastswap-miles/main.go @@ -362,14 +362,29 @@ func loadLastBlock(db *sql.DB) uint64 { return v } +// saveLastBlock persists the indexer's progress marker. The prior +// DELETE-then-INSERT implementation was not atomic: a pod crash, SIGTERM +// during rolling deploy, or transient DB error between the two statements +// could vanish the `last_block` row. On the next startup loadLastBlock would +// return 0, startBlock would fall back to the contract deployment block, +// and the indexer would re-walk all history — re-inserting every event and +// (before the insertEvent existence guard landed) wiping processed=true on +// every row, causing mass re-submission to Fuel. This was the underlying +// trigger for the 2026-04-16 double-credit incident. +// +// fastswap_miles_meta has PRIMARY KEY(k), so a plain INSERT is an atomic +// upsert under StarRocks PK semantics. The DELETE is unnecessary and unsafe. func saveLastBlock(db *sql.DB, block uint64) { - _, _ = db.Exec(`DELETE FROM mevcommit_57173.fastswap_miles_meta WHERE k = 'last_block'`) _, err := db.Exec(`INSERT INTO mevcommit_57173.fastswap_miles_meta (k, v) VALUES ('last_block', ?)`, fmt.Sprintf("%d", block)) if err != nil { log.Printf("saveLastBlock: %v", err) } } +// markProcessed sets processed=true and populates the derived columns. The +// `miles` column doubles as our submitted-to-Fuel marker — once it's non-null, +// the pipeline must never re-submit (idempotency check lives in processMiles / +// processERC20Miles). func markProcessed(db *sql.DB, txHash string, surplusEth, netProfitEth float64, miles int64, bidCost string) { _, err := db.Exec(` UPDATE mevcommit_57173.fastswap_miles @@ -381,6 +396,19 @@ WHERE tx_hash = ?`, } } +// markProcessedFlagOnly flips only the `processed` column to true and touches +// nothing else. It's used by the ERC20 idempotency path: when miles is already +// recorded but processed got reset to false, we must NOT recompute the derived +// columns (surplus_eth/net_profit_eth/bid_cost depend on the sweep result, +// which we can't reproduce without re-sweeping). Just take the row out of the +// pending queue. +func markProcessedFlagOnly(db *sql.DB, txHash string) { + _, err := db.Exec(`UPDATE mevcommit_57173.fastswap_miles SET processed = true WHERE tx_hash = ?`, txHash) + if err != nil { + log.Printf("markProcessedFlagOnly %s: %v", txHash, err) + } +} + // -------------------- Barter API -------------------- type BarterResponse struct { @@ -459,12 +487,18 @@ func submitToFuel( txHash common.Hash, miles *big.Int, ) error { + // dedup_id makes the submission idempotent on Fuul's side: if we ever send + // the same transaction twice (e.g. our service crashes between this call + // succeeding and markProcessed running), Fuul drops the duplicate instead + // of re-crediting the user. Belt-and-suspenders alongside our own + // miles-non-null idempotency check. body := map[string]any{ "user": map[string]any{ "identifier_type": "evm_address", "identifier": user.Hex(), }, - "name": "fast-swap-surplus", + "name": "fast-swap-surplus", + "dedup_id": txHash.Hex(), "args": map[string]any{ "value": map[string]any{ "amount": miles.String(), diff --git a/tools/fastswap-miles/main_test.go b/tools/fastswap-miles/main_test.go index 3457a6e79..98b06160d 100644 --- a/tools/fastswap-miles/main_test.go +++ b/tools/fastswap-miles/main_test.go @@ -6,12 +6,91 @@ import ( "math/big" "net/http" "net/http/httptest" + "regexp" "testing" "time" + "github.com/DATA-DOG/go-sqlmock" "github.com/ethereum/go-ethereum/common" ) +// TestMarkProcessed_WritesAllDerivedColumns verifies that markProcessed +// updates every derived column (surplus_eth, net_profit_eth, miles, bid_cost) +// and flips processed=true. Writing miles is what arms the idempotency check: +// once miles is non-null, processMiles / processERC20Miles will skip the +// submitToFuel call on any subsequent run (even if processed gets reset). +func TestMarkProcessed_WritesAllDerivedColumns(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + mock.ExpectExec(regexp.QuoteMeta( + "UPDATE mevcommit_57173.fastswap_miles\n"+ + "SET surplus_eth = ?, net_profit_eth = ?, miles = ?, bid_cost = ?, processed = true\n"+ + "WHERE tx_hash = ?", + )).WithArgs(0.01, 0.005, int64(7), "1000", "0xdead"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + markProcessed(db, "0xdead", 0.01, 0.005, 7, "1000") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// TestMarkProcessedFlagOnly_UpdatesOnlyProcessedColumn verifies that +// markProcessedFlagOnly (used in the ERC20 idempotency path when a row's +// miles are already recorded but processed got reset to false) issues an +// UPDATE that only mentions `processed = true` and leaves every other column +// untouched. Touching surplus_eth/net_profit_eth/bid_cost here would overwrite +// the values that were derived from the original sweep — values we cannot +// reproduce without re-sweeping. +func TestMarkProcessedFlagOnly_UpdatesOnlyProcessedColumn(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + mock.ExpectExec(regexp.QuoteMeta( + "UPDATE mevcommit_57173.fastswap_miles SET processed = true WHERE tx_hash = ?", + )).WithArgs("0xdead").WillReturnResult(sqlmock.NewResult(0, 1)) + + markProcessedFlagOnly(db, "0xdead") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// TestSaveLastBlock_IsAtomicInsert verifies that saveLastBlock issues a +// single INSERT (which the fastswap_miles_meta PRIMARY KEY table upserts +// atomically) and NOT the old non-atomic DELETE-then-INSERT pattern. The +// old pattern could leave last_block vanished if the pod was killed between +// the two statements — on next startup the indexer would fall back to the +// deployment block and re-scan all history, which combined with the +// insertEvent upsert bug caused the 2026-04-16 double-credit incident. +func TestSaveLastBlock_IsAtomicInsert(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + // Exactly one INSERT expected, no DELETE. + mock.ExpectExec(regexp.QuoteMeta( + "INSERT INTO mevcommit_57173.fastswap_miles_meta (k, v) VALUES ('last_block', ?)", + )).WithArgs("12345").WillReturnResult(sqlmock.NewResult(0, 1)) + + saveLastBlock(db, 12345) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + func TestDecideBidCheckOutcome(t *testing.T) { tests := []struct { name string @@ -240,6 +319,12 @@ func TestSubmitToFuel(t *testing.T) { t.Errorf("expected name=fast-swap-surplus, got %v", req["name"]) } + // dedup_id must equal the tx hash so Fuul can drop duplicate submits. + txHashHex := common.HexToHash("0xabc").Hex() + if req["dedup_id"] != txHashHex { + t.Errorf("expected dedup_id=%s, got %v", txHashHex, req["dedup_id"]) + } + args := req["args"].(map[string]any) val := args["value"].(map[string]any) if val["amount"] != "150" { diff --git a/tools/fastswap-miles/miles.go b/tools/fastswap-miles/miles.go index 19651d98e..d63be0af1 100644 --- a/tools/fastswap-miles/miles.go +++ b/tools/fastswap-miles/miles.go @@ -97,6 +97,7 @@ type ethRow struct { gasCost sql.NullString inputToken string blockTS sql.NullTime + miles sql.NullInt64 } type erc20Row struct { @@ -107,6 +108,7 @@ type erc20Row struct { gasCost sql.NullString inputToken string blockTS sql.NullTime + miles sql.NullInt64 } type tokenBatch struct { @@ -119,7 +121,7 @@ type tokenBatch struct { func processMiles(ctx context.Context, cfg *serviceConfig) (int, error) { rows, err := cfg.DB.QueryContext(ctx, ` -SELECT tx_hash, user_address, surplus, gas_cost, input_token, block_timestamp +SELECT tx_hash, user_address, surplus, gas_cost, input_token, block_timestamp, miles FROM mevcommit_57173.fastswap_miles WHERE processed = false AND swap_type = 'eth_weth' @@ -133,7 +135,7 @@ WHERE processed = false var pending []ethRow for rows.Next() { var r ethRow - if err := rows.Scan(&r.txHash, &r.user, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS); err != nil { + if err := rows.Scan(&r.txHash, &r.user, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.miles); err != nil { return 0, err } pending = append(pending, r) @@ -243,6 +245,22 @@ WHERE processed = false continue } + // Idempotency guard: if this row already has miles set (even if + // `processed` got flipped back to false by a reset SQL), do NOT + // re-submit to Fuel — just refresh the derived columns. miles is only + // ever written after a successful Fuel submission (or as 0 for the + // no-credit terminal paths), so a non-null value means we already + // settled this row's outcome. + if r.miles.Valid { + cfg.Logger.Info("tx already has miles recorded, skipping re-submission", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.Int64("recorded_miles", r.miles.Int64), + slog.Int64("recomputed_miles", miles.Int64())) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, r.miles.Int64, bidCostWei.String()) + processed++ + continue + } + err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, common.HexToAddress(r.user), common.HexToHash(r.txHash), @@ -268,7 +286,7 @@ func processERC20Miles(ctx context.Context, cfg *serviceConfig) (int, error) { processed := 0 rows, err := cfg.DB.QueryContext(ctx, ` -SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp +SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp, miles FROM mevcommit_57173.fastswap_miles WHERE processed = false AND swap_type = 'erc20' @@ -282,7 +300,7 @@ WHERE processed = false var pending []erc20Row for rows.Next() { var r erc20Row - if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS); err != nil { + if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.miles); err != nil { return processed, err } pending = append(pending, r) @@ -331,6 +349,26 @@ WHERE processed = false var readyBidCosts []*big.Int for _, r := range batch.Txs { + // Already-settled guard runs BEFORE batch aggregation. If a row's + // surplus tokens were already swept on a prior run (miles is + // non-null), those tokens are no longer in the executor wallet. + // Including the row in readyTxs / readyTotalSum would make the new + // sweep quote for an amount the wallet can't supply, failing the + // batch or skewing pro-rata allocation for every other row. Skip + // entirely: just re-flip processed=true and preserve every other + // column (surplus_eth/net_profit_eth/bid_cost depend on the actual + // sweep result, which we can't recompute without re-sweeping). + if r.miles.Valid { + cfg.Logger.Info("erc20 tx already has miles recorded, excluding from sweep batch", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.Int64("recorded_miles", r.miles.Int64)) + if !cfg.DryRun { + markProcessedFlagOnly(cfg.DB, r.txHash) + } + processed++ + continue + } + userPaysGas := strings.EqualFold(r.inputToken, zeroAddr.Hex()) bidCostWei := getBidCost(erc20BidMap, r.txHash) if bidCostWei.Sign() == 0 { @@ -513,6 +551,10 @@ WHERE processed = false continue } + // Note: the miles-non-null idempotency check runs upstream, before + // batch aggregation — rows with miles already recorded are never + // in readyTxs here. + err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, common.HexToAddress(r.user), common.HexToHash(r.txHash), diff --git a/tools/fastswap-miles/sweep.go b/tools/fastswap-miles/sweep.go index be14a48a6..6b126d316 100644 --- a/tools/fastswap-miles/sweep.go +++ b/tools/fastswap-miles/sweep.go @@ -131,6 +131,23 @@ func insertEvent( gasCost *big.Int, swapType string, ) error { + // CRITICAL: the fastswap_miles table uses StarRocks `PRIMARY KEY(tx_hash)` + // model, which means an unconditional INSERT UPSERTS the entire row and + // resets every column we don't specify (processed → false, miles → NULL, + // surplus_eth → NULL, bid_cost → NULL, …). + // + // If the indexer rescans a block (pod restart with last_block reset, an + // explicit -start-block flag going backward, meta row lost during a + // deploy, etc.) an unconditional INSERT would clobber already-processed + // rows back to the pending state — and the miles pipeline would then + // re-submit each one to Fuel, double-crediting users. + // + // The 2026-04-16 double-credit incident was caused by exactly this: the + // Docker-support deploy restarted the pod, the indexer re-walked historical + // blocks, and every re-inserted event wiped `processed` on the existing + // row. 78 events ended up re-submitted to Fuel for a single test user + // alone; the protocol-wide overcount was much larger. + // var tsVal interface{} = nil if blockTS != nil { tsVal = *blockTS @@ -140,7 +157,42 @@ func insertEvent( gcStr = gasCost.String() } - _, err := db.Exec(` + // Fix: check for existence before inserting. The IntentExecuted event + // args themselves are immutable once L1-finalized, so the row's core + // fields (user, tokens, amounts, surplus) must never be replaced. + // + // For rows that already exist: run a COALESCE-only UPDATE that fills in + // gas_cost or block_timestamp IF they were previously NULL (which happens + // when indexBatch caught a transient receipt/header RPC failure on the + // first pass). This preserves every derived column (processed, miles, + // surplus_eth, net_profit_eth, bid_cost) — so a rescan can heal partial + // metadata without destroying pipeline state. + var exists bool + err := db.QueryRow( + `SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)`, + txHash, + ).Scan(&exists) + if err != nil { + return fmt.Errorf("check existing row: %w", err) + } + if exists { + // Backfill only the two fields that can legitimately arrive NULL from + // transient RPC failures. COALESCE(col, newVal) keeps the existing + // value if it's non-NULL, and substitutes newVal otherwise. If newVal + // is also NULL (we still don't have fresh data), the column is + // unchanged — no-op. + _, err := db.Exec(` +UPDATE mevcommit_57173.fastswap_miles +SET gas_cost = COALESCE(gas_cost, ?), + block_timestamp = COALESCE(block_timestamp, ?) +WHERE tx_hash = ?`, gcStr, tsVal, txHash) + if err != nil { + return fmt.Errorf("backfill null metadata: %w", err) + } + return nil + } + + _, err = db.Exec(` INSERT INTO mevcommit_57173.fastswap_miles ( tx_hash, block_number, block_timestamp, user_address, input_token, output_token, input_amount, user_amt_out, diff --git a/tools/fastswap-miles/sweep_test.go b/tools/fastswap-miles/sweep_test.go new file mode 100644 index 000000000..a07eb3bf6 --- /dev/null +++ b/tools/fastswap-miles/sweep_test.go @@ -0,0 +1,133 @@ +package main + +import ( + "math/big" + "regexp" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + fastsettlement "github.com/primev/mev-commit/contracts-abi/clients/FastSettlementV3" +) + +func newTestEvent() *fastsettlement.Fastsettlementv3IntentExecuted { + return &fastsettlement.Fastsettlementv3IntentExecuted{ + User: common.HexToAddress("0xabc"), + InputToken: common.HexToAddress("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"), // USDC + OutputToken: common.HexToAddress("0x0000000000000000000000000000000000000000"), // ETH + InputAmt: big.NewInt(100_000_000), + UserAmtOut: big.NewInt(50_000_000_000_000_000), + Surplus: big.NewInt(500_000_000_000_000), + Raw: types.Log{ + TxHash: common.HexToHash("0xdead"), + BlockNumber: 12345, + }, + } +} + +// TestInsertEvent_BackfillsNullMetadataOnExistingRow verifies the critical +// idempotency guarantee: when a row with this tx_hash already exists in +// fastswap_miles, insertEvent must NOT issue an INSERT (which would UPSERT +// under StarRocks PK semantics, wiping derived columns and causing the +// 2026-04-16 double-credit incident). Instead it issues a COALESCE-only +// UPDATE that fills in NULL gas_cost or block_timestamp from a later rescan +// while preserving every derived column (processed, miles, surplus_eth, +// net_profit_eth, bid_cost). +func TestInsertEvent_BackfillsNullMetadataOnExistingRow(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + txHash := "0xdead" + blockTS := time.Date(2026, 4, 17, 12, 0, 0, 0, time.UTC) + + mock.ExpectQuery(regexp.QuoteMeta( + "SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)", + )).WithArgs(txHash).WillReturnRows( + sqlmock.NewRows([]string{"exists"}).AddRow(true), + ) + // COALESCE UPDATE that only touches the two fields that can legitimately + // be NULL from a prior RPC-failed insert. No INSERT must fire. + mock.ExpectExec(regexp.QuoteMeta( + "UPDATE mevcommit_57173.fastswap_miles\n"+ + "SET gas_cost = COALESCE(gas_cost, ?),\n"+ + " block_timestamp = COALESCE(block_timestamp, ?)\n"+ + "WHERE tx_hash = ?", + )).WithArgs("1000", blockTS, txHash).WillReturnResult(sqlmock.NewResult(0, 1)) + + if err := insertEvent(db, txHash, 12345, &blockTS, newTestEvent(), big.NewInt(1000), "eth_weth"); err != nil { + t.Fatalf("insertEvent returned error on existing row: %v", err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// TestInsertEvent_InsertsWhenRowDoesNotExist verifies that insertEvent still +// inserts fresh rows. The idempotency check must not break the base case. +func TestInsertEvent_InsertsWhenRowDoesNotExist(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + txHash := "0xdead" + blockTS := time.Date(2026, 4, 17, 12, 0, 0, 0, time.UTC) + + mock.ExpectQuery(regexp.QuoteMeta( + "SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)", + )).WithArgs(txHash).WillReturnRows( + sqlmock.NewRows([]string{"exists"}).AddRow(false), + ) + mock.ExpectExec(regexp.QuoteMeta("INSERT INTO mevcommit_57173.fastswap_miles")). + WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := insertEvent(db, txHash, 12345, &blockTS, newTestEvent(), big.NewInt(1000), "eth_weth"); err != nil { + t.Fatalf("insertEvent returned error on new row: %v", err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// TestInsertEvent_PropagatesExistenceCheckError verifies that a DB error on +// the SELECT EXISTS returns an error rather than falling through to INSERT — +// failing closed preserves the idempotency guarantee under DB trouble. +func TestInsertEvent_PropagatesExistenceCheckError(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer func() { _ = db.Close() }() + + txHash := "0xdead" + blockTS := time.Date(2026, 4, 17, 12, 0, 0, 0, time.UTC) + + mock.ExpectQuery(regexp.QuoteMeta( + "SELECT EXISTS(SELECT 1 FROM mevcommit_57173.fastswap_miles WHERE tx_hash = ?)", + )).WithArgs(txHash).WillReturnError(errForceTest) + // No INSERT expected. + + err = insertEvent(db, txHash, 12345, &blockTS, newTestEvent(), big.NewInt(1000), "eth_weth") + if err == nil { + t.Fatalf("expected error from existence check, got nil") + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("sqlmock expectations: %v", err) + } +} + +// errForceTest is a sentinel used to force an error path in tests. +var errForceTest = sqlmockErr("forced test error") + +type sqlmockErr string + +func (e sqlmockErr) Error() string { return string(e) }