diff --git a/p2p/pkg/node/node.go b/p2p/pkg/node/node.go index af912af16..3a058e75f 100644 --- a/p2p/pkg/node/node.go +++ b/p2p/pkg/node/node.go @@ -496,6 +496,7 @@ func NewNode(opts *Options) (*Node, error) { }, ) srv.RegisterMetricsCollectors(tracker.Metrics()...) + preconfStore.SetNotifier(notificationsSvc) l1ContractRPC, err := ethclient.Dial(opts.L1RPCURL) if err != nil { diff --git a/p2p/pkg/notifications/notifications.go b/p2p/pkg/notifications/notifications.go index 33ad22445..926181888 100644 --- a/p2p/pkg/notifications/notifications.go +++ b/p2p/pkg/notifications/notifications.go @@ -18,6 +18,8 @@ const ( TopicCommitmentStoreFailed Topic = "commitment_store_failed" TopicCommitmentOpenFailed Topic = "commitment_open_failed" TopicOtherProviderWonBlock Topic = "other_provider_won_block" + TopicTransactionSettled Topic = "transaction_settled" + TopicTransactionPayment Topic = "transaction_payment" ) var validTopic = map[Topic]struct{}{ @@ -32,6 +34,8 @@ var validTopic = map[Topic]struct{}{ TopicCommitmentStoreFailed: {}, TopicCommitmentOpenFailed: {}, TopicOtherProviderWonBlock: {}, + TopicTransactionSettled: {}, + TopicTransactionPayment: {}, } func IsTopicValid(topic Topic) bool { diff --git a/p2p/pkg/preconfirmation/store/store.go b/p2p/pkg/preconfirmation/store/store.go index 3107c9edf..6e1a18044 100644 --- a/p2p/pkg/preconfirmation/store/store.go +++ b/p2p/pkg/preconfirmation/store/store.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" preconfpb "github.com/primev/mev-commit/p2p/gen/go/preconfirmation/v1" + "github.com/primev/mev-commit/p2p/pkg/notifications" "github.com/primev/mev-commit/p2p/pkg/storage" "github.com/vmihailenco/msgpack/v5" ) @@ -68,6 +69,7 @@ var ( type Store struct { mu sync.RWMutex st storage.Storage + n notifications.Notifier } type CommitmentStatus string @@ -108,6 +110,10 @@ func New(st storage.Storage) *Store { } } +func (s *Store) SetNotifier(n notifications.Notifier) { + s.n = n +} + func (s *Store) AddCommitment(commitment *Commitment) (err error) { s.mu.Lock() defer s.mu.Unlock() @@ -256,7 +262,10 @@ func (s *Store) ListCommitments(opts *ListOpts) ([]*Commitment, error) { } func (s *Store) SetCommitmentIndexByDigest(cDigest, cIndex [32]byte) (retErr error) { - cmt, err := s.GetCommitmentByDigest(cDigest[:]) + s.mu.Lock() + defer s.mu.Unlock() + + cmt, err := s.getCommitmentByDigest(cDigest[:]) if err != nil { if errors.Is(err, storage.ErrKeyNotFound) { return nil @@ -275,9 +284,6 @@ func (s *Store) SetCommitmentIndexByDigest(cDigest, cIndex [32]byte) (retErr err commitmentKey := commitmentKey(cmt.Bid.BlockNumber, cmt.Bid.BidAmount, cmt.Commitment) - s.mu.Lock() - defer s.mu.Unlock() - var writer storage.Writer if w, ok := s.st.(storage.Batcher); ok { batch := w.Batch() @@ -325,6 +331,18 @@ func (s *Store) UpdateSettlement(index []byte, isSlash bool) error { return err } + defer func() { + if s.n != nil { + s.n.Notify( + notifications.NewNotification(notifications.TopicTransactionSettled, map[string]any{ + "transaction_hashes": cmt.Bid.TxHash, + "is_slashed": isSlash, + "provider": common.Bytes2Hex(cmt.ProviderAddress), + }), + ) + } + }() + if isSlash { cmt.Status = CommitmentStatusSlashed } else { @@ -340,7 +358,10 @@ func (s *Store) UpdateSettlement(index []byte, isSlash bool) error { } func (s *Store) UpdatePayment(digest []byte, payment, refund string) error { - cmt, err := s.GetCommitmentByDigest(digest) + s.mu.Lock() + defer s.mu.Unlock() + + cmt, err := s.getCommitmentByDigest(digest) if err != nil { if errors.Is(err, storage.ErrKeyNotFound) { return nil @@ -348,8 +369,17 @@ func (s *Store) UpdatePayment(digest []byte, payment, refund string) error { return err } - s.mu.Lock() - defer s.mu.Unlock() + defer func() { + if s.n != nil { + s.n.Notify( + notifications.NewNotification(notifications.TopicTransactionPayment, map[string]any{ + "transaction_hashes": cmt.Bid.TxHash, + "payment": payment, + "refund": refund, + }), + ) + } + }() cmt.Payment = payment cmt.Refund = refund @@ -368,6 +398,10 @@ func (s *Store) GetCommitmentByDigest(digest []byte) (*Commitment, error) { s.mu.RLock() defer s.mu.RUnlock() + return s.getCommitmentByDigest(digest) +} + +func (s *Store) getCommitmentByDigest(digest []byte) (*Commitment, error) { cIndexValueBuf, err := s.st.Get(cmtIndexKey(digest)) if err != nil { return nil, err