diff --git a/tools/preconf-rpc/handlers/handlers.go b/tools/preconf-rpc/handlers/handlers.go index 1363f68be..df6023ccb 100644 --- a/tools/preconf-rpc/handlers/handlers.go +++ b/tools/preconf-rpc/handlers/handlers.go @@ -16,6 +16,10 @@ import ( "github.com/primev/mev-commit/tools/preconf-rpc/sender" ) +const ( + bridgeLimitWei = 1000000000000000000 // 1 ETH +) + type Bidder interface { Estimate() (int64, error) } @@ -343,6 +347,13 @@ func (h *rpcMethodHandler) handleSendRawTx( txType = sender.TxTypeDeposit case txn.To().Cmp(h.bridgeAddress) == 0: txType = sender.TxTypeInstantBridge + if txn.Value().Cmp(big.NewInt(bridgeLimitWei)) > 0 { + h.logger.Error("Bridge transaction with value greater than limit", "value", txn.Value().String()) + return nil, false, rpcserver.NewJSONErr( + rpcserver.CodeCustomError, + fmt.Sprintf("bridge transaction value exceeds limit %d wei", bridgeLimitWei), + ) + } } err = h.sndr.Enqueue(ctx, &sender.Transaction{ diff --git a/tools/preconf-rpc/main.go b/tools/preconf-rpc/main.go index 90f378ba5..f8adda4e8 100644 --- a/tools/preconf-rpc/main.go +++ b/tools/preconf-rpc/main.go @@ -112,21 +112,35 @@ var ( Name: "settlement-threshold", Usage: "Minimum threshold for settlement chain balance", EnvVars: []string{"PRECONF_RPC_SETTLEMENT_THRESHOLD"}, - Value: "5000000000000000000", // 5 ETH + Value: "2000000000000000000", // 2 ETH } optionSettlementTopup = &cli.StringFlag{ Name: "settlement-topup", Usage: "topup for settlement", EnvVars: []string{"PRECONF_RPC_SETTLEMENT_TOPUP"}, - Value: "10000000000000000000", // 10 ETH + Value: "2100000000000000000", // 2.1 ETH } - optionAutoDepositAmount = &cli.StringFlag{ - Name: "auto-deposit-amount", - Usage: "auto deposit amount", - EnvVars: []string{"PRECONF_RPC_AUTO_DEPOSIT_AMOUNT"}, - Value: "1000000000000000000", // 1 ETH + optionBidderThreshold = &cli.StringFlag{ + Name: "bidder-threshold", + Usage: "threshold for bidder balance on settlement chain", + EnvVars: []string{"PRECONF_RPC_BIDDER_THRESHOLD"}, + Value: "100000000000000000", // 0.1 ETH + } + + optionBidderTopup = &cli.StringFlag{ + Name: "bidder-topup", + Usage: "topup for bidder", + EnvVars: []string{"PRECONF_RPC_BIDDER_TOPUP"}, + Value: "110000000000000000", // 0.11 ETH + } + + optionTargetDepositAmount = &cli.StringFlag{ + Name: "target-deposit-amount", + Usage: "target deposit amount", + EnvVars: []string{"PRECONF_RPC_TARGET_DEPOSIT_AMOUNT"}, + Value: "100000000000000000", // 0.1 ETH } optionGasTipCap = &cli.StringFlag{ @@ -183,6 +197,12 @@ var ( Value: "", } + optionWebhookURLs = &cli.StringSliceFlag{ + Name: "webhook-urls", + Usage: "List of webhook URLs to send notifications to", + EnvVars: []string{"PRECONF_RPC_WEBHOOK_URLS"}, + } + optionLogFmt = &cli.StringFlag{ Name: "log-fmt", Usage: "log format to use, options are 'text' or 'json'", @@ -250,10 +270,13 @@ func main() { optionGasTipCap, optionGasFeeCap, optionSettlementContractAddr, - optionAutoDepositAmount, + optionTargetDepositAmount, optionDepositAddress, optionBridgeAddress, optionBlocknativeAPIKey, + optionWebhookURLs, + optionBidderThreshold, + optionBidderTopup, }, Action: func(c *cli.Context) error { logger, err := util.NewLogger( @@ -276,9 +299,9 @@ func main() { return fmt.Errorf("failed to parse gas-fee-cap") } - autoDepositAmount, ok := new(big.Int).SetString(c.String(optionAutoDepositAmount.Name), 10) + targetDepositAmount, ok := new(big.Int).SetString(c.String(optionTargetDepositAmount.Name), 10) if !ok { - return fmt.Errorf("failed to parse auto-deposit-amount") + return fmt.Errorf("failed to parse target-deposit-amount") } settlementThreshold, ok := new(big.Int).SetString(c.String(optionSettlementThreshold.Name), 10) @@ -291,6 +314,11 @@ func main() { return fmt.Errorf("failed to parse settlement-topup") } + bidderTopup, ok := new(big.Int).SetString(c.String(optionBidderTopup.Name), 10) + if !ok { + return fmt.Errorf("failed to parse bidder-topup") + } + signer, err := keysigner.NewKeystoreSigner( c.String(optionKeystorePath.Name), c.String(optionKeystorePassword.Name), @@ -313,9 +341,10 @@ func main() { Logger: logger, GasTipCap: gasTipCap, GasFeeCap: gasFeeCap, - AutoDepositAmount: autoDepositAmount, + TargetDepositAmount: targetDepositAmount, SettlementThreshold: settlementThreshold, SettlementTopup: settlementTopup, + BidderTopup: bidderTopup, SettlementRPCUrl: c.String(optionSettlementRPCUrl.Name), BidderRPC: c.String(optionBidderRPCUrl.Name), L1RPCUrls: c.StringSlice(optionL1RPCUrls.Name), @@ -325,6 +354,7 @@ func main() { DepositAddress: common.HexToAddress(c.String(optionDepositAddress.Name)), BridgeAddress: common.HexToAddress(c.String(optionBridgeAddress.Name)), PricerAPIKey: c.String(optionBlocknativeAPIKey.Name), + Webhooks: c.StringSlice(optionWebhookURLs.Name), } s, err := service.New(&config) diff --git a/tools/preconf-rpc/notifier/notifier.go b/tools/preconf-rpc/notifier/notifier.go new file mode 100644 index 000000000..baf9cecee --- /dev/null +++ b/tools/preconf-rpc/notifier/notifier.go @@ -0,0 +1,299 @@ +package notifier + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "math/big" + "net/http" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/params" + "github.com/primev/mev-commit/tools/preconf-rpc/sender" +) + +// Message represents a notification message structure +// generalized to support different platforms. +type Message struct { + Text string `json:"text,omitempty"` + Attachments []Attachment `json:"attachments,omitempty"` +} + +// Attachment represents a message attachment +type Attachment struct { + Color string `json:"color,omitempty"` + Title string `json:"title,omitempty"` + Text string `json:"text,omitempty"` + Fields []Field `json:"fields,omitempty"` + Footer string `json:"footer,omitempty"` + TS int64 `json:"ts,omitempty"` + MarkdownIn []string `json:"mrkdwn_in,omitempty"` +} + +// Field represents a field in a message attachment +type Field struct { + Title string `json:"title"` + Value string `json:"value"` + Short bool `json:"short"` +} + +type txnInfo struct { + txn *sender.Transaction + noOfAttempts int + start time.Time +} + +// Notifier sends notifications to multiple webhook endpoints +type Notifier struct { + webhookURLs []string + client *http.Client + logger *slog.Logger + queuedTxns []txnInfo + queuedMu sync.Mutex +} + +// NewNotifier creates a new notifier instance +func NewNotifier(webhookURLs []string, logger *slog.Logger) *Notifier { + return &Notifier{ + webhookURLs: webhookURLs, + client: &http.Client{ + Timeout: 10 * time.Second, + }, + logger: logger, + } +} + +// SendMessage sends a message to all configured webhook endpoints +func (n *Notifier) SendMessage(ctx context.Context, message Message) error { + messageJSON, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + var retErr error + for _, url := range n.webhookURLs { + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(messageJSON)) + if err != nil { + n.logger.Error("Failed to create HTTP request", "url", url, "error", err) + retErr = errors.Join(retErr, err) + continue + } + req.Header.Set("Content-Type", "application/json") + + resp, err := n.client.Do(req) + if err != nil { + n.logger.Error("Failed to send notification", "url", url, "error", err) + retErr = errors.Join(retErr, err) + continue + } + func() { + // Ensure the body is fully read and closed to allow connection reuse + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + errMsg := fmt.Sprintf("non-2xx response: %d - %s", resp.StatusCode, string(body)) + n.logger.Error("Failed to send notification", "url", url, "error", errMsg) + retErr = errors.Join(retErr, errors.New(errMsg)) + continue + } + + n.logger.Info("Notification sent successfully", "url", url) + } + + return retErr +} + +type BalanceGetter interface { + BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) +} + +func (n *Notifier) SetupLowBalanceNotification( + ctx context.Context, + desc string, + getter BalanceGetter, + account common.Address, + thresholdEth float64, + checkInterval time.Duration, + alertCooldown time.Duration, +) <-chan struct{} { + ticker := time.NewTicker(checkInterval) + done := make(chan struct{}) + go func() { + defer close(done) + defer ticker.Stop() + var lastAlert time.Time + + for { + select { + case <-ctx.Done(): + n.logger.Info("Low balance notification routine stopping due to context cancellation") + return + case <-ticker.C: + if time.Since(lastAlert) < alertCooldown { + continue + } + + balance, err := getter.BalanceAt(ctx, account, nil) + if err != nil { + n.logger.Error("Failed to get account balance", "account", account, "error", err) + continue + } + balanceEth := new(big.Float).Quo(new(big.Float).SetInt(balance), big.NewFloat(params.Ether)) + balanceEthFloat, _ := balanceEth.Float64() + if balanceEthFloat < thresholdEth { + message := Message{ + Text: "🏦 Low Balance Alert", + Attachments: []Attachment{ + { + Color: "#ff0000", + Title: desc, + Text: fmt.Sprintf("Account: %s\nBalance: %s (Threshold: %.4f ETH)", account, formatWeiToEth(balance), thresholdEth), + Footer: "Preconf RPC Monitor", + TS: time.Now().Unix(), + }, + }, + } + if err := n.SendMessage(ctx, message); err != nil { + n.logger.Error("Failed to send low balance notification", "error", err) + } else { + lastAlert = time.Now() + } + } + } + } + }() + return done +} + +func (n *Notifier) SendBidderFundedNotification( + ctx context.Context, + account common.Address, + amountWei *big.Int, +) error { + message := Message{ + Text: "💵 Bidder Funded", + Attachments: []Attachment{ + { + Color: "#36a64f", + Title: "Bidder account was funded", + Text: fmt.Sprintf("Account: %s\nAmount: %s", account, formatWeiToEth(amountWei)), + Footer: "Preconf RPC Monitor", + TS: time.Now().Unix(), + }, + }, + } + return n.SendMessage(ctx, message) +} + +func (n *Notifier) StartTransactionNotifier( + ctx context.Context, +) <-chan struct{} { + done := make(chan struct{}) + ticker := time.NewTicker(15 * time.Minute) + + go func() { + defer close(done) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + n.logger.Info("Transaction notification routine stopping due to context cancellation") + return + case <-ticker.C: + n.queuedMu.Lock() + if len(n.queuedTxns) == 0 { + n.logger.Debug("No queued transactions to notify about") + n.queuedMu.Unlock() + continue + } + txnsToNotify := n.queuedTxns + n.queuedTxns = nil + n.queuedMu.Unlock() + // create markdown table with the txn info + text := "" + for _, txnInfo := range txnsToNotify { + duration := time.Since(txnInfo.start).Round(time.Second) + status := "" + switch txnInfo.txn.Status { + case sender.TxStatusPreConfirmed: + status = "⚡ Pre-Confirmed" + case sender.TxStatusConfirmed: + status = "✅ Confirmed" + case sender.TxStatusFailed: + status = "❌ Failed" + status = fmt.Sprintf("%s (Error: %s)", status, txnInfo.txn.Details) + default: + status = "❓ Unknown" + } + txType := "" + switch txnInfo.txn.Type { + case sender.TxTypeRegular: + txType = "💸 ETH Transaction" + case sender.TxTypeDeposit: + txType = "🏦 Deposit" + case sender.TxTypeInstantBridge: + txType = "🌉 Instant Bridge" + default: + txType = "❓ Unknown" + } + text += fmt.Sprintf( + "- Txn: %s | Sender: %s | Attempts: %d | Duration: %s | Type: %s | Status: %s\n", + txnInfo.txn.Hash().Hex()[:8], + txnInfo.txn.Sender.Hex()[:8], + txnInfo.noOfAttempts, + duration, + txType, + status, + ) + } + message := Message{ + Text: "🚀 Transaction Report", + Attachments: []Attachment{ + { + Color: "#FFA500", + Title: "The following transactions were completed in the last 15 mins", + Text: text, + Footer: "Preconf RPC Monitor", + TS: time.Now().Unix(), + }, + }, + } + if err := n.SendMessage(ctx, message); err != nil { + n.logger.Error("Failed to send 15 minute transaction notification", "error", err) + } + } + } + }() + + return done +} + +func (n *Notifier) NotifyTransactionStatus( + txn *sender.Transaction, + noOfAttempts int, + start time.Time, +) { + n.queuedMu.Lock() + defer n.queuedMu.Unlock() + + n.queuedTxns = append(n.queuedTxns, txnInfo{ + txn: txn, + noOfAttempts: noOfAttempts, + start: start, + }) +} + +func formatWeiToEth(wei *big.Int) string { + ethValue := new(big.Float).Quo(new(big.Float).SetInt(wei), new(big.Float).SetFloat64(params.Ether)) + return fmt.Sprintf("%.6f ETH", ethValue) +} diff --git a/tools/preconf-rpc/notifier/notifier_test.go b/tools/preconf-rpc/notifier/notifier_test.go new file mode 100644 index 000000000..06afde626 --- /dev/null +++ b/tools/preconf-rpc/notifier/notifier_test.go @@ -0,0 +1,62 @@ +package notifier + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "math/big" + "net/http" + "net/http/httptest" + "testing" + + "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/require" +) + +func TestFormatWeiToEth(t *testing.T) { + cases := []struct { + wei *big.Int + want string + }{ + {big.NewInt(0), "0.000000 ETH"}, + {big.NewInt(params.Ether), "1.000000 ETH"}, + {big.NewInt(1234567890000000000), "1.234568 ETH"}, + } + for _, c := range cases { + if got := formatWeiToEth(c.wei); got != c.want { + t.Errorf("formatWeiToEth(%v) = %q; want %q", c.wei, got, c.want) + } + } +} + +func TestSendMessage_Success(t *testing.T) { + var received []byte + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + received = body + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + n := NewNotifier([]string{server.URL}, logger) + msg := Message{Text: "test"} + require.NoError(t, n.SendMessage(context.Background(), msg)) + + var got Message + require.NoError(t, json.Unmarshal(received, &got)) + require.Equal(t, msg.Text, got.Text) +} + +func TestSendMessage_NonOK(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) + n := NewNotifier([]string{server.URL}, logger) + err := n.SendMessage(context.Background(), Message{Text: "err"}) + require.Error(t, err) +} diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index 992dca739..6fda65c8e 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -112,6 +112,10 @@ type txnAttempt struct { attempts []*blockAttempt } +type Notifier interface { + NotifyTransactionStatus(txn *Transaction, noOfAttempts int, start time.Time) +} + type TxSender struct { logger *slog.Logger store Store @@ -129,6 +133,7 @@ type TxSender struct { inflightMu sync.RWMutex processMu sync.RWMutex txnAttemptHistory *lru.Cache[common.Hash, *txnAttempt] + notifier Notifier } func NewTxSender( @@ -137,6 +142,7 @@ func NewTxSender( pricer Pricer, blockTracker BlockTracker, transferer Transferer, + notifier Notifier, settlementChainId *big.Int, logger *slog.Logger, ) (*TxSender, error) { @@ -159,6 +165,7 @@ func NewTxSender( inflightTxns: make(map[common.Hash]chan struct{}), inflightAccount: make(map[common.Address]struct{}), txnAttemptHistory: txnAttemptHistory, + notifier: notifier, }, nil } @@ -380,7 +387,7 @@ func (t *TxSender) processQueuedTransactions(ctx context.Context) { t.logger.Error("Failed to process transaction", "sender", txn.Sender.Hex(), "error", err) txn.Status = TxStatusFailed txn.Details = err.Error() - t.clearBlockAttemptHistory(txn.Hash()) + t.clearBlockAttemptHistory(txn) return t.store.StoreTransaction(ctx, txn, nil) } return nil @@ -438,7 +445,7 @@ BID_LOOP: "blockNumber", result.blockNumber, "bidAmount", result.bidAmount.String(), ) - t.clearBlockAttemptHistory(txn.Hash()) + t.clearBlockAttemptHistory(txn) break BID_LOOP } default: @@ -478,7 +485,7 @@ BID_LOOP: "blockNumber", result.blockNumber, "bidAmount", result.bidAmount.String(), ) - t.clearBlockAttemptHistory(txn.Hash()) + t.clearBlockAttemptHistory(txn) break BID_LOOP } } @@ -750,8 +757,8 @@ func (t *TxSender) calculatePriceForNextBlock( ) } -func (t *TxSender) clearBlockAttemptHistory(txnHash common.Hash) { - attempts, found := t.txnAttemptHistory.Get(txnHash) +func (t *TxSender) clearBlockAttemptHistory(txn *Transaction) { + attempts, found := t.txnAttemptHistory.Get(txn.Hash()) if !found { return } @@ -763,12 +770,14 @@ func (t *TxSender) clearBlockAttemptHistory(txnHash common.Hash) { t.logger.Info( "Clearing block attempt history for transaction", - "hash", txnHash.Hex(), + "hash", txn.Hash().Hex(), "blockAttempts", len(attempts.attempts), "startTime", attempts.startTime.Format(time.RFC3339), "startBlockNumber", attempts.attempts[0].blockNumber, "totalAttempts", totalAttempts, ) - _ = t.txnAttemptHistory.Remove(txnHash) + _ = t.txnAttemptHistory.Remove(txn.Hash()) + + t.notifier.NotifyTransactionStatus(txn, totalAttempts, attempts.startTime) } diff --git a/tools/preconf-rpc/sender/sender_test.go b/tools/preconf-rpc/sender/sender_test.go index 04da270ca..3bd8f90b2 100644 --- a/tools/preconf-rpc/sender/sender_test.go +++ b/tools/preconf-rpc/sender/sender_test.go @@ -257,6 +257,14 @@ func (m *mockTransferer) Transfer(ctx context.Context, to common.Address, chainI return nil } +type mockNotifier struct { + notifications []string +} + +func (m *mockNotifier) NotifyTransactionStatus(txn *sender.Transaction, attempts int, start time.Time) { + m.notifications = append(m.notifications, txn.Hash().Hex()) +} + func TestSender(t *testing.T) { t.Parallel() @@ -276,6 +284,7 @@ func TestSender(t *testing.T) { bnOut: make(chan blockNoOp, 10), bnErr: make(chan error, 1), } + notifier := &mockNotifier{} sndr, err := sender.NewTxSender( st, @@ -283,6 +292,7 @@ func TestSender(t *testing.T) { testPricer, blockTracker, &mockTransferer{}, + notifier, big.NewInt(1), // Settlement chain ID util.NewTestLogger(os.Stdout), ) @@ -518,6 +528,10 @@ func TestSender(t *testing.T) { cancel() <-done + + if len(notifier.notifications) != 2 { + t.Fatalf("expected 2 notifications, got %d", len(notifier.notifications)) + } } func TestCancelTransaction(t *testing.T) { @@ -546,6 +560,7 @@ func TestCancelTransaction(t *testing.T) { testPricer, blockTracker, &mockTransferer{}, + &mockNotifier{}, big.NewInt(1), // Settlement chain ID util.NewTestLogger(os.Stdout), ) diff --git a/tools/preconf-rpc/service/service.go b/tools/preconf-rpc/service/service.go index cbb189595..9e2edb4b1 100644 --- a/tools/preconf-rpc/service/service.go +++ b/tools/preconf-rpc/service/service.go @@ -10,6 +10,7 @@ import ( "log/slog" "math/big" "net/http" + "slices" "time" "github.com/ethereum/go-ethereum/common" @@ -20,6 +21,7 @@ import ( notificationsapiv1 "github.com/primev/mev-commit/p2p/gen/go/notificationsapi/v1" "github.com/primev/mev-commit/tools/preconf-rpc/blocktracker" "github.com/primev/mev-commit/tools/preconf-rpc/handlers" + "github.com/primev/mev-commit/tools/preconf-rpc/notifier" "github.com/primev/mev-commit/tools/preconf-rpc/pricer" "github.com/primev/mev-commit/tools/preconf-rpc/rpcserver" "github.com/primev/mev-commit/tools/preconf-rpc/sender" @@ -44,7 +46,7 @@ type Config struct { PgSSL bool Signer keysigner.KeySigner BidderRPC string - AutoDepositAmount *big.Int + TargetDepositAmount *big.Int L1RPCUrls []string SettlementRPCUrl string L1ContractAddr common.Address @@ -53,10 +55,12 @@ type Config struct { BridgeAddress common.Address SettlementThreshold *big.Int SettlementTopup *big.Int + BidderTopup *big.Int HTTPPort int GasTipCap *big.Int GasFeeCap *big.Int PricerAPIKey string + Webhooks []string } type Service struct { @@ -106,46 +110,11 @@ func New(config *Config) (*Service, error) { topologyCli := debugapiv1.NewDebugServiceClient(conn) notificationsCli := notificationsapiv1.NewNotificationsClient(conn) - status, err := bidderCli.DepositManagerStatus(context.Background(), &bidderapiv1.DepositManagerStatusRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to get deposit manager status: %w", err) - } - if !status.Enabled { - resp, err := bidderCli.EnableDepositManager(context.Background(), &bidderapiv1.EnableDepositManagerRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to enable deposit manager: %w", err) - } - if !resp.Success { - return nil, fmt.Errorf("failed to enable deposit manager") - } - } - config.Logger.Info("deposit manager enabled") - - validProviders, err := bidderCli.GetValidProviders(context.Background(), &bidderapiv1.GetValidProvidersRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to get valid providers: %w", err) - } - if len(validProviders.ValidProviders) == 0 { - return nil, fmt.Errorf("no valid providers found") + if err := setupDeposits(bidderCli, config.TargetDepositAmount); err != nil { + return nil, fmt.Errorf("failed to setup deposits: %w", err) } - targetDeposits := make([]*bidderapiv1.TargetDeposit, len(validProviders.ValidProviders)) - for i, provider := range validProviders.ValidProviders { - targetDeposits[i] = &bidderapiv1.TargetDeposit{ - Provider: provider, - TargetDeposit: config.AutoDepositAmount.String(), - } - } - - resp, err := bidderCli.SetTargetDeposits(context.Background(), &bidderapiv1.SetTargetDepositsRequest{ - TargetDeposits: targetDeposits, - }) - if err != nil { - return nil, fmt.Errorf("failed to set target deposits: %w", err) - } - if len(resp.SuccessfullySetDeposits) != len(targetDeposits) { - return nil, fmt.Errorf("failed to set target deposits") - } + notifier := notifier.NewNotifier(config.Webhooks, config.Logger.With("module", "notifier")) bridgeConfig := transfer.BridgeConfig{ Signer: config.Signer, @@ -185,6 +154,44 @@ func New(config *Config) (*Service, error) { healthChecker := health.New() + balanceNotifierDone := notifier.SetupLowBalanceNotification( + ctx, + "RPC Operator AccountBalance Low", + l1RPCClient.RawClient(), + config.Signer.GetAddress(), + 3.0, + 5*time.Minute, + 15*time.Minute, + ) + + healthChecker.Register(health.CloseChannelHealthCheck("BalanceNotifier", balanceNotifierDone)) + s.closers = append(s.closers, channelCloser(balanceNotifierDone)) + + bidderEOA, err := getBidderEOA(topologyCli) + if err != nil { + return nil, fmt.Errorf("failed to get bidder EOA: %w", err) + } + + bidderFunderDone := startBidderFunder( + ctx, + config.Logger.With("module", "bidderfunder"), + bidderEOA, + accountsync.NewAccountSync(bidderEOA, settlementClient), + transferer, + config.SettlementThreshold, + config.BidderTopup, + settlementClient, + settlementChainID, + notifier, + ) + + txnNotifierDone := notifier.StartTransactionNotifier(ctx) + healthChecker.Register(health.CloseChannelHealthCheck("TransactionNotifier", txnNotifierDone)) + s.closers = append(s.closers, channelCloser(txnNotifierDone)) + + healthChecker.Register(health.CloseChannelHealthCheck("BidderFunder", bidderFunderDone)) + s.closers = append(s.closers, channelCloser(bidderFunderDone)) + bridgerDone := bridger.Start(ctx) healthChecker.Register(health.CloseChannelHealthCheck("Bridger", bridgerDone)) s.closers = append(s.closers, channelCloser(bridgerDone)) @@ -230,6 +237,7 @@ func New(config *Config) (*Service, error) { bidpricer, blockTracker, transferer, + notifier, settlementChainID, config.Logger.With("module", "txsender"), ) @@ -332,3 +340,117 @@ func initDB(opts *Config) (db *sql.DB, err error) { return db, err } + +func startBidderFunder( + ctx context.Context, + logger *slog.Logger, + bidderAccount common.Address, + syncer *accountsync.AccountSync, + transferer *transfer.Transferer, + settlementThreshold *big.Int, + settlementTopup *big.Int, + settlementClient *ethclient.Client, + settlementChainID *big.Int, + notifier *notifier.Notifier, +) <-chan struct{} { + done := make(chan struct{}) + go func() { + defer close(done) + + for { + sub := syncer.Subscribe(ctx, settlementThreshold) + select { + case <-ctx.Done(): + return + case <-sub: + logger.Info("bidder account balance below threshold") + err := transferer.Transfer(ctx, bidderAccount, settlementChainID, settlementTopup) + if err != nil { + logger.Error("failed to transfer funds to bidder account", "error", err) + } else { + logger.Info("successfully transferred funds to bidder account") + if err := notifier.SendBidderFundedNotification( + ctx, + bidderAccount, + settlementTopup, + ); err != nil { + logger.Error("failed to send bidder funded notification", "error", err) + } + } + time.Sleep(1 * time.Minute) // Prevent rapid retries + } + } + }() + + return done +} + +func setupDeposits(bidderCli bidderapiv1.BidderClient, amount *big.Int) error { + status, err := bidderCli.DepositManagerStatus(context.Background(), &bidderapiv1.DepositManagerStatusRequest{}) + if err != nil { + return fmt.Errorf("failed to get deposit manager status: %w", err) + } + if !status.Enabled { + resp, err := bidderCli.EnableDepositManager(context.Background(), &bidderapiv1.EnableDepositManagerRequest{}) + if err != nil { + return fmt.Errorf("failed to enable deposit manager: %w", err) + } + if !resp.Success { + return fmt.Errorf("failed to enable deposit manager") + } + } + + validProviders, err := bidderCli.GetValidProviders(context.Background(), &bidderapiv1.GetValidProvidersRequest{}) + if err != nil { + return fmt.Errorf("failed to get valid providers: %w", err) + } + if len(validProviders.ValidProviders) == 0 { + return fmt.Errorf("no valid providers found") + } + + targetDeposits := make([]*bidderapiv1.TargetDeposit, 0, len(validProviders.ValidProviders)) + for _, provider := range validProviders.ValidProviders { + if status.Enabled && slices.ContainsFunc(status.TargetDeposits, func(td *bidderapiv1.TargetDeposit) bool { + if td.Provider == provider && td.TargetDeposit == amount.String() { + return true + } + return false + }) { + continue + } + targetDeposits = append(targetDeposits, &bidderapiv1.TargetDeposit{ + Provider: provider, + TargetDeposit: amount.String(), + }) + } + + if len(targetDeposits) > 0 { + resp, err := bidderCli.SetTargetDeposits(context.Background(), &bidderapiv1.SetTargetDepositsRequest{ + TargetDeposits: targetDeposits, + }) + if err != nil { + return fmt.Errorf("failed to set target deposits: %w", err) + } + if len(resp.SuccessfullySetDeposits) != len(targetDeposits) { + return fmt.Errorf("failed to set target deposits") + } + } + + return nil +} + +func getBidderEOA(debugClient debugapiv1.DebugServiceClient) (common.Address, error) { + info, err := debugClient.GetTopology(context.Background(), &debugapiv1.EmptyMessage{}) + if err != nil { + return common.Address{}, fmt.Errorf("failed to get node info: %w", err) + } + self := info.Topology.Fields["self"].GetStructValue() + if self == nil { + return common.Address{}, fmt.Errorf("self field not found in topology") + } + addressHex := self.Fields["Ethereum Address"].GetStringValue() + if addressHex == "" { + return common.Address{}, fmt.Errorf("ethereum address not found in topology self field") + } + return common.HexToAddress(addressHex), nil +}