From 3e0704d7d8d239aba6c036b2ce96efd30edcf932 Mon Sep 17 00:00:00 2001 From: Alok Date: Wed, 10 Sep 2025 18:57:29 +0530 Subject: [PATCH 1/6] feat: add slack notifications to RPC --- tools/preconf-rpc/notifier/notifier.go | 213 ++++++++++++++++++++ tools/preconf-rpc/notifier/notifier_test.go | 95 +++++++++ tools/preconf-rpc/service/service.go | 194 ++++++++++++++---- 3 files changed, 464 insertions(+), 38 deletions(-) create mode 100644 tools/preconf-rpc/notifier/notifier.go create mode 100644 tools/preconf-rpc/notifier/notifier_test.go diff --git a/tools/preconf-rpc/notifier/notifier.go b/tools/preconf-rpc/notifier/notifier.go new file mode 100644 index 000000000..c6adc2b0c --- /dev/null +++ b/tools/preconf-rpc/notifier/notifier.go @@ -0,0 +1,213 @@ +package notifier + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "math/big" + "net/http" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/params" +) + +// Message represents a notification message structure +// generalized to support different platforms. +type Message struct { + Text string `json:"text,omitempty"` + Attachments []Attachment `json:"attachments,omitempty"` +} + +// Attachment represents a message attachment +type Attachment struct { + Color string `json:"color,omitempty"` + Title string `json:"title,omitempty"` + Text string `json:"text,omitempty"` + Fields []Field `json:"fields,omitempty"` + Footer string `json:"footer,omitempty"` + TS int64 `json:"ts,omitempty"` + MarkdownIn []string `json:"mrkdwn_in,omitempty"` +} + +// Field represents a field in a message attachment +type Field struct { + Title string `json:"title"` + Value string `json:"value"` + Short bool `json:"short"` +} + +// Notifier sends notifications to multiple webhook endpoints +type Notifier struct { + webhookURLs []string + client *http.Client + logger *slog.Logger + enabled bool +} + +// NewNotifier creates a new notifier instance +func NewNotifier(webhookURLs []string, logger *slog.Logger) *Notifier { + enabled := len(webhookURLs) > 0 + + if !enabled { + logger.Warn("Notifications disabled - no webhook URLs provided") + } else { + logger.Info("Notifications enabled") + } + + return &Notifier{ + webhookURLs: webhookURLs, + client: &http.Client{ + Timeout: 10 * time.Second, + }, + logger: logger, + enabled: enabled, + } +} + +// SendMessage sends a message to all configured webhook endpoints +func (n *Notifier) SendMessage(ctx context.Context, message Message) error { + if !n.enabled { + n.logger.Debug("Notification skipped (disabled)") + return nil + } + + messageJSON, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + var errs []error + for _, webhookURL := range n.webhookURLs { + req, err := http.NewRequestWithContext(ctx, "POST", webhookURL, bytes.NewBuffer(messageJSON)) + if err != nil { + n.logger.Error("Failed to create request", "webhook", webhookURL, "error", err) + errs = append(errs, fmt.Errorf("create request (%s): %w", webhookURL, err)) + continue + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := n.client.Do(req) + if err != nil { + n.logger.Error("Failed to send notification", "webhook", webhookURL, "error", err) + errs = append(errs, fmt.Errorf("send notification (%s): %w", webhookURL, err)) + continue + } + + if _, err := io.Copy(io.Discard, resp.Body); err != nil { + n.logger.Error("Failed to read response body", "webhook", webhookURL, "error", err) + errs = append(errs, fmt.Errorf("read response body (%s): %w", webhookURL, err)) + } + + if err := resp.Body.Close(); err != nil { + n.logger.Error("Failed to close response body", "webhook", webhookURL, "error", err) + errs = append(errs, fmt.Errorf("close response body (%s): %w", webhookURL, err)) + } + + if resp.StatusCode != http.StatusOK { + n.logger.Error("Notification API returned non-200 status code", "webhook", webhookURL, "status", resp.StatusCode) + errs = append(errs, fmt.Errorf("non-200 status (%s): %d", webhookURL, resp.StatusCode)) + continue + } + n.logger.Debug("Notification sent successfully", "webhook", webhookURL) + } + + if len(errs) > 0 { + return fmt.Errorf("one or more notifications failed: %w", errors.Join(errs...)) + } + return nil +} + +type BalanceGetter interface { + BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) +} + +func (n *Notifier) SetupLowBalanceNotification( + ctx context.Context, + chainDesc string, + getter BalanceGetter, + account common.Address, + msg string, + thresholdEth float64, + checkInterval time.Duration, + alertCooldown time.Duration, +) <-chan struct{} { + ticker := time.NewTicker(checkInterval) + done := make(chan struct{}) + go func() { + defer close(done) + defer ticker.Stop() + var lastAlert time.Time + + for { + select { + case <-ctx.Done(): + n.logger.Info("Low balance notification routine stopping due to context cancellation") + return + case <-ticker.C: + if time.Since(lastAlert) < alertCooldown { + continue + } + + balance, err := getter.BalanceAt(ctx, account, nil) + if err != nil { + n.logger.Error("Failed to get account balance", "account", account, "error", err) + continue + } + balanceEth := new(big.Float).Quo(new(big.Float).SetInt(balance), big.NewFloat(params.Ether)) + balanceEthFloat, _ := balanceEth.Float64() + if balanceEthFloat < thresholdEth { + message := Message{ + Text: "⚠️ Low Balance Alert", + Attachments: []Attachment{ + { + Color: "#ff0000", + Title: fmt.Sprintf("The following accounts have low balances on %s chain:", chainDesc), + Text: fmt.Sprintf("%s\n\nAccount: %s\nBalance: %s (Threshold: %.4f ETH)", msg, account, formatWeiToEth(balance), thresholdEth), + Footer: "Preconf RPC Monitor", + TS: time.Now().Unix(), + }, + }, + } + if err := n.SendMessage(ctx, message); err != nil { + n.logger.Error("Failed to send low balance notification", "error", err) + } else { + lastAlert = time.Now() + } + } + } + } + }() + return done +} + +func (n *Notifier) SendBidderFundedNotification( + ctx context.Context, + chainDesc string, + account common.Address, + amountWei *big.Int, +) error { + message := Message{ + Text: "🎉 Bidder Funded Alert", + Attachments: []Attachment{ + { + Color: "#36a64f", + Title: fmt.Sprintf("A bidder account has been funded on %s chain:", chainDesc), + Text: fmt.Sprintf("Account: %s\nAmount: %s", account, formatWeiToEth(amountWei)), + Footer: "Preconf RPC Monitor", + TS: time.Now().Unix(), + }, + }, + } + return n.SendMessage(ctx, message) +} + +func formatWeiToEth(wei *big.Int) string { + ethValue := new(big.Float).Quo(new(big.Float).SetInt(wei), new(big.Float).SetFloat64(params.Ether)) + return fmt.Sprintf("%.6f ETH", ethValue) +} diff --git a/tools/preconf-rpc/notifier/notifier_test.go b/tools/preconf-rpc/notifier/notifier_test.go new file mode 100644 index 000000000..1f6c8d000 --- /dev/null +++ b/tools/preconf-rpc/notifier/notifier_test.go @@ -0,0 +1,95 @@ +package notifier + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "math/big" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/require" +) + +func TestFormatRelayList(t *testing.T) { + if got := formatRelayList([]string{}); got != "None" { + t.Errorf("formatRelayList(empty) = %q; want \"None\"", got) + } + relays := []string{"relay1", "relay2"} + want := "- relay1\n- relay2\n" + if got := formatRelayList(relays); got != want { + t.Errorf("formatRelayList(%v) = %q; want %q", relays, got, want) + } +} + +func TestFormatWeiToEth(t *testing.T) { + cases := []struct { + wei *big.Int + want string + }{ + {big.NewInt(0), "0.000000 ETH"}, + {big.NewInt(params.Ether), "1.000000 ETH"}, + {big.NewInt(1234567890000000000), "1.234568 ETH"}, + } + for _, c := range cases { + if got := formatWeiToEth(c.wei); got != c.want { + t.Errorf("formatWeiToEth(%v) = %q; want %q", c.wei, got, c.want) + } + } +} + +func TestNewNotifier(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + n := NewNotifier([]string{}, logger) + if n.enabled { + t.Errorf("expected notifier disabled when webhookURLs are empty") + } + n2 := NewNotifier([]string{"http://example.com"}, logger) + if !n2.enabled { + t.Errorf("expected notifier enabled when webhookURLs provided") + } +} + +func TestSendMessage_Disabled(t *testing.T) { + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + n := NewNotifier([]string{}, logger) + if err := n.SendMessage(context.Background(), Message{Text: "hello"}); err != nil { + t.Errorf("SendMessage disabled = error %v; want nil", err) + } +} + +func TestSendMessage_Success(t *testing.T) { + var received []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + received = body + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + n := NewNotifier([]string{server.URL}, logger) + msg := Message{Text: "test"} + require.NoError(t, n.SendMessage(context.Background(), msg)) + + var got Message + require.NoError(t, json.Unmarshal(received, &got)) + require.Equal(t, msg.Text, got.Text) +} + +func TestSendMessage_NonOK(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + n := NewNotifier([]string{server.URL}, logger) + err := n.SendMessage(context.Background(), Message{Text: "err"}) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "one or more notifications failed")) +} diff --git a/tools/preconf-rpc/service/service.go b/tools/preconf-rpc/service/service.go index cbb189595..44737f2d5 100644 --- a/tools/preconf-rpc/service/service.go +++ b/tools/preconf-rpc/service/service.go @@ -10,6 +10,7 @@ import ( "log/slog" "math/big" "net/http" + "slices" "time" "github.com/ethereum/go-ethereum/common" @@ -20,6 +21,7 @@ import ( notificationsapiv1 "github.com/primev/mev-commit/p2p/gen/go/notificationsapi/v1" "github.com/primev/mev-commit/tools/preconf-rpc/blocktracker" "github.com/primev/mev-commit/tools/preconf-rpc/handlers" + "github.com/primev/mev-commit/tools/preconf-rpc/notifier" "github.com/primev/mev-commit/tools/preconf-rpc/pricer" "github.com/primev/mev-commit/tools/preconf-rpc/rpcserver" "github.com/primev/mev-commit/tools/preconf-rpc/sender" @@ -57,6 +59,7 @@ type Config struct { GasTipCap *big.Int GasFeeCap *big.Int PricerAPIKey string + Webhooks []string } type Service struct { @@ -106,46 +109,11 @@ func New(config *Config) (*Service, error) { topologyCli := debugapiv1.NewDebugServiceClient(conn) notificationsCli := notificationsapiv1.NewNotificationsClient(conn) - status, err := bidderCli.DepositManagerStatus(context.Background(), &bidderapiv1.DepositManagerStatusRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to get deposit manager status: %w", err) - } - if !status.Enabled { - resp, err := bidderCli.EnableDepositManager(context.Background(), &bidderapiv1.EnableDepositManagerRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to enable deposit manager: %w", err) - } - if !resp.Success { - return nil, fmt.Errorf("failed to enable deposit manager") - } - } - config.Logger.Info("deposit manager enabled") - - validProviders, err := bidderCli.GetValidProviders(context.Background(), &bidderapiv1.GetValidProvidersRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to get valid providers: %w", err) - } - if len(validProviders.ValidProviders) == 0 { - return nil, fmt.Errorf("no valid providers found") + if err := setupDeposits(bidderCli, config.AutoDepositAmount); err != nil { + return nil, fmt.Errorf("failed to setup deposits: %w", err) } - targetDeposits := make([]*bidderapiv1.TargetDeposit, len(validProviders.ValidProviders)) - for i, provider := range validProviders.ValidProviders { - targetDeposits[i] = &bidderapiv1.TargetDeposit{ - Provider: provider, - TargetDeposit: config.AutoDepositAmount.String(), - } - } - - resp, err := bidderCli.SetTargetDeposits(context.Background(), &bidderapiv1.SetTargetDepositsRequest{ - TargetDeposits: targetDeposits, - }) - if err != nil { - return nil, fmt.Errorf("failed to set target deposits: %w", err) - } - if len(resp.SuccessfullySetDeposits) != len(targetDeposits) { - return nil, fmt.Errorf("failed to set target deposits") - } + notifier := notifier.NewNotifier(config.Webhooks, config.Logger.With("module", "notifier")) bridgeConfig := transfer.BridgeConfig{ Signer: config.Signer, @@ -185,6 +153,41 @@ func New(config *Config) (*Service, error) { healthChecker := health.New() + balanceNotifierDone := notifier.SetupLowBalanceNotification( + ctx, + "L1 Ethereum", + l1RPCClient.RawClient(), + config.Signer.GetAddress(), + "RPC Operator account balance low", + 3.0, + 5*time.Minute, + 15*time.Minute, + ) + + healthChecker.Register(health.CloseChannelHealthCheck("BalanceNotifier", balanceNotifierDone)) + s.closers = append(s.closers, channelCloser(balanceNotifierDone)) + + bidderEOA, err := getBidderEOA(topologyCli) + if err != nil { + return nil, fmt.Errorf("failed to get bidder EOA: %w", err) + } + + bidderFunderDone := startBidderFunder( + ctx, + config.Logger.With("module", "bidderfunder"), + bidderEOA, + accountsync.NewAccountSync(bidderEOA, settlementClient), + transferer, + config.SettlementThreshold, + config.SettlementTopup, + settlementClient, + settlementChainID, + notifier, + ) + + healthChecker.Register(health.CloseChannelHealthCheck("BidderFunder", bidderFunderDone)) + s.closers = append(s.closers, channelCloser(bidderFunderDone)) + bridgerDone := bridger.Start(ctx) healthChecker.Register(health.CloseChannelHealthCheck("Bridger", bridgerDone)) s.closers = append(s.closers, channelCloser(bridgerDone)) @@ -332,3 +335,118 @@ func initDB(opts *Config) (db *sql.DB, err error) { return db, err } + +func startBidderFunder( + ctx context.Context, + logger *slog.Logger, + bidderAccount common.Address, + syncer *accountsync.AccountSync, + transferer *transfer.Transferer, + settlementThreshold *big.Int, + settlementTopup *big.Int, + settlementClient *ethclient.Client, + settlementChainID *big.Int, + notifier *notifier.Notifier, +) <-chan struct{} { + done := make(chan struct{}) + go func() { + defer close(done) + + for { + sub := syncer.Subscribe(ctx, settlementThreshold) + select { + case <-ctx.Done(): + return + case <-sub: + logger.Info("bidder account balance below threshold") + err := transferer.Transfer(ctx, bidderAccount, settlementChainID, settlementTopup) + if err != nil { + logger.Error("failed to transfer funds to bidder account", "error", err) + } else { + logger.Info("successfully transferred funds to bidder account") + if err := notifier.SendBidderFundedNotification( + ctx, + "mev-commit chain", + bidderAccount, + settlementTopup, + ); err != nil { + logger.Error("failed to send bidder funded notification", "error", err) + } + } + time.Sleep(1 * time.Minute) // Prevent rapid retries + } + } + }() + + return done +} + +func setupDeposits(bidderCli bidderapiv1.BidderClient, amount *big.Int) error { + status, err := bidderCli.DepositManagerStatus(context.Background(), &bidderapiv1.DepositManagerStatusRequest{}) + if err != nil { + return fmt.Errorf("failed to get deposit manager status: %w", err) + } + if !status.Enabled { + resp, err := bidderCli.EnableDepositManager(context.Background(), &bidderapiv1.EnableDepositManagerRequest{}) + if err != nil { + return fmt.Errorf("failed to enable deposit manager: %w", err) + } + if !resp.Success { + return fmt.Errorf("failed to enable deposit manager") + } + } + + validProviders, err := bidderCli.GetValidProviders(context.Background(), &bidderapiv1.GetValidProvidersRequest{}) + if err != nil { + return fmt.Errorf("failed to get valid providers: %w", err) + } + if len(validProviders.ValidProviders) == 0 { + return fmt.Errorf("no valid providers found") + } + + targetDeposits := make([]*bidderapiv1.TargetDeposit, 0, len(validProviders.ValidProviders)) + for _, provider := range validProviders.ValidProviders { + if status.Enabled && slices.ContainsFunc(status.TargetDeposits, func(td *bidderapiv1.TargetDeposit) bool { + if td.Provider == provider && td.TargetDeposit == amount.String() { + return true + } + return false + }) { + continue + } + targetDeposits = append(targetDeposits, &bidderapiv1.TargetDeposit{ + Provider: provider, + TargetDeposit: amount.String(), + }) + } + + if len(targetDeposits) > 0 { + resp, err := bidderCli.SetTargetDeposits(context.Background(), &bidderapiv1.SetTargetDepositsRequest{ + TargetDeposits: targetDeposits, + }) + if err != nil { + return fmt.Errorf("failed to set target deposits: %w", err) + } + if len(resp.SuccessfullySetDeposits) != len(targetDeposits) { + return fmt.Errorf("failed to set target deposits") + } + } + + return nil +} + +func getBidderEOA(debugClient debugapiv1.DebugServiceClient) (common.Address, error) { + info, err := debugClient.GetTopology(context.Background(), &debugapiv1.EmptyMessage{}) + if err != nil { + return common.Address{}, fmt.Errorf("failed to get node info: %w", err) + } + self := info.Topology.Fields["self"].GetStructValue() + if self == nil { + return common.Address{}, fmt.Errorf("self field not found in topology") + } + addressHex := self.Fields["Ethereum Address"].GetStringValue() + if addressHex == "" { + return common.Address{}, fmt.Errorf("ethereum address not found in topology self field") + } + return common.HexToAddress(addressHex), nil +} From 4224125514f8f8b5fafc56ab7e0ce4adf414788b Mon Sep 17 00:00:00 2001 From: Alok Date: Thu, 11 Sep 2025 19:05:00 +0530 Subject: [PATCH 2/6] feat: add notifications for RPC --- tools/preconf-rpc/notifier/notifier.go | 182 ++++++++++++++------ tools/preconf-rpc/notifier/notifier_test.go | 33 ---- tools/preconf-rpc/sender/sender.go | 23 ++- tools/preconf-rpc/sender/sender_test.go | 15 ++ tools/preconf-rpc/service/service.go | 9 +- 5 files changed, 169 insertions(+), 93 deletions(-) diff --git a/tools/preconf-rpc/notifier/notifier.go b/tools/preconf-rpc/notifier/notifier.go index c6adc2b0c..8dd9bca5e 100644 --- a/tools/preconf-rpc/notifier/notifier.go +++ b/tools/preconf-rpc/notifier/notifier.go @@ -10,10 +10,12 @@ import ( "log/slog" "math/big" "net/http" + "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/params" + "github.com/primev/mev-commit/tools/preconf-rpc/sender" ) // Message represents a notification message structure @@ -41,86 +43,69 @@ type Field struct { Short bool `json:"short"` } +type txnInfo struct { + txn *sender.Transaction + noOfAttempts int + start time.Time +} + // Notifier sends notifications to multiple webhook endpoints type Notifier struct { webhookURLs []string client *http.Client logger *slog.Logger - enabled bool + queuedTxns []txnInfo + queuedMu sync.Mutex } // NewNotifier creates a new notifier instance func NewNotifier(webhookURLs []string, logger *slog.Logger) *Notifier { - enabled := len(webhookURLs) > 0 - - if !enabled { - logger.Warn("Notifications disabled - no webhook URLs provided") - } else { - logger.Info("Notifications enabled") - } - return &Notifier{ webhookURLs: webhookURLs, client: &http.Client{ Timeout: 10 * time.Second, }, - logger: logger, - enabled: enabled, + logger: logger, } } // SendMessage sends a message to all configured webhook endpoints func (n *Notifier) SendMessage(ctx context.Context, message Message) error { - if !n.enabled { - n.logger.Debug("Notification skipped (disabled)") - return nil - } - messageJSON, err := json.Marshal(message) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } - var errs []error - for _, webhookURL := range n.webhookURLs { - req, err := http.NewRequestWithContext(ctx, "POST", webhookURL, bytes.NewBuffer(messageJSON)) + var retErr error + for _, url := range n.webhookURLs { + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(messageJSON)) if err != nil { - n.logger.Error("Failed to create request", "webhook", webhookURL, "error", err) - errs = append(errs, fmt.Errorf("create request (%s): %w", webhookURL, err)) + n.logger.Error("Failed to create HTTP request", "url", url, "error", err) + retErr = errors.Join(retErr, err) continue } - req.Header.Set("Content-Type", "application/json") resp, err := n.client.Do(req) if err != nil { - n.logger.Error("Failed to send notification", "webhook", webhookURL, "error", err) - errs = append(errs, fmt.Errorf("send notification (%s): %w", webhookURL, err)) + n.logger.Error("Failed to send notification", "url", url, "error", err) + retErr = errors.Join(retErr, err) continue } + defer resp.Body.Close() - if _, err := io.Copy(io.Discard, resp.Body); err != nil { - n.logger.Error("Failed to read response body", "webhook", webhookURL, "error", err) - errs = append(errs, fmt.Errorf("read response body (%s): %w", webhookURL, err)) - } - - if err := resp.Body.Close(); err != nil { - n.logger.Error("Failed to close response body", "webhook", webhookURL, "error", err) - errs = append(errs, fmt.Errorf("close response body (%s): %w", webhookURL, err)) - } - - if resp.StatusCode != http.StatusOK { - n.logger.Error("Notification API returned non-200 status code", "webhook", webhookURL, "status", resp.StatusCode) - errs = append(errs, fmt.Errorf("non-200 status (%s): %d", webhookURL, resp.StatusCode)) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + errMsg := fmt.Sprintf("non-2xx response: %d - %s", resp.StatusCode, string(body)) + n.logger.Error("Failed to send notification", "url", url, "error", errMsg) + retErr = errors.Join(retErr, errors.New(errMsg)) continue } - n.logger.Debug("Notification sent successfully", "webhook", webhookURL) - } - if len(errs) > 0 { - return fmt.Errorf("one or more notifications failed: %w", errors.Join(errs...)) + n.logger.Info("Notification sent successfully", "url", url) } - return nil + + return retErr } type BalanceGetter interface { @@ -129,10 +114,9 @@ type BalanceGetter interface { func (n *Notifier) SetupLowBalanceNotification( ctx context.Context, - chainDesc string, + desc string, getter BalanceGetter, account common.Address, - msg string, thresholdEth float64, checkInterval time.Duration, alertCooldown time.Duration, @@ -163,12 +147,12 @@ func (n *Notifier) SetupLowBalanceNotification( balanceEthFloat, _ := balanceEth.Float64() if balanceEthFloat < thresholdEth { message := Message{ - Text: "⚠️ Low Balance Alert", + Text: "🏦 Low Balance Alert", Attachments: []Attachment{ { Color: "#ff0000", - Title: fmt.Sprintf("The following accounts have low balances on %s chain:", chainDesc), - Text: fmt.Sprintf("%s\n\nAccount: %s\nBalance: %s (Threshold: %.4f ETH)", msg, account, formatWeiToEth(balance), thresholdEth), + Title: desc, + Text: fmt.Sprintf("Account: %s\nBalance: %s (Threshold: %.4f ETH)", account, formatWeiToEth(balance), thresholdEth), Footer: "Preconf RPC Monitor", TS: time.Now().Unix(), }, @@ -188,16 +172,15 @@ func (n *Notifier) SetupLowBalanceNotification( func (n *Notifier) SendBidderFundedNotification( ctx context.Context, - chainDesc string, account common.Address, amountWei *big.Int, ) error { message := Message{ - Text: "🎉 Bidder Funded Alert", + Text: "💵 Bidder Funded", Attachments: []Attachment{ { Color: "#36a64f", - Title: fmt.Sprintf("A bidder account has been funded on %s chain:", chainDesc), + Title: "Bidder account was funded", Text: fmt.Sprintf("Account: %s\nAmount: %s", account, formatWeiToEth(amountWei)), Footer: "Preconf RPC Monitor", TS: time.Now().Unix(), @@ -207,6 +190,105 @@ func (n *Notifier) SendBidderFundedNotification( return n.SendMessage(ctx, message) } +func (n *Notifier) StartTransactionNotifier( + ctx context.Context, +) <-chan struct{} { + done := make(chan struct{}) + ticker := time.NewTicker(15 * time.Minute) + + go func() { + defer close(done) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + n.logger.Info("Transaction notification routine stopping due to context cancellation") + return + case <-ticker.C: + n.queuedMu.Lock() + if len(n.queuedTxns) == 0 { + n.logger.Debug("No queued transactions to notify about") + n.queuedMu.Unlock() + continue + } + txnsToNotify := n.queuedTxns + n.queuedTxns = nil + n.queuedMu.Unlock() + // create markdown table with the txn info + text := "" + for _, txnInfo := range txnsToNotify { + duration := time.Since(txnInfo.start).Round(time.Second) + status := "" + switch txnInfo.txn.Status { + case sender.TxStatusPreConfirmed: + status = "⚡ Pre-Confirmed" + case sender.TxStatusConfirmed: + status = "✅ Confirmed" + case sender.TxStatusFailed: + status = "❌ Failed" + status = fmt.Sprintf("%s (Error: %s)", status, txnInfo.txn.Details) + default: + status = "❓ Unknown" + } + txType := "" + switch txnInfo.txn.Type { + case sender.TxTypeRegular: + txType = "💸 ETH Transaction" + case sender.TxTypeDeposit: + txType = "🏦 Deposit" + case sender.TxTypeInstantBridge: + txType = "🌉 Instant Bridge" + default: + txType = "❓ Unknown" + } + text += fmt.Sprintf( + "- Txn: %s | Sender: %s | Attempts: %d | Duration: %s | Type: %s | Status: %s\n", + txnInfo.txn.Hash().Hex()[:8], + txnInfo.txn.Sender.Hex()[:8], + txnInfo.noOfAttempts, + duration, + txType, + status, + ) + } + message := Message{ + Text: "🚀 Transaction Report", + Attachments: []Attachment{ + { + Color: "#FFA500", + Title: "The following transactions were completed in the last 15 mins", + Text: text, + Footer: "Preconf RPC Monitor", + TS: time.Now().Unix(), + }, + }, + } + if err := n.SendMessage(ctx, message); err != nil { + n.logger.Error("Failed to send 15 minute transaction notification", "error", err) + } + } + } + }() + + return done +} + +func (n *Notifier) NotifyTransactionStatus( + txn *sender.Transaction, + noOfAttempts int, + start time.Time, +) { + n.queuedMu.Lock() + defer n.queuedMu.Unlock() + + n.queuedTxns = append(n.queuedTxns, txnInfo{ + txn: txn, + noOfAttempts: noOfAttempts, + start: start, + }) +} + func formatWeiToEth(wei *big.Int) string { ethValue := new(big.Float).Quo(new(big.Float).SetInt(wei), new(big.Float).SetFloat64(params.Ether)) return fmt.Sprintf("%.6f ETH", ethValue) diff --git a/tools/preconf-rpc/notifier/notifier_test.go b/tools/preconf-rpc/notifier/notifier_test.go index 1f6c8d000..06afde626 100644 --- a/tools/preconf-rpc/notifier/notifier_test.go +++ b/tools/preconf-rpc/notifier/notifier_test.go @@ -8,24 +8,12 @@ import ( "math/big" "net/http" "net/http/httptest" - "strings" "testing" "github.com/ethereum/go-ethereum/params" "github.com/stretchr/testify/require" ) -func TestFormatRelayList(t *testing.T) { - if got := formatRelayList([]string{}); got != "None" { - t.Errorf("formatRelayList(empty) = %q; want \"None\"", got) - } - relays := []string{"relay1", "relay2"} - want := "- relay1\n- relay2\n" - if got := formatRelayList(relays); got != want { - t.Errorf("formatRelayList(%v) = %q; want %q", relays, got, want) - } -} - func TestFormatWeiToEth(t *testing.T) { cases := []struct { wei *big.Int @@ -42,26 +30,6 @@ func TestFormatWeiToEth(t *testing.T) { } } -func TestNewNotifier(t *testing.T) { - logger := slog.New(slog.NewTextHandler(io.Discard, nil)) - n := NewNotifier([]string{}, logger) - if n.enabled { - t.Errorf("expected notifier disabled when webhookURLs are empty") - } - n2 := NewNotifier([]string{"http://example.com"}, logger) - if !n2.enabled { - t.Errorf("expected notifier enabled when webhookURLs provided") - } -} - -func TestSendMessage_Disabled(t *testing.T) { - logger := slog.New(slog.NewTextHandler(io.Discard, nil)) - n := NewNotifier([]string{}, logger) - if err := n.SendMessage(context.Background(), Message{Text: "hello"}); err != nil { - t.Errorf("SendMessage disabled = error %v; want nil", err) - } -} - func TestSendMessage_Success(t *testing.T) { var received []byte server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -91,5 +59,4 @@ func TestSendMessage_NonOK(t *testing.T) { n := NewNotifier([]string{server.URL}, logger) err := n.SendMessage(context.Background(), Message{Text: "err"}) require.Error(t, err) - require.True(t, strings.Contains(err.Error(), "one or more notifications failed")) } diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index 992dca739..6fda65c8e 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -112,6 +112,10 @@ type txnAttempt struct { attempts []*blockAttempt } +type Notifier interface { + NotifyTransactionStatus(txn *Transaction, noOfAttempts int, start time.Time) +} + type TxSender struct { logger *slog.Logger store Store @@ -129,6 +133,7 @@ type TxSender struct { inflightMu sync.RWMutex processMu sync.RWMutex txnAttemptHistory *lru.Cache[common.Hash, *txnAttempt] + notifier Notifier } func NewTxSender( @@ -137,6 +142,7 @@ func NewTxSender( pricer Pricer, blockTracker BlockTracker, transferer Transferer, + notifier Notifier, settlementChainId *big.Int, logger *slog.Logger, ) (*TxSender, error) { @@ -159,6 +165,7 @@ func NewTxSender( inflightTxns: make(map[common.Hash]chan struct{}), inflightAccount: make(map[common.Address]struct{}), txnAttemptHistory: txnAttemptHistory, + notifier: notifier, }, nil } @@ -380,7 +387,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { t.logger.Error("Failed to process transaction", "sender", txn.Sender.Hex(), "error", err) txn.Status = TxStatusFailed txn.Details = err.Error() - t.clearBlockAttemptHistory(txn.Hash()) + t.clearBlockAttemptHistory(txn) return t.store.StoreTransaction(ctx, txn, nil) } return nil @@ -438,7 +445,7 @@ BID_LOOP: "blockNumber", result.blockNumber, "bidAmount", result.bidAmount.String(), ) - t.clearBlockAttemptHistory(txn.Hash()) + t.clearBlockAttemptHistory(txn) break BID_LOOP } default: @@ -478,7 +485,7 @@ BID_LOOP: "blockNumber", result.blockNumber, "bidAmount", result.bidAmount.String(), ) - t.clearBlockAttemptHistory(txn.Hash()) + t.clearBlockAttemptHistory(txn) break BID_LOOP } } @@ -750,8 +757,8 @@ func (t *TxSender) calculatePriceForNextBlock( ) } -func (t *TxSender) clearBlockAttemptHistory(txnHash common.Hash) { - attempts, found := t.txnAttemptHistory.Get(txnHash) +func (t *TxSender) clearBlockAttemptHistory(txn *Transaction) { + attempts, found := t.txnAttemptHistory.Get(txn.Hash()) if !found { return } @@ -763,12 +770,14 @@ func (t *TxSender) clearBlockAttemptHistory(txnHash common.Hash) { t.logger.Info( "Clearing block attempt history for transaction", - "hash", txnHash.Hex(), + "hash", txn.Hash().Hex(), "blockAttempts", len(attempts.attempts), "startTime", attempts.startTime.Format(time.RFC3339), "startBlockNumber", attempts.attempts[0].blockNumber, "totalAttempts", totalAttempts, ) - _ = t.txnAttemptHistory.Remove(txnHash) + _ = t.txnAttemptHistory.Remove(txn.Hash()) + + t.notifier.NotifyTransactionStatus(txn, totalAttempts, attempts.startTime) } diff --git a/tools/preconf-rpc/sender/sender_test.go b/tools/preconf-rpc/sender/sender_test.go index 04da270ca..3bd8f90b2 100644 --- a/tools/preconf-rpc/sender/sender_test.go +++ b/tools/preconf-rpc/sender/sender_test.go @@ -257,6 +257,14 @@ func (m *mockTransferer) Transfer(ctx context.Context, to common.Address, chainI return nil } +type mockNotifier struct { + notifications []string +} + +func (m *mockNotifier) NotifyTransactionStatus(txn *sender.Transaction, attempts int, start time.Time) { + m.notifications = append(m.notifications, txn.Hash().Hex()) +} + func TestSender(t *testing.T) { t.Parallel() @@ -276,6 +284,7 @@ func TestSender(t *testing.T) { bnOut: make(chan blockNoOp, 10), bnErr: make(chan error, 1), } + notifier := &mockNotifier{} sndr, err := sender.NewTxSender( st, @@ -283,6 +292,7 @@ func TestSender(t *testing.T) { testPricer, blockTracker, &mockTransferer{}, + notifier, big.NewInt(1), // Settlement chain ID util.NewTestLogger(os.Stdout), ) @@ -518,6 +528,10 @@ func TestSender(t *testing.T) { cancel() <-done + + if len(notifier.notifications) != 2 { + t.Fatalf("expected 2 notifications, got %d", len(notifier.notifications)) + } } func TestCancelTransaction(t *testing.T) { @@ -546,6 +560,7 @@ func TestCancelTransaction(t *testing.T) { testPricer, blockTracker, &mockTransferer{}, + &mockNotifier{}, big.NewInt(1), // Settlement chain ID util.NewTestLogger(os.Stdout), ) diff --git a/tools/preconf-rpc/service/service.go b/tools/preconf-rpc/service/service.go index 44737f2d5..133bf9a02 100644 --- a/tools/preconf-rpc/service/service.go +++ b/tools/preconf-rpc/service/service.go @@ -155,10 +155,9 @@ func New(config *Config) (*Service, error) { balanceNotifierDone := notifier.SetupLowBalanceNotification( ctx, - "L1 Ethereum", + "RPC Operator AccountBalance Low", l1RPCClient.RawClient(), config.Signer.GetAddress(), - "RPC Operator account balance low", 3.0, 5*time.Minute, 15*time.Minute, @@ -185,6 +184,10 @@ func New(config *Config) (*Service, error) { notifier, ) + txnNotifierDone := notifier.StartTransactionNotifier(ctx) + healthChecker.Register(health.CloseChannelHealthCheck("TransactionNotifier", txnNotifierDone)) + s.closers = append(s.closers, channelCloser(txnNotifierDone)) + healthChecker.Register(health.CloseChannelHealthCheck("BidderFunder", bidderFunderDone)) s.closers = append(s.closers, channelCloser(bidderFunderDone)) @@ -233,6 +236,7 @@ func New(config *Config) (*Service, error) { bidpricer, blockTracker, transferer, + notifier, settlementChainID, config.Logger.With("module", "txsender"), ) @@ -366,7 +370,6 @@ func startBidderFunder( logger.Info("successfully transferred funds to bidder account") if err := notifier.SendBidderFundedNotification( ctx, - "mev-commit chain", bidderAccount, settlementTopup, ); err != nil { From f6f7867e5db63d5e86d83bf7164d2289b889b228 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Thu, 11 Sep 2025 23:48:53 +0530 Subject: [PATCH 3/6] fix: lint --- tools/preconf-rpc/notifier/notifier.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tools/preconf-rpc/notifier/notifier.go b/tools/preconf-rpc/notifier/notifier.go index 8dd9bca5e..baf9cecee 100644 --- a/tools/preconf-rpc/notifier/notifier.go +++ b/tools/preconf-rpc/notifier/notifier.go @@ -92,7 +92,11 @@ func (n *Notifier) SendMessage(ctx context.Context, message Message) error { retErr = errors.Join(retErr, err) continue } - defer resp.Body.Close() + func() { + // Ensure the body is fully read and closed to allow connection reuse + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() if resp.StatusCode < 200 || resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) From d530a40fff07824114eb802fbe32698d29885cb3 Mon Sep 17 00:00:00 2001 From: Alok Date: Fri, 12 Sep 2025 16:25:25 +0530 Subject: [PATCH 4/6] feat: add notifications for RPC --- tools/preconf-rpc/main.go | 22 ++++++++++++++++++++++ tools/preconf-rpc/service/service.go | 3 ++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/tools/preconf-rpc/main.go b/tools/preconf-rpc/main.go index 90f378ba5..6e44e6c42 100644 --- a/tools/preconf-rpc/main.go +++ b/tools/preconf-rpc/main.go @@ -122,6 +122,13 @@ var ( Value: "10000000000000000000", // 10 ETH } + optionBidderTopup = &cli.StringFlag{ + Name: "bidder-topup", + Usage: "topup for bidder", + EnvVars: []string{"PRECONF_RPC_BIDDER_TOPUP"}, + Value: "1000000000000000000", // 1 ETH + } + optionAutoDepositAmount = &cli.StringFlag{ Name: "auto-deposit-amount", Usage: "auto deposit amount", @@ -183,6 +190,12 @@ var ( Value: "", } + optionWebhookURLs = &cli.StringSliceFlag{ + Name: "webhook-urls", + Usage: "List of webhook URLs to send notifications to", + EnvVars: []string{"PRECONF_RPC_WEBHOOK_URLS"}, + } + optionLogFmt = &cli.StringFlag{ Name: "log-fmt", Usage: "log format to use, options are 'text' or 'json'", @@ -254,6 +267,8 @@ func main() { optionDepositAddress, optionBridgeAddress, optionBlocknativeAPIKey, + optionWebhookURLs, + optionBidderTopup, }, Action: func(c *cli.Context) error { logger, err := util.NewLogger( @@ -291,6 +306,11 @@ func main() { return fmt.Errorf("failed to parse settlement-topup") } + bidderTopup, ok := new(big.Int).SetString(c.String(optionBidderTopup.Name), 10) + if !ok { + return fmt.Errorf("failed to parse bidder-topup") + } + signer, err := keysigner.NewKeystoreSigner( c.String(optionKeystorePath.Name), c.String(optionKeystorePassword.Name), @@ -316,6 +336,7 @@ func main() { AutoDepositAmount: autoDepositAmount, SettlementThreshold: settlementThreshold, SettlementTopup: settlementTopup, + BidderTopup: bidderTopup, SettlementRPCUrl: c.String(optionSettlementRPCUrl.Name), BidderRPC: c.String(optionBidderRPCUrl.Name), L1RPCUrls: c.StringSlice(optionL1RPCUrls.Name), @@ -325,6 +346,7 @@ func main() { DepositAddress: common.HexToAddress(c.String(optionDepositAddress.Name)), BridgeAddress: common.HexToAddress(c.String(optionBridgeAddress.Name)), PricerAPIKey: c.String(optionBlocknativeAPIKey.Name), + Webhooks: c.StringSlice(optionWebhookURLs.Name), } s, err := service.New(&config) diff --git a/tools/preconf-rpc/service/service.go b/tools/preconf-rpc/service/service.go index 133bf9a02..1aab2e1ed 100644 --- a/tools/preconf-rpc/service/service.go +++ b/tools/preconf-rpc/service/service.go @@ -55,6 +55,7 @@ type Config struct { BridgeAddress common.Address SettlementThreshold *big.Int SettlementTopup *big.Int + BidderTopup *big.Int HTTPPort int GasTipCap *big.Int GasFeeCap *big.Int @@ -178,7 +179,7 @@ func New(config *Config) (*Service, error) { accountsync.NewAccountSync(bidderEOA, settlementClient), transferer, config.SettlementThreshold, - config.SettlementTopup, + config.BidderTopup, settlementClient, settlementChainID, notifier, From 2deb2f4ec7a25460d2381f18eddbb6cdea3dd5c2 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Fri, 3 Oct 2025 00:45:11 +0530 Subject: [PATCH 5/6] fix: nits about config options --- tools/preconf-rpc/handlers/handlers.go | 11 +++++++++++ tools/preconf-rpc/main.go | 14 +++++++------- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/tools/preconf-rpc/handlers/handlers.go b/tools/preconf-rpc/handlers/handlers.go index 1363f68be..df6023ccb 100644 --- a/tools/preconf-rpc/handlers/handlers.go +++ b/tools/preconf-rpc/handlers/handlers.go @@ -16,6 +16,10 @@ import ( "github.com/primev/mev-commit/tools/preconf-rpc/sender" ) +const ( + bridgeLimitWei = 1000000000000000000 // 1 ETH +) + type Bidder interface { Estimate() (int64, error) } @@ -343,6 +347,13 @@ func (h *rpcMethodHandler) handleSendRawTx( txType = sender.TxTypeDeposit case txn.To().Cmp(h.bridgeAddress) == 0: txType = sender.TxTypeInstantBridge + if txn.Value().Cmp(big.NewInt(bridgeLimitWei)) > 0 { + h.logger.Error("Bridge transaction with value greater than limit", "value", txn.Value().String()) + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeCustomError, + fmt.Sprintf("bridge transaction value exceeds limit %d wei", bridgeLimitWei), + ) + } } err = h.sndr.Enqueue(ctx, &sender.Transaction{ diff --git a/tools/preconf-rpc/main.go b/tools/preconf-rpc/main.go index 6e44e6c42..c785de431 100644 --- a/tools/preconf-rpc/main.go +++ b/tools/preconf-rpc/main.go @@ -112,28 +112,28 @@ var ( Name: "settlement-threshold", Usage: "Minimum threshold for settlement chain balance", EnvVars: []string{"PRECONF_RPC_SETTLEMENT_THRESHOLD"}, - Value: "5000000000000000000", // 5 ETH + Value: "2000000000000000000", // 2 ETH } optionSettlementTopup = &cli.StringFlag{ Name: "settlement-topup", Usage: "topup for settlement", EnvVars: []string{"PRECONF_RPC_SETTLEMENT_TOPUP"}, - Value: "10000000000000000000", // 10 ETH + Value: "2100000000000000000", // 2.1 ETH } optionBidderTopup = &cli.StringFlag{ Name: "bidder-topup", Usage: "topup for bidder", EnvVars: []string{"PRECONF_RPC_BIDDER_TOPUP"}, - Value: "1000000000000000000", // 1 ETH + Value: "100000000000000000", // 0.1 ETH } optionAutoDepositAmount = &cli.StringFlag{ - Name: "auto-deposit-amount", - Usage: "auto deposit amount", - EnvVars: []string{"PRECONF_RPC_AUTO_DEPOSIT_AMOUNT"}, - Value: "1000000000000000000", // 1 ETH + Name: "target-deposit-amount", + Usage: "target deposit amount", + EnvVars: []string{"PRECONF_RPC_TARGET_DEPOSIT_AMOUNT"}, + Value: "100000000000000000", // 0.1 ETH } optionGasTipCap = &cli.StringFlag{ From e36316ddb8af825c903f60690818f5ecb637225d Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Fri, 3 Oct 2025 01:03:24 +0530 Subject: [PATCH 6/6] fix: nits about config options --- tools/preconf-rpc/main.go | 20 ++++++++++++++------ tools/preconf-rpc/service/service.go | 4 ++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/tools/preconf-rpc/main.go b/tools/preconf-rpc/main.go index c785de431..f8adda4e8 100644 --- a/tools/preconf-rpc/main.go +++ b/tools/preconf-rpc/main.go @@ -122,14 +122,21 @@ var ( Value: "2100000000000000000", // 2.1 ETH } + optionBidderThreshold = &cli.StringFlag{ + Name: "bidder-threshold", + Usage: "threshold for bidder balance on settlement chain", + EnvVars: []string{"PRECONF_RPC_BIDDER_THRESHOLD"}, + Value: "100000000000000000", // 0.1 ETH + } + optionBidderTopup = &cli.StringFlag{ Name: "bidder-topup", Usage: "topup for bidder", EnvVars: []string{"PRECONF_RPC_BIDDER_TOPUP"}, - Value: "100000000000000000", // 0.1 ETH + Value: "110000000000000000", // 0.11 ETH } - optionAutoDepositAmount = &cli.StringFlag{ + optionTargetDepositAmount = &cli.StringFlag{ Name: "target-deposit-amount", Usage: "target deposit amount", EnvVars: []string{"PRECONF_RPC_TARGET_DEPOSIT_AMOUNT"}, @@ -263,11 +270,12 @@ func main() { optionGasTipCap, optionGasFeeCap, optionSettlementContractAddr, - optionAutoDepositAmount, + optionTargetDepositAmount, optionDepositAddress, optionBridgeAddress, optionBlocknativeAPIKey, optionWebhookURLs, + optionBidderThreshold, optionBidderTopup, }, Action: func(c *cli.Context) error { @@ -291,9 +299,9 @@ func main() { return fmt.Errorf("failed to parse gas-fee-cap") } - autoDepositAmount, ok := new(big.Int).SetString(c.String(optionAutoDepositAmount.Name), 10) + targetDepositAmount, ok := new(big.Int).SetString(c.String(optionTargetDepositAmount.Name), 10) if !ok { - return fmt.Errorf("failed to parse auto-deposit-amount") + return fmt.Errorf("failed to parse target-deposit-amount") } settlementThreshold, ok := new(big.Int).SetString(c.String(optionSettlementThreshold.Name), 10) @@ -333,7 +341,7 @@ func main() { Logger: logger, GasTipCap: gasTipCap, GasFeeCap: gasFeeCap, - AutoDepositAmount: autoDepositAmount, + TargetDepositAmount: targetDepositAmount, SettlementThreshold: settlementThreshold, SettlementTopup: settlementTopup, BidderTopup: bidderTopup, diff --git a/tools/preconf-rpc/service/service.go b/tools/preconf-rpc/service/service.go index 1aab2e1ed..9e2edb4b1 100644 --- a/tools/preconf-rpc/service/service.go +++ b/tools/preconf-rpc/service/service.go @@ -46,7 +46,7 @@ type Config struct { PgSSL bool Signer keysigner.KeySigner BidderRPC string - AutoDepositAmount *big.Int + TargetDepositAmount *big.Int L1RPCUrls []string SettlementRPCUrl string L1ContractAddr common.Address @@ -110,7 +110,7 @@ func New(config *Config) (*Service, error) { topologyCli := debugapiv1.NewDebugServiceClient(conn) notificationsCli := notificationsapiv1.NewNotificationsClient(conn) - if err := setupDeposits(bidderCli, config.AutoDepositAmount); err != nil { + if err := setupDeposits(bidderCli, config.TargetDepositAmount); err != nil { return nil, fmt.Errorf("failed to setup deposits: %w", err) }