Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 57 additions & 20 deletions p2p/pkg/depositmanager/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
bidderregistry "github.com/primev/mev-commit/contracts-abi/clients/BidderRegistry"
"github.com/primev/mev-commit/p2p/pkg/notifications"
"github.com/primev/mev-commit/x/contracts/events"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand All @@ -20,15 +21,16 @@ type BidderRegistryContract interface {
}

type Store interface {
GetBalance(bidder common.Address, provider common.Address) (*big.Int, error)
SetBalance(bidder common.Address, provider common.Address, balance *big.Int) error
DeleteBalance(bidder common.Address, provider common.Address) error
RefundBalanceIfExists(bidder common.Address, provider common.Address, amount *big.Int) error
GetBalance(bidder common.Address) (*big.Int, error)
SetBalance(bidder common.Address, balance *big.Int) error
DeleteBalance(bidder common.Address) error
RefundBalanceIfExists(bidder common.Address, amount *big.Int) error
}

type DepositManager struct {
Comment thread
shaspitz marked this conversation as resolved.
store Store
evtMgr events.EventManager
notifiee notifications.Notifiee
bidderRegistry BidderRegistryContract
deposits chan *bidderregistry.BidderregistryBidderDeposited
withdrawRequests chan *bidderregistry.BidderregistryWithdrawalRequested
Expand All @@ -40,12 +42,14 @@ type DepositManager struct {
func NewDepositManager(
store Store,
evtMgr events.EventManager,
notifiee notifications.Notifiee,
bidderRegistry BidderRegistryContract,
thisProviderAddress common.Address,
logger *slog.Logger,
) *DepositManager {
return &DepositManager{
store: store,
notifiee: notifiee,
bidderRegistry: bidderRegistry,
deposits: make(chan *bidderregistry.BidderregistryBidderDeposited),
withdrawRequests: make(chan *bidderregistry.BidderregistryWithdrawalRequested),
Expand All @@ -61,6 +65,11 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} {

eg, egCtx := errgroup.WithContext(ctx)

notifCh := dm.notifiee.Subscribe(
notifications.TopicCommitmentStoreFailed,
notifications.TopicOtherProviderWonBlock,
)

ev1 := events.NewEventHandler(
"BidderDeposited",
func(bidderDeposit *bidderregistry.BidderregistryBidderDeposited) {
Expand Down Expand Up @@ -103,6 +112,11 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} {
eg.Go(func() error {
defer sub.Unsubscribe()

defer func() {
unsubDone := dm.notifiee.Unsubscribe(notifCh)
<-unsubDone
}()

select {
case <-egCtx.Done():
dm.logger.Info("event subscription context done")
Expand All @@ -119,18 +133,42 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} {
dm.logger.Info("deposit manager context done")
return nil

case n := <-notifCh:
topic := n.Topic()
if topic != notifications.TopicOtherProviderWonBlock && topic != notifications.TopicCommitmentStoreFailed {
dm.logger.Debug("ignoring notification for topic", "topic", topic)
continue
}

val := n.Value()
bidderHex := val["bidder"].(string)
bidAmount := val["bidAmount"].(string)

bidder := common.HexToAddress(bidderHex)
bidAmountInt, ok := new(big.Int).SetString(bidAmount, 10)
if !ok {
dm.logger.Error("failed to parse bid amount", "bidAmount", bidAmount)
continue
}

if err := dm.store.RefundBalanceIfExists(bidder, bidAmountInt); err != nil {
dm.logger.Error("refunding balance", "error", err)
return err
}
dm.logger.Info("refunded balance from notification", "bidder", bidder, "bidAmount", bidAmountInt)

case deposit := <-dm.deposits:
if deposit.Provider != dm.thisProviderAddress {
dm.logger.Debug("ignoring deposit event for different provider", "provider", deposit.Provider)
continue
}
currentBalance, err := dm.store.GetBalance(deposit.Bidder, deposit.Provider)
currentBalance, err := dm.store.GetBalance(deposit.Bidder)
if err != nil {
dm.logger.Error("getting balance", "error", err)
return err
}
if currentBalance == nil {
if err := dm.store.SetBalance(deposit.Bidder, deposit.Provider, deposit.NewAvailableAmount); err != nil {
if err := dm.store.SetBalance(deposit.Bidder, deposit.NewAvailableAmount); err != nil {
dm.logger.Error("setting balance", "error", err)
return err
}
Expand All @@ -141,7 +179,7 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} {
)
} else {
newBalance := new(big.Int).Add(currentBalance, deposit.DepositedAmount)
if err := dm.store.SetBalance(deposit.Bidder, deposit.Provider, newBalance); err != nil {
if err := dm.store.SetBalance(deposit.Bidder, newBalance); err != nil {
dm.logger.Error("setting balance", "error", err)
return err
}
Expand All @@ -156,7 +194,7 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} {
dm.logger.Debug("ignoring withdrawal request event for different provider", "provider", withdrawalRequest.Provider)
continue
}
if err := dm.store.DeleteBalance(withdrawalRequest.Bidder, withdrawalRequest.Provider); err != nil {
if err := dm.store.DeleteBalance(withdrawalRequest.Bidder); err != nil {
dm.logger.Error("deleting balance", "error", err)
return err
}
Expand All @@ -170,7 +208,7 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} {
dm.logger.Debug("ignoring withdrawal event for different provider", "provider", withdrawal.Provider)
continue
}
if err := dm.store.DeleteBalance(withdrawal.Bidder, withdrawal.Provider); err != nil {
if err := dm.store.DeleteBalance(withdrawal.Bidder); err != nil {
dm.logger.Error("deleting balance", "error", err)
return err
}
Expand All @@ -194,7 +232,6 @@ func (dm *DepositManager) Start(ctx context.Context) <-chan struct{} {
func (dm *DepositManager) CheckAndDeductDeposit(
ctx context.Context,
bidderAddr common.Address,
providerAddr common.Address,
bidAmountStr string,
) (func() error, error) {
bidAmount, ok := new(big.Int).SetString(bidAmountStr, 10)
Expand All @@ -203,7 +240,7 @@ func (dm *DepositManager) CheckAndDeductDeposit(
return nil, status.Errorf(codes.InvalidArgument, "failed to parse bid amount")
}

balance, err := dm.store.GetBalance(bidderAddr, providerAddr)
balance, err := dm.store.GetBalance(bidderAddr)
if err != nil {
dm.logger.Error("getting balance", "error", err)
return nil, status.Errorf(codes.Internal, "failed to get balance: %v", err)
Expand All @@ -216,28 +253,28 @@ func (dm *DepositManager) CheckAndDeductDeposit(
return nil, status.Errorf(codes.FailedPrecondition, "insufficient balance")
}

if err := dm.store.SetBalance(bidderAddr, providerAddr, newBalance); err != nil {
if err := dm.store.SetBalance(bidderAddr, newBalance); err != nil {
dm.logger.Error("setting balance", "error", err)
return nil, status.Errorf(codes.Internal, "failed to set balance: %v", err)
}
return func() error {
return dm.store.RefundBalanceIfExists(bidderAddr, providerAddr, bidAmount)
return dm.store.RefundBalanceIfExists(bidderAddr, bidAmount)
}, nil
}
dm.logger.Info("balance not found in store, defaulting to contract call",
"bidder", bidderAddr.Hex(),
"provider", providerAddr.Hex(),
"provider", dm.thisProviderAddress.Hex(),
)

defaultBalance, err := dm.getDefaultBalance(ctx, bidderAddr, providerAddr, nil) // nil for latest block
defaultBalance, err := dm.getDefaultBalance(ctx, bidderAddr, dm.thisProviderAddress, nil) // nil for latest block
if err != nil {
return nil, err
}

if defaultBalance == nil {
dm.logger.Error("bidder balance not found", "bidder", bidderAddr.Hex(), "provider", providerAddr.Hex())
dm.logger.Error("bidder balance not found", "bidder", bidderAddr.Hex(), "provider", dm.thisProviderAddress.Hex())
return nil, status.Errorf(codes.FailedPrecondition,
"balance not found for bidder %s and provider %s", bidderAddr.Hex(), providerAddr.Hex())
"balance not found for bidder %s and provider %s", bidderAddr.Hex(), dm.thisProviderAddress.Hex())
}

if defaultBalance.Cmp(bidAmount) < 0 {
Expand All @@ -246,13 +283,13 @@ func (dm *DepositManager) CheckAndDeductDeposit(
}

newBalance := new(big.Int).Sub(defaultBalance, bidAmount)
if err := dm.store.SetBalance(bidderAddr, providerAddr, newBalance); err != nil {
if err := dm.store.SetBalance(bidderAddr, newBalance); err != nil {
dm.logger.Error("setting balance for block", "error", err)
return nil, status.Errorf(codes.Internal, "failed to set balance for block: %v", err)
}

return func() error {
return dm.store.RefundBalanceIfExists(bidderAddr, providerAddr, bidAmount)
return dm.store.RefundBalanceIfExists(bidderAddr, bidAmount)
}, nil
}

Expand All @@ -276,7 +313,7 @@ func (dm *DepositManager) getDefaultBalance(
}

if balance.Cmp(big.NewInt(0)) > 0 {
if err := dm.store.SetBalance(bidderAddr, providerAddr, balance); err != nil {
if err := dm.store.SetBalance(bidderAddr, balance); err != nil {
dm.logger.Error("setting balance", "error", err)
return nil, status.Errorf(codes.Internal, "failed to set balance: %v", err)
}
Expand Down
16 changes: 4 additions & 12 deletions p2p/pkg/depositmanager/deposit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker"
"github.com/primev/mev-commit/p2p/pkg/depositmanager"
depositstore "github.com/primev/mev-commit/p2p/pkg/depositmanager/store"
"github.com/primev/mev-commit/p2p/pkg/notifications"
inmemstorage "github.com/primev/mev-commit/p2p/pkg/storage/inmem"
"github.com/primev/mev-commit/x/contracts/events"
"github.com/primev/mev-commit/x/util"
Expand Down Expand Up @@ -66,14 +67,13 @@ func TestDepositManager(t *testing.T) {

providerAddress := common.HexToAddress("0x456")

dm := depositmanager.NewDepositManager(st, evtMgr, bidderRegistry, providerAddress, logger)
dm := depositmanager.NewDepositManager(st, evtMgr, notifications.New(10), bidderRegistry, providerAddress, logger)
done := dm.Start(ctx)

// no deposit
refund, err := dm.CheckAndDeductDeposit(
context.Background(),
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
"10",
)
if err == nil {
Expand All @@ -98,7 +98,6 @@ func TestDepositManager(t *testing.T) {
for {
if val, err := st.GetBalance(
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
); err == nil && val != nil && val.Cmp(big.NewInt(100)) == 0 {
break
}
Expand All @@ -109,7 +108,6 @@ func TestDepositManager(t *testing.T) {
refund, err = dm.CheckAndDeductDeposit(
context.Background(),
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
"100",
)
if err != nil {
Expand All @@ -120,7 +118,6 @@ func TestDepositManager(t *testing.T) {
_, err = dm.CheckAndDeductDeposit(
context.Background(),
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
"10",
)
if err == nil || !strings.Contains(err.Error(), "insufficient balance") {
Expand All @@ -136,7 +133,6 @@ func TestDepositManager(t *testing.T) {
_, err = dm.CheckAndDeductDeposit(
context.Background(),
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
"10",
)
if err != nil {
Expand All @@ -145,7 +141,6 @@ func TestDepositManager(t *testing.T) {

balance, err := st.GetBalance(
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
)
if err != nil {
t.Fatal(err)
Expand All @@ -168,7 +163,6 @@ func TestDepositManager(t *testing.T) {
for {
if val, err := st.GetBalance(
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
); err == nil && val == nil {
break
}
Expand Down Expand Up @@ -209,7 +203,6 @@ func TestDepositManager(t *testing.T) {
for {
if val, err := st.GetBalance(
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
); err == nil && val != nil && val.Cmp(big.NewInt(777)) == 0 {
break
}
Expand Down Expand Up @@ -254,7 +247,7 @@ func TestStartWithBidderAlreadyDeposited(t *testing.T) {

providerAddress := common.HexToAddress("0x456")

dm := depositmanager.NewDepositManager(st, evtMgr, bidderRegistry, providerAddress, logger)
dm := depositmanager.NewDepositManager(st, evtMgr, notifications.New(10), bidderRegistry, providerAddress, logger)
done := dm.Start(ctx)

err = publishBidderDeposited(evtMgr, &brABI, &bidderregistry.BidderregistryBidderDeposited{
Expand All @@ -273,7 +266,6 @@ func TestStartWithBidderAlreadyDeposited(t *testing.T) {
for {
if val, err := st.GetBalance(
common.HexToAddress("0x123"),
common.HexToAddress("0x456"),
); err == nil && val != nil && val.Cmp(big.NewInt(133)) == 0 {
break
}
Expand Down Expand Up @@ -316,7 +308,7 @@ func TestOtherProvidersEventsAreIgnored(t *testing.T) {

providerAddress := common.HexToAddress("0x456")

dm := depositmanager.NewDepositManager(st, evtMgr, bidderRegistry, providerAddress, logger)
dm := depositmanager.NewDepositManager(st, evtMgr, notifications.New(10), bidderRegistry, providerAddress, logger)
done := dm.Start(ctx)

differentProvider := common.HexToAddress("0x789")
Expand Down
21 changes: 10 additions & 11 deletions p2p/pkg/depositmanager/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const (
)

var (
balanceKey = func(bidder common.Address, provider common.Address) string {
return fmt.Sprintf("%s%s/%s", balanceNS, bidder, provider)
balanceKey = func(bidder common.Address) string {
return fmt.Sprintf("%s%s", balanceNS, bidder)
}
balancePrefix = func(bidder common.Address) string {
return fmt.Sprintf("%s%s", balanceNS, bidder)
Expand All @@ -36,18 +36,18 @@ func New(st storage.Storage) *Store {
}
}

func (s *Store) SetBalance(bidder common.Address, provider common.Address, depositedAmount *big.Int) error {
func (s *Store) SetBalance(bidder common.Address, depositedAmount *big.Int) error {
s.mu.Lock()
defer s.mu.Unlock()

return s.st.Put(balanceKey(bidder, provider), depositedAmount.Bytes())
return s.st.Put(balanceKey(bidder), depositedAmount.Bytes())
}

func (s *Store) GetBalance(bidder common.Address, provider common.Address) (*big.Int, error) {
func (s *Store) GetBalance(bidder common.Address) (*big.Int, error) {
s.mu.RLock()
defer s.mu.RUnlock()

val, err := s.st.Get(balanceKey(bidder, provider))
val, err := s.st.Get(balanceKey(bidder))
switch {
case errors.Is(err, storage.ErrKeyNotFound):
return nil, nil
Expand All @@ -58,21 +58,20 @@ func (s *Store) GetBalance(bidder common.Address, provider common.Address) (*big
return new(big.Int).SetBytes(val), nil
}

func (s *Store) DeleteBalance(bidder common.Address, provider common.Address) error {
func (s *Store) DeleteBalance(bidder common.Address) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.st.Delete(balanceKey(bidder, provider))
return s.st.Delete(balanceKey(bidder))
}

func (s *Store) RefundBalanceIfExists(
bidder common.Address,
provider common.Address,
amount *big.Int,
) error {
s.mu.Lock()
defer s.mu.Unlock()

val, err := s.st.Get(balanceKey(bidder, provider))
val, err := s.st.Get(balanceKey(bidder))
switch {
case errors.Is(err, storage.ErrKeyNotFound):
return status.Errorf(codes.FailedPrecondition, "balance not found, no refund needed")
Expand All @@ -81,7 +80,7 @@ func (s *Store) RefundBalanceIfExists(
}

newAmount := new(big.Int).Add(new(big.Int).SetBytes(val), amount)
return s.st.Put(balanceKey(bidder, provider), newAmount.Bytes())
return s.st.Put(balanceKey(bidder), newAmount.Bytes())
}

func (s *Store) BalanceEntries(bidder common.Address) (int, error) {
Expand Down
Loading