diff --git a/p2p/pkg/depositmanager/deposit.go b/p2p/pkg/depositmanager/deposit.go index 4ea43d730..8404b10ea 100644 --- a/p2p/pkg/depositmanager/deposit.go +++ b/p2p/pkg/depositmanager/deposit.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" bidderregistry "github.com/primev/mev-commit/contracts-abi/clients/BidderRegistry" + "github.com/primev/mev-commit/p2p/pkg/notifications" "github.com/primev/mev-commit/x/contracts/events" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" @@ -20,15 +21,16 @@ type BidderRegistryContract interface { } type Store interface { - GetBalance(bidder common.Address, provider common.Address) (*big.Int, error) - SetBalance(bidder common.Address, provider common.Address, balance *big.Int) error - DeleteBalance(bidder common.Address, provider common.Address) error - RefundBalanceIfExists(bidder common.Address, provider common.Address, amount *big.Int) error + GetBalance(bidder common.Address) (*big.Int, error) + SetBalance(bidder common.Address, balance *big.Int) error + DeleteBalance(bidder common.Address) error + RefundBalanceIfExists(bidder common.Address, amount *big.Int) error } type DepositManager struct { store Store evtMgr events.EventManager + notifiee notifications.Notifiee bidderRegistry BidderRegistryContract deposits chan *bidderregistry.BidderregistryBidderDeposited withdrawRequests chan *bidderregistry.BidderregistryWithdrawalRequested @@ -40,12 +42,14 @@ type DepositManager struct { func NewDepositManager( store Store, evtMgr events.EventManager, + notifiee notifications.Notifiee, bidderRegistry BidderRegistryContract, thisProviderAddress common.Address, logger *slog.Logger, ) *DepositManager { return &DepositManager{ store: store, + notifiee: notifiee, bidderRegistry: bidderRegistry, deposits: make(chan *bidderregistry.BidderregistryBidderDeposited), withdrawRequests: make(chan *bidderregistry.BidderregistryWithdrawalRequested), @@ -61,6 +65,11 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} { eg, egCtx := errgroup.WithContext(ctx) + notifCh := dm.notifiee.Subscribe( + notifications.TopicCommitmentStoreFailed, + notifications.TopicOtherProviderWonBlock, + ) + ev1 := events.NewEventHandler( "BidderDeposited", func(bidderDeposit *bidderregistry.BidderregistryBidderDeposited) { @@ -103,6 +112,11 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} { eg.Go(func() error { defer sub.Unsubscribe() + defer func() { + unsubDone := dm.notifiee.Unsubscribe(notifCh) + <-unsubDone + }() + select { case <-egCtx.Done(): dm.logger.Info("event subscription context done") @@ -119,18 +133,42 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} { dm.logger.Info("deposit manager context done") return nil + case n := <-notifCh: + topic := n.Topic() + if topic != notifications.TopicOtherProviderWonBlock && topic != notifications.TopicCommitmentStoreFailed { + dm.logger.Debug("ignoring notification for topic", "topic", topic) + continue + } + + val := n.Value() + bidderHex := val["bidder"].(string) + bidAmount := val["bidAmount"].(string) + + bidder := common.HexToAddress(bidderHex) + bidAmountInt, ok := new(big.Int).SetString(bidAmount, 10) + if !ok { + dm.logger.Error("failed to parse bid amount", "bidAmount", bidAmount) + continue + } + + if err := dm.store.RefundBalanceIfExists(bidder, bidAmountInt); err != nil { + dm.logger.Error("refunding balance", "error", err) + return err + } + dm.logger.Info("refunded balance from notification", "bidder", bidder, "bidAmount", bidAmountInt) + case deposit := <-dm.deposits: if deposit.Provider != dm.thisProviderAddress { dm.logger.Debug("ignoring deposit event for different provider", "provider", deposit.Provider) continue } - currentBalance, err := dm.store.GetBalance(deposit.Bidder, deposit.Provider) + currentBalance, err := dm.store.GetBalance(deposit.Bidder) if err != nil { dm.logger.Error("getting balance", "error", err) return err } if currentBalance == nil { - if err := dm.store.SetBalance(deposit.Bidder, deposit.Provider, deposit.NewAvailableAmount); err != nil { + if err := dm.store.SetBalance(deposit.Bidder, deposit.NewAvailableAmount); err != nil { dm.logger.Error("setting balance", "error", err) return err } @@ -141,7 +179,7 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} { ) } else { newBalance := new(big.Int).Add(currentBalance, deposit.DepositedAmount) - if err := dm.store.SetBalance(deposit.Bidder, deposit.Provider, newBalance); err != nil { + if err := dm.store.SetBalance(deposit.Bidder, newBalance); err != nil { dm.logger.Error("setting balance", "error", err) return err } @@ -156,7 +194,7 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} { dm.logger.Debug("ignoring withdrawal request event for different provider", "provider", withdrawalRequest.Provider) continue } - if err := dm.store.DeleteBalance(withdrawalRequest.Bidder, withdrawalRequest.Provider); err != nil { + if err := dm.store.DeleteBalance(withdrawalRequest.Bidder); err != nil { dm.logger.Error("deleting balance", "error", err) return err } @@ -170,7 +208,7 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} { dm.logger.Debug("ignoring withdrawal event for different provider", "provider", withdrawal.Provider) continue } - if err := dm.store.DeleteBalance(withdrawal.Bidder, withdrawal.Provider); err != nil { + if err := dm.store.DeleteBalance(withdrawal.Bidder); err != nil { dm.logger.Error("deleting balance", "error", err) return err } @@ -194,7 +232,6 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} { func (dm *DepositManager) CheckAndDeductDeposit( ctx context.Context, bidderAddr common.Address, - providerAddr common.Address, bidAmountStr string, ) (func() error, error) { bidAmount, ok := new(big.Int).SetString(bidAmountStr, 10) @@ -203,7 +240,7 @@ func (dm *DepositManager) CheckAndDeductDeposit( return nil, status.Errorf(codes.InvalidArgument, "failed to parse bid amount") } - balance, err := dm.store.GetBalance(bidderAddr, providerAddr) + balance, err := dm.store.GetBalance(bidderAddr) if err != nil { dm.logger.Error("getting balance", "error", err) return nil, status.Errorf(codes.Internal, "failed to get balance: %v", err) @@ -216,28 +253,28 @@ func (dm *DepositManager) CheckAndDeductDeposit( return nil, status.Errorf(codes.FailedPrecondition, "insufficient balance") } - if err := dm.store.SetBalance(bidderAddr, providerAddr, newBalance); err != nil { + if err := dm.store.SetBalance(bidderAddr, newBalance); err != nil { dm.logger.Error("setting balance", "error", err) return nil, status.Errorf(codes.Internal, "failed to set balance: %v", err) } return func() error { - return dm.store.RefundBalanceIfExists(bidderAddr, providerAddr, bidAmount) + return dm.store.RefundBalanceIfExists(bidderAddr, bidAmount) }, nil } dm.logger.Info("balance not found in store, defaulting to contract call", "bidder", bidderAddr.Hex(), - "provider", providerAddr.Hex(), + "provider", dm.thisProviderAddress.Hex(), ) - defaultBalance, err := dm.getDefaultBalance(ctx, bidderAddr, providerAddr, nil) // nil for latest block + defaultBalance, err := dm.getDefaultBalance(ctx, bidderAddr, dm.thisProviderAddress, nil) // nil for latest block if err != nil { return nil, err } if defaultBalance == nil { - dm.logger.Error("bidder balance not found", "bidder", bidderAddr.Hex(), "provider", providerAddr.Hex()) + dm.logger.Error("bidder balance not found", "bidder", bidderAddr.Hex(), "provider", dm.thisProviderAddress.Hex()) return nil, status.Errorf(codes.FailedPrecondition, - "balance not found for bidder %s and provider %s", bidderAddr.Hex(), providerAddr.Hex()) + "balance not found for bidder %s and provider %s", bidderAddr.Hex(), dm.thisProviderAddress.Hex()) } if defaultBalance.Cmp(bidAmount) < 0 { @@ -246,13 +283,13 @@ func (dm *DepositManager) CheckAndDeductDeposit( } newBalance := new(big.Int).Sub(defaultBalance, bidAmount) - if err := dm.store.SetBalance(bidderAddr, providerAddr, newBalance); err != nil { + if err := dm.store.SetBalance(bidderAddr, newBalance); err != nil { dm.logger.Error("setting balance for block", "error", err) return nil, status.Errorf(codes.Internal, "failed to set balance for block: %v", err) } return func() error { - return dm.store.RefundBalanceIfExists(bidderAddr, providerAddr, bidAmount) + return dm.store.RefundBalanceIfExists(bidderAddr, bidAmount) }, nil } @@ -276,7 +313,7 @@ func (dm *DepositManager) getDefaultBalance( } if balance.Cmp(big.NewInt(0)) > 0 { - if err := dm.store.SetBalance(bidderAddr, providerAddr, balance); err != nil { + if err := dm.store.SetBalance(bidderAddr, balance); err != nil { dm.logger.Error("setting balance", "error", err) return nil, status.Errorf(codes.Internal, "failed to set balance: %v", err) } diff --git a/p2p/pkg/depositmanager/deposit_test.go b/p2p/pkg/depositmanager/deposit_test.go index ca592044a..4dff9707d 100644 --- a/p2p/pkg/depositmanager/deposit_test.go +++ b/p2p/pkg/depositmanager/deposit_test.go @@ -18,6 +18,7 @@ import ( blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker" "github.com/primev/mev-commit/p2p/pkg/depositmanager" depositstore "github.com/primev/mev-commit/p2p/pkg/depositmanager/store" + "github.com/primev/mev-commit/p2p/pkg/notifications" inmemstorage "github.com/primev/mev-commit/p2p/pkg/storage/inmem" "github.com/primev/mev-commit/x/contracts/events" "github.com/primev/mev-commit/x/util" @@ -66,14 +67,13 @@ func TestDepositManager(t *testing.T) { providerAddress := common.HexToAddress("0x456") - dm := depositmanager.NewDepositManager(st, evtMgr, bidderRegistry, providerAddress, logger) + dm := depositmanager.NewDepositManager(st, evtMgr, notifications.New(10), bidderRegistry, providerAddress, logger) done := dm.Start(ctx) // no deposit refund, err := dm.CheckAndDeductDeposit( context.Background(), common.HexToAddress("0x123"), - common.HexToAddress("0x456"), "10", ) if err == nil { @@ -98,7 +98,6 @@ func TestDepositManager(t *testing.T) { for { if val, err := st.GetBalance( common.HexToAddress("0x123"), - common.HexToAddress("0x456"), ); err == nil && val != nil && val.Cmp(big.NewInt(100)) == 0 { break } @@ -109,7 +108,6 @@ func TestDepositManager(t *testing.T) { refund, err = dm.CheckAndDeductDeposit( context.Background(), common.HexToAddress("0x123"), - common.HexToAddress("0x456"), "100", ) if err != nil { @@ -120,7 +118,6 @@ func TestDepositManager(t *testing.T) { _, err = dm.CheckAndDeductDeposit( context.Background(), common.HexToAddress("0x123"), - common.HexToAddress("0x456"), "10", ) if err == nil || !strings.Contains(err.Error(), "insufficient balance") { @@ -136,7 +133,6 @@ func TestDepositManager(t *testing.T) { _, err = dm.CheckAndDeductDeposit( context.Background(), common.HexToAddress("0x123"), - common.HexToAddress("0x456"), "10", ) if err != nil { @@ -145,7 +141,6 @@ func TestDepositManager(t *testing.T) { balance, err := st.GetBalance( common.HexToAddress("0x123"), - common.HexToAddress("0x456"), ) if err != nil { t.Fatal(err) @@ -168,7 +163,6 @@ func TestDepositManager(t *testing.T) { for { if val, err := st.GetBalance( common.HexToAddress("0x123"), - common.HexToAddress("0x456"), ); err == nil && val == nil { break } @@ -209,7 +203,6 @@ func TestDepositManager(t *testing.T) { for { if val, err := st.GetBalance( common.HexToAddress("0x123"), - common.HexToAddress("0x456"), ); err == nil && val != nil && val.Cmp(big.NewInt(777)) == 0 { break } @@ -254,7 +247,7 @@ func TestStartWithBidderAlreadyDeposited(t *testing.T) { providerAddress := common.HexToAddress("0x456") - dm := depositmanager.NewDepositManager(st, evtMgr, bidderRegistry, providerAddress, logger) + dm := depositmanager.NewDepositManager(st, evtMgr, notifications.New(10), bidderRegistry, providerAddress, logger) done := dm.Start(ctx) err = publishBidderDeposited(evtMgr, &brABI, &bidderregistry.BidderregistryBidderDeposited{ @@ -273,7 +266,6 @@ func TestStartWithBidderAlreadyDeposited(t *testing.T) { for { if val, err := st.GetBalance( common.HexToAddress("0x123"), - common.HexToAddress("0x456"), ); err == nil && val != nil && val.Cmp(big.NewInt(133)) == 0 { break } @@ -316,7 +308,7 @@ func TestOtherProvidersEventsAreIgnored(t *testing.T) { providerAddress := common.HexToAddress("0x456") - dm := depositmanager.NewDepositManager(st, evtMgr, bidderRegistry, providerAddress, logger) + dm := depositmanager.NewDepositManager(st, evtMgr, notifications.New(10), bidderRegistry, providerAddress, logger) done := dm.Start(ctx) differentProvider := common.HexToAddress("0x789") diff --git a/p2p/pkg/depositmanager/store/store.go b/p2p/pkg/depositmanager/store/store.go index a5de60770..d06782d78 100644 --- a/p2p/pkg/depositmanager/store/store.go +++ b/p2p/pkg/depositmanager/store/store.go @@ -17,8 +17,8 @@ const ( ) var ( - balanceKey = func(bidder common.Address, provider common.Address) string { - return fmt.Sprintf("%s%s/%s", balanceNS, bidder, provider) + balanceKey = func(bidder common.Address) string { + return fmt.Sprintf("%s%s", balanceNS, bidder) } balancePrefix = func(bidder common.Address) string { return fmt.Sprintf("%s%s", balanceNS, bidder) @@ -36,18 +36,18 @@ func New(st storage.Storage) *Store { } } -func (s *Store) SetBalance(bidder common.Address, provider common.Address, depositedAmount *big.Int) error { +func (s *Store) SetBalance(bidder common.Address, depositedAmount *big.Int) error { s.mu.Lock() defer s.mu.Unlock() - return s.st.Put(balanceKey(bidder, provider), depositedAmount.Bytes()) + return s.st.Put(balanceKey(bidder), depositedAmount.Bytes()) } -func (s *Store) GetBalance(bidder common.Address, provider common.Address) (*big.Int, error) { +func (s *Store) GetBalance(bidder common.Address) (*big.Int, error) { s.mu.RLock() defer s.mu.RUnlock() - val, err := s.st.Get(balanceKey(bidder, provider)) + val, err := s.st.Get(balanceKey(bidder)) switch { case errors.Is(err, storage.ErrKeyNotFound): return nil, nil @@ -58,21 +58,20 @@ func (s *Store) GetBalance(bidder common.Address, provider common.Address) (*big return new(big.Int).SetBytes(val), nil } -func (s *Store) DeleteBalance(bidder common.Address, provider common.Address) error { +func (s *Store) DeleteBalance(bidder common.Address) error { s.mu.Lock() defer s.mu.Unlock() - return s.st.Delete(balanceKey(bidder, provider)) + return s.st.Delete(balanceKey(bidder)) } func (s *Store) RefundBalanceIfExists( bidder common.Address, - provider common.Address, amount *big.Int, ) error { s.mu.Lock() defer s.mu.Unlock() - val, err := s.st.Get(balanceKey(bidder, provider)) + val, err := s.st.Get(balanceKey(bidder)) switch { case errors.Is(err, storage.ErrKeyNotFound): return status.Errorf(codes.FailedPrecondition, "balance not found, no refund needed") @@ -81,7 +80,7 @@ func (s *Store) RefundBalanceIfExists( } newAmount := new(big.Int).Add(new(big.Int).SetBytes(val), amount) - return s.st.Put(balanceKey(bidder, provider), newAmount.Bytes()) + return s.st.Put(balanceKey(bidder), newAmount.Bytes()) } func (s *Store) BalanceEntries(bidder common.Address) (int, error) { diff --git a/p2p/pkg/depositmanager/store/store_test.go b/p2p/pkg/depositmanager/store/store_test.go index 5c11c64b6..ed3253159 100644 --- a/p2p/pkg/depositmanager/store/store_test.go +++ b/p2p/pkg/depositmanager/store/store_test.go @@ -15,15 +15,14 @@ func TestStore_SetBalance(t *testing.T) { s := store.New(st) bidder := common.HexToAddress("0x123") - provider := common.HexToAddress("0x456") depositedAmount := big.NewInt(10) - err := s.SetBalance(bidder, provider, depositedAmount) + err := s.SetBalance(bidder, depositedAmount) if err != nil { t.Fatal(err) } - val, err := s.GetBalance(bidder, provider) + val, err := s.GetBalance(bidder) if err != nil { t.Fatal(err) } @@ -37,15 +36,14 @@ func TestStore_GetBalance(t *testing.T) { s := store.New(st) bidder := common.HexToAddress("0x123") - provider := common.HexToAddress("0x456") depositedAmount := big.NewInt(10) - err := s.SetBalance(bidder, provider, depositedAmount) + err := s.SetBalance(bidder, depositedAmount) if err != nil { t.Fatal(err) } - val, err := s.GetBalance(bidder, provider) + val, err := s.GetBalance(bidder) if err != nil { t.Fatal(err) } @@ -59,9 +57,8 @@ func TestStore_GetBalance_NoBalance(t *testing.T) { s := store.New(st) bidder := common.HexToAddress("0x123") - provider := common.HexToAddress("0x456") - val, err := s.GetBalance(bidder, provider) + val, err := s.GetBalance(bidder) if err != nil { t.Fatal(err) } @@ -75,10 +72,9 @@ func TestStore_RefundBalanceIfExists(t *testing.T) { s := store.New(st) bidder := common.HexToAddress("0x123") - provider := common.HexToAddress("0x456") amount := big.NewInt(20) - err := s.RefundBalanceIfExists(bidder, provider, amount) + err := s.RefundBalanceIfExists(bidder, amount) if err == nil { t.Fatal("expected error, got nil") } @@ -86,18 +82,18 @@ func TestStore_RefundBalanceIfExists(t *testing.T) { t.Fatalf("expected error containing 'balance not found, no refund needed', got %v", err) } - err = s.SetBalance(bidder, provider, amount) + err = s.SetBalance(bidder, amount) if err != nil { t.Fatal(err) } - refundAmount := big.NewInt(5) - err = s.RefundBalanceIfExists(bidder, provider, refundAmount) + increaseAmount := big.NewInt(5) + err = s.RefundBalanceIfExists(bidder, increaseAmount) if err != nil { t.Fatal(err) } - val, err := s.GetBalance(bidder, provider) + val, err := s.GetBalance(bidder) if err != nil { t.Fatal(err) } @@ -112,15 +108,14 @@ func TestStore_DeleteBalance(t *testing.T) { s := store.New(st) bidder := common.HexToAddress("0x123") - provider := common.HexToAddress("0x456") depositedAmount := big.NewInt(10) - err := s.SetBalance(bidder, provider, depositedAmount) + err := s.SetBalance(bidder, depositedAmount) if err != nil { t.Fatal(err) } - val, err := s.GetBalance(bidder, provider) + val, err := s.GetBalance(bidder) if err != nil { t.Fatal(err) } @@ -128,12 +123,12 @@ func TestStore_DeleteBalance(t *testing.T) { t.Fatalf("expected %s, got %s", depositedAmount.String(), val.String()) } - err = s.DeleteBalance(bidder, provider) + err = s.DeleteBalance(bidder) if err != nil { t.Fatal(err) } - val, err = s.GetBalance(bidder, provider) + val, err = s.GetBalance(bidder) if err != nil { t.Fatal(err) } diff --git a/p2p/pkg/node/node.go b/p2p/pkg/node/node.go index 062346354..2da591e6c 100644 --- a/p2p/pkg/node/node.go +++ b/p2p/pkg/node/node.go @@ -11,7 +11,6 @@ import ( "net" "net/http" "strings" - "sync/atomic" "time" "github.com/bufbuild/protovalidate-go" @@ -167,8 +166,6 @@ func NewNode(opts *Options) (*Node, error) { } } - progressstore := &progressStore{contractRPC: contractRPC} - chainID, err := contractRPC.ChainID(context.Background()) if err != nil { opts.Logger.Error("failed to get chain ID", "error", err) @@ -219,6 +216,8 @@ func NewNode(opts *Options) (*Node, error) { } nd.closers = append(nd.closers, store) + progressstore := NewDurableProgressStore(store, contractRPC) + contracts, err := getContractABIs(opts) if err != nil { opts.Logger.Error("failed to get contract ABIs", "error", err) @@ -571,6 +570,7 @@ func NewNode(opts *Options) (*Node, error) { depositMgr = depositmanager.NewDepositManager( depositmanagerstore.New(store), evtMgr, + notificationsSvc, bidderRegistry, opts.KeySigner.GetAddress(), opts.Logger.With("component", "depositmanager"), @@ -937,7 +937,7 @@ func (noOpBidProcessor) ProcessBid( type noOpDepositManager struct{} -func (noOpDepositManager) CheckAndDeductDeposit(_ context.Context, _ common.Address, _ common.Address, _ string) (func() error, error) { +func (noOpDepositManager) CheckAndDeductDeposit(_ context.Context, _ common.Address, _ string) (func() error, error) { return func() error { return nil }, nil } @@ -981,20 +981,6 @@ func (f StartableFunc) Start(ctx context.Context) <-chan struct{} { return f(ctx) } -type progressStore struct { - contractRPC *ethclient.Client - lastBlock atomic.Uint64 -} - -func (p *progressStore) LastBlock() (uint64, error) { - return p.contractRPC.BlockNumber(context.Background()) -} - -func (p *progressStore) SetLastBlock(block uint64) error { - p.lastBlock.Store(block) - return nil -} - func setDefault(field *string, defaultValue string) { if *field == "" { *field = defaultValue diff --git a/p2p/pkg/node/store.go b/p2p/pkg/node/store.go new file mode 100644 index 000000000..e689d556b --- /dev/null +++ b/p2p/pkg/node/store.go @@ -0,0 +1,52 @@ +package node + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + + "github.com/primev/mev-commit/p2p/pkg/storage" +) + +const ( + progressNS = "p2progress/" + progressLastBlockKey = progressNS + "last_block" +) + +type DurableProgressStore struct { + contractRPC ContractRPC + kv storage.Storage +} + +type ContractRPC interface { + BlockNumber(ctx context.Context) (uint64, error) +} + +func NewDurableProgressStore(kv storage.Storage, contractRPC ContractRPC) *DurableProgressStore { + return &DurableProgressStore{ + contractRPC: contractRPC, + kv: kv, + } +} + +func (p *DurableProgressStore) LastBlock() (uint64, error) { + buf, err := p.kv.Get(progressLastBlockKey) + switch { + case err == nil: + if len(buf) != 8 { + return 0, fmt.Errorf("invalid %q length: got %d, want 8", progressLastBlockKey, len(buf)) + } + return binary.BigEndian.Uint64(buf), nil + case errors.Is(err, storage.ErrKeyNotFound): + return p.contractRPC.BlockNumber(context.Background()) + default: + return 0, err + } +} + +func (p *DurableProgressStore) SetLastBlock(block uint64) error { + var b [8]byte + binary.BigEndian.PutUint64(b[:], block) + return p.kv.Put(progressLastBlockKey, b[:]) +} diff --git a/p2p/pkg/node/store_test.go b/p2p/pkg/node/store_test.go new file mode 100644 index 000000000..376c0f7c3 --- /dev/null +++ b/p2p/pkg/node/store_test.go @@ -0,0 +1,63 @@ +package node + +import ( + "context" + "encoding/binary" + "testing" + + inmem "github.com/primev/mev-commit/p2p/pkg/storage/inmem" +) + +type mockContractRPC struct { + blockNumber uint64 +} + +func (m *mockContractRPC) BlockNumber(ctx context.Context) (uint64, error) { + return m.blockNumber, nil +} + +func TestDurableProgressStore_LastBlock_FallbackToRPCWhenUnset(t *testing.T) { + kv := inmem.New() + mockRPC := &mockContractRPC{blockNumber: 12345} + + ps := NewDurableProgressStore(kv, mockRPC) + + got, err := ps.LastBlock() + if err != nil { + t.Fatalf("LastBlock: %v", err) + } + if got != 12345 { + t.Fatalf("LastBlock fallback mismatch: got %d want %d", got, 12345) + } +} + +func TestDurableProgressStore_SetAndGet(t *testing.T) { + kv := inmem.New() + mockRPC := &mockContractRPC{blockNumber: 0} + + ps := NewDurableProgressStore(kv, mockRPC) + + want := uint64(9876543210) + if err := ps.SetLastBlock(want); err != nil { + t.Fatalf("SetLastBlock: %v", err) + } + + got, err := ps.LastBlock() + if err != nil { + t.Fatalf("LastBlock: %v", err) + } + if got != uint64(9876543210) { + t.Fatalf("LastBlock persisted mismatch: got %d want %d", got, want) + } + + raw, err := kv.Get(progressLastBlockKey) + if err != nil { + t.Fatalf("kv.Get: %v", err) + } + if len(raw) != 8 { + t.Fatalf("stored length mismatch: got %d want 8", len(raw)) + } + if binary.BigEndian.Uint64(raw) != uint64(9876543210) { + t.Fatalf("stored value mismatch: got %d want %d", binary.BigEndian.Uint64(raw), want) + } +} diff --git a/p2p/pkg/notifications/notifications.go b/p2p/pkg/notifications/notifications.go index 023be5c16..33ad22445 100644 --- a/p2p/pkg/notifications/notifications.go +++ b/p2p/pkg/notifications/notifications.go @@ -17,6 +17,7 @@ const ( TopicProviderDeregistered Topic = "provider_deregistered" TopicCommitmentStoreFailed Topic = "commitment_store_failed" TopicCommitmentOpenFailed Topic = "commitment_open_failed" + TopicOtherProviderWonBlock Topic = "other_provider_won_block" ) var validTopic = map[Topic]struct{}{ @@ -30,6 +31,7 @@ var validTopic = map[Topic]struct{}{ TopicProviderDeregistered: {}, TopicCommitmentStoreFailed: {}, TopicCommitmentOpenFailed: {}, + TopicOtherProviderWonBlock: {}, } func IsTopicValid(topic Topic) bool { diff --git a/p2p/pkg/preconfirmation/preconfirmation.go b/p2p/pkg/preconfirmation/preconfirmation.go index 85c5de565..d6d1c2b21 100644 --- a/p2p/pkg/preconfirmation/preconfirmation.go +++ b/p2p/pkg/preconfirmation/preconfirmation.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log/slog" + "math/big" "sync" "time" @@ -55,7 +56,6 @@ type DepositManager interface { CheckAndDeductDeposit( ctx context.Context, bidderAddr common.Address, - providerAddr common.Address, bidAmount string, ) (func() error, error) } @@ -278,13 +278,7 @@ func (p *Preconfirmation) handleBid( return err } - opts, err := p.optsGetter(ctx) - if err != nil { - return err - } - providerAddr := opts.From - - tryRefund, err := p.depositMgr.CheckAndDeductDeposit(ctx, *bidderAddr, providerAddr, bid.BidAmount) + tryRefund, err := p.depositMgr.CheckAndDeductDeposit(ctx, *bidderAddr, bid.BidAmount) if err != nil { p.logger.Error("checking deposit", "error", err) return err @@ -352,9 +346,16 @@ func (p *Preconfirmation) handleBid( return status.Errorf(codes.Internal, "failed to store commitments: %v", err) } + bidAmount, ok := new(big.Int).SetString(bid.BidAmount, 10) + if !ok { + return status.Errorf(codes.Internal, "failed to parse bid amount: %v", bid.BidAmount) + } + encryptedAndDecryptedPreconfirmation := &store.Commitment{ EncryptedPreConfirmation: encryptedPreConfirmation, PreConfirmation: preConfirmation, + BidderAddress: bidderAddr, + BidAmount: bidAmount, } if err := p.tracker.TrackCommitment(ctx, encryptedAndDecryptedPreconfirmation, txn); err != nil { diff --git a/p2p/pkg/preconfirmation/preconfirmation_test.go b/p2p/pkg/preconfirmation/preconfirmation_test.go index 5626eccaa..c98c3094f 100644 --- a/p2p/pkg/preconfirmation/preconfirmation_test.go +++ b/p2p/pkg/preconfirmation/preconfirmation_test.go @@ -114,7 +114,6 @@ type testDepositManager struct{} func (t *testDepositManager) CheckAndDeductDeposit( ctx context.Context, bidderAddr common.Address, - providerAddr common.Address, bidAmountStr string, ) (func() error, error) { return func() error { return nil }, nil diff --git a/p2p/pkg/preconfirmation/store/store.go b/p2p/pkg/preconfirmation/store/store.go index 03dd69c54..3107c9edf 100644 --- a/p2p/pkg/preconfirmation/store/store.go +++ b/p2p/pkg/preconfirmation/store/store.go @@ -84,10 +84,12 @@ const ( type Commitment struct { *preconfpb.EncryptedPreConfirmation *preconfpb.PreConfirmation - Status CommitmentStatus - Details string - Payment string - Refund string + Status CommitmentStatus + Details string + Payment string + Refund string + BidderAddress *common.Address + BidAmount *big.Int } type BlockWinner struct { diff --git a/p2p/pkg/preconfirmation/tracker/tracker.go b/p2p/pkg/preconfirmation/tracker/tracker.go index 32fd5bb76..fe8e984c6 100644 --- a/p2p/pkg/preconfirmation/tracker/tracker.go +++ b/p2p/pkg/preconfirmation/tracker/tracker.go @@ -477,6 +477,12 @@ func (t *Tracker) statusUpdater( "txnHash": task.commitment.Bid.TxHash, "error": r.Err.Error(), } + if task.commitment.BidderAddress != nil { + notificationPayload["bidder"] = common.Bytes2Hex(task.commitment.BidderAddress[:]) + } + if task.commitment.BidAmount != nil { + notificationPayload["bidAmount"] = task.commitment.BidAmount.String() + } switch task.onSuccess { case store.CommitmentStatusStored: t.notifier.Notify( @@ -538,6 +544,19 @@ func (t *Tracker) openCommitments( "providerAddress", commitment.ProviderAddress, "winner", newL1Block.Winner, ) + notificationPayload := map[string]any{ + "commitmentDigest": hex.EncodeToString(commitment.Commitment[:]), + } + if commitment.BidderAddress != nil { + notificationPayload["bidder"] = common.Bytes2Hex(commitment.BidderAddress[:]) + } + if commitment.BidAmount != nil { + notificationPayload["bidAmount"] = commitment.BidAmount.String() + } + t.notifier.Notify(notifications.NewNotification( + notifications.TopicOtherProviderWonBlock, + notificationPayload, + )) continue } startTime := time.Now() diff --git a/p2p/pkg/preconfirmation/tracker/tracker_test.go b/p2p/pkg/preconfirmation/tracker/tracker_test.go index 70d001acc..b3cec5391 100644 --- a/p2p/pkg/preconfirmation/tracker/tracker_test.go +++ b/p2p/pkg/preconfirmation/tracker/tracker_test.go @@ -565,6 +565,149 @@ func TestTrackerIgnoreOldBlocks(t *testing.T) { <-doneChan } +func TestOtherProviderWonBlockNotification(t *testing.T) { + t.Parallel() + + pcABI, err := abi.JSON(strings.NewReader(preconf.PreconfmanagerABI)) + if err != nil { + t.Fatal(err) + } + btABI, err := abi.JSON(strings.NewReader(blocktracker.BlocktrackerABI)) + if err != nil { + t.Fatal(err) + } + brABI, err := abi.JSON(strings.NewReader(bidderregistry.BidderregistryABI)) + if err != nil { + t.Fatal(err) + } + orABI, err := abi.JSON(strings.NewReader(oracle.OracleABI)) + if err != nil { + t.Fatal(err) + } + + evtMgr := events.NewListener( + util.NewTestLogger(os.Stdout), + &btABI, + &pcABI, + &brABI, + &orABI, + ) + + st := store.New(inmemstorage.New()) + + contract := &testPreconfContract{ + openedCommitments: make(chan openedCommitment, 1), + } + + watcher := &mockWatcher{} + notifier := &mockNotifier{ + evt: make(chan *notifications.Notification, 1), + } + + sk, pk, err := crypto.GenerateKeyPairBN254() + if err != nil { + t.Fatal(err) + } + + tracker := preconftracker.NewTracker( + big.NewInt(5), + p2p.PeerTypeProvider, + common.HexToAddress("0x1234"), + evtMgr, + st, + contract, + watcher, + notifier, + pk, + sk, + func(context.Context) (*bind.TransactOpts, error) { + return &bind.TransactOpts{ + From: common.HexToAddress("0x1234"), + }, nil + }, + util.NewTestLogger(os.Stdout), + ) + + ctx, cancel := context.WithCancel(context.Background()) + doneChan := tracker.Start(ctx) + defer func() { + cancel() + select { + case <-doneChan: + case <-time.After(2 * time.Second): + } + }() + + winnerProvider := common.HexToAddress("0x1111") + loserProvider := common.HexToAddress("0x2222") + bidderAddr := common.HexToAddress("0x3333") + + digest := common.HexToHash("0xabc") + cmt := &store.Commitment{ + EncryptedPreConfirmation: &preconfpb.EncryptedPreConfirmation{ + Commitment: digest.Bytes(), + Signature: []byte("sig"), + }, + PreConfirmation: &preconfpb.PreConfirmation{ + Bid: &preconfpb.Bid{ + TxHash: common.HexToHash("0xdeadbeef").String(), + BidAmount: "100", + SlashAmount: "0", + BlockNumber: 1, + DecayStartTimestamp: 1, + DecayEndTimestamp: 2, + Digest: digest.Bytes(), + Signature: []byte("bidsig"), + }, + Digest: digest.Bytes(), + Signature: []byte("pcs"), + ProviderAddress: loserProvider.Bytes(), + SharedSecret: []byte("shared"), + }, + BidderAddress: &bidderAddr, + BidAmount: big.NewInt(100), + } + + if err := tracker.TrackCommitment(context.Background(), cmt, nil); err != nil { + t.Fatal(err) + } + + if err := publishUnopenedCommitment(evtMgr, &pcABI, preconf.PreconfmanagerUnopenedCommitmentStored{ + Committer: loserProvider, + CommitmentIndex: common.HexToHash("0x01"), + CommitmentDigest: digest, + CommitmentSignature: cmt.EncryptedPreConfirmation.Signature, + DispatchTimestamp: uint64(1), + }); err != nil { + t.Fatal(err) + } + cmt.CommitmentIndex = common.HexToHash("0x01").Bytes() + + publishNewWinner(evtMgr, &btABI, blocktracker.BlocktrackerNewL1Block{ + BlockNumber: big.NewInt(1), + Winner: winnerProvider, // Different than commitment provider + }) + + select { + case n := <-notifier.evt: + if n.Topic() != notifications.TopicOtherProviderWonBlock { + t.Fatalf("expected topic %s, got %s", notifications.TopicOtherProviderWonBlock, n.Topic()) + } + val := n.Value() + if val == nil { + t.Fatal("expected non-nil notification payload") + } + if got, ok := val["bidder"].(string); !ok || got != common.Bytes2Hex(bidderAddr[:]) { + t.Fatalf("expected bidder %s, got %v", common.Bytes2Hex(bidderAddr[:]), val["bidder"]) + } + if got, ok := val["bidAmount"].(string); !ok || got != "100" { + t.Fatalf("expected bidAmount 100, got %v", val["bidAmount"]) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for TopicOtherProviderWonBlock notification") + } +} + type openedCommitment struct { encryptedCommitmentIndex [32]byte bid *big.Int