From d45a7874f17518a8911bea20c435bb0bba2db592 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 3 Sep 2025 18:00:01 -0700 Subject: [PATCH 1/7] add dm state to dashboard --- tools/dashboard/main.go | 30 +++++++ tools/dashboard/stathandler.go | 135 ++++++++++++++++++++++++++++- x/contracts/events/publisher/ws.go | 72 ++++++++++++--- 3 files changed, 220 insertions(+), 17 deletions(-) diff --git a/tools/dashboard/main.go b/tools/dashboard/main.go index 93b675c99..a256303b4 100644 --- a/tools/dashboard/main.go +++ b/tools/dashboard/main.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" bidderregistry "github.com/primev/mev-commit/contracts-abi/clients/BidderRegistry" blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker" + depositmanager "github.com/primev/mev-commit/contracts-abi/clients/DepositManager" oracle "github.com/primev/mev-commit/contracts-abi/clients/Oracle" preconf "github.com/primev/mev-commit/contracts-abi/clients/PreconfManager" providerregistry "github.com/primev/mev-commit/contracts-abi/clients/ProviderRegistry" @@ -192,6 +193,20 @@ func main() { evtMgr, ) + dynSub, err := evtMgr.Subscribe( + events.NewEventHandler( + "BidderDeposited", + func(upd *bidderregistry.BidderregistryBidderDeposited) { + // Register deposited bidder as a contract in case it has enabled deposit manager + pb.AddContracts(upd.Bidder) + }, + ), + ) + if err != nil { + return err + } + defer dynSub.Unsubscribe() + statHdlr, err := newStatHandler(evtMgr) if err != nil { return err @@ -388,6 +403,15 @@ func registerRoutes(mux *http.ServeMux, statHdlr *statHandler) { return } }) + + mux.HandleFunc("GET /dmcounts", func(w http.ResponseWriter, r *http.Request) { + dout := struct { + DMCounts []*DepositManagerEventCounts `json:"deposit_manager_counts"` + }{ + DMCounts: statHdlr.getDMCounts(), + } + _ = json.NewEncoder(w).Encode(dout) + }) } func parsePagination(r *http.Request) (int, int) { @@ -437,11 +461,17 @@ func getContractABIs() ([]*abi.ABI, error) { return nil, err } + dmABI, err := abi.JSON(strings.NewReader(depositmanager.DepositmanagerABI)) + if err != nil { + return nil, err + } + return []*abi.ABI{ &btABI, &pcABI, &bidderRegistry, &providerRegistry, &orABI, + &dmABI, }, nil } diff --git a/tools/dashboard/stathandler.go b/tools/dashboard/stathandler.go index 5e3e7a112..bc9ba52a5 100644 --- a/tools/dashboard/stathandler.go +++ b/tools/dashboard/stathandler.go @@ -8,6 +8,7 @@ import ( lru "github.com/hashicorp/golang-lru/v2" bidderregistry "github.com/primev/mev-commit/contracts-abi/clients/BidderRegistry" blocktracker "github.com/primev/mev-commit/contracts-abi/clients/BlockTracker" + depositmanager "github.com/primev/mev-commit/contracts-abi/clients/DepositManager" oracle "github.com/primev/mev-commit/contracts-abi/clients/Oracle" preconf "github.com/primev/mev-commit/contracts-abi/clients/PreconfManager" providerregistry "github.com/primev/mev-commit/contracts-abi/clients/ProviderRegistry" @@ -26,6 +27,7 @@ type statHandler struct { totalOpenedCommitments uint64 totalRewards uint64 totalSlashes uint64 + dmEventCounts *lru.Cache[string, *DepositManagerEventCounts] evtMgr events.EventManager sub events.Subscription unsub func() @@ -74,11 +76,23 @@ type AggregateStats struct { TotalSlashes uint64 `json:"total_slashes"` } +type DepositManagerEventCounts struct { + Bidder string `json:"bidder"` + DepositToppedUp uint64 `json:"deposit_topped_up"` + TopUpReduced uint64 `json:"top_up_reduced"` + CurrentDepositIsSufficient uint64 `json:"current_deposit_is_sufficient"` + CurrentBalanceAtOrBelowMin uint64 `json:"current_balance_at_or_below_min"` + TargetDepositDoesNotExist uint64 `json:"target_deposit_does_not_exist"` + WithdrawalRequestExists uint64 `json:"withdrawal_request_exists"` + TargetDepositSet uint64 `json:"target_deposit_set"` +} + type DashboardOut struct { - Aggregate *AggregateStats `json:"aggregate"` - Providers []*ProviderBalances `json:"providers"` - Blocks []*BlockStats `json:"blocks"` - Bidders []*BidderDeposit `json:"bidders"` + Aggregate *AggregateStats `json:"aggregate"` + Providers []*ProviderBalances `json:"providers"` + Blocks []*BlockStats `json:"blocks"` + Bidders []*BidderDeposit `json:"bidders"` + DMCounts []*DepositManagerEventCounts `json:"deposit_manager_counts"` } func newStatHandler(evtMgr events.EventManager) (*statHandler, error) { @@ -107,12 +121,18 @@ func newStatHandler(evtMgr events.EventManager) (*statHandler, error) { return nil, err } + dmEventCounts, err := lru.New[string, *DepositManagerEventCounts](10000) + if err != nil { + return nil, err + } + st := &statHandler{ blockStats: blockStats, providerStakes: providerStakes, bidderDeposits: bidderDeposits, commitments: commitments, commitmentsByBlock: commitmentsByBlock, + dmEventCounts: dmEventCounts, evtMgr: evtMgr, } @@ -435,6 +455,105 @@ func (s *statHandler) configureDashboard() error { }, existing) }, ), + events.NewEventHandler( + "DepositToppedUp", + func(upd *depositmanager.DepositmanagerDepositToppedUp) { + s.statMu.Lock() + defer s.statMu.Unlock() + b := upd.Raw.Address.Hex() + c, ok := s.dmEventCounts.Get(b) + if !ok { + c = &DepositManagerEventCounts{Bidder: b} + } + c.DepositToppedUp++ + _ = s.dmEventCounts.Add(b, c) + }, + ), + events.NewEventHandler( + "TopUpReduced", + func(upd *depositmanager.DepositmanagerTopUpReduced) { + s.statMu.Lock() + defer s.statMu.Unlock() + b := upd.Raw.Address.Hex() + c, ok := s.dmEventCounts.Get(b) + if !ok { + c = &DepositManagerEventCounts{Bidder: b} + } + c.TopUpReduced++ + _ = s.dmEventCounts.Add(b, c) + }, + ), + events.NewEventHandler( + "CurrentBalanceAtOrBelowMin", + func(upd *depositmanager.DepositmanagerCurrentBalanceAtOrBelowMin) { + s.statMu.Lock() + defer s.statMu.Unlock() + b := upd.Raw.Address.Hex() + c, ok := s.dmEventCounts.Get(b) + if !ok { + c = &DepositManagerEventCounts{Bidder: b} + } + c.CurrentBalanceAtOrBelowMin++ + _ = s.dmEventCounts.Add(b, c) + }, + ), + events.NewEventHandler( + "CurrentDepositIsSufficient", + func(upd *depositmanager.DepositmanagerCurrentDepositIsSufficient) { + s.statMu.Lock() + defer s.statMu.Unlock() + b := upd.Raw.Address.Hex() + c, ok := s.dmEventCounts.Get(b) + if !ok { + c = &DepositManagerEventCounts{Bidder: b} + } + c.CurrentDepositIsSufficient++ + _ = s.dmEventCounts.Add(b, c) + }, + ), + events.NewEventHandler( + "TargetDepositDoesNotExist", + func(upd *depositmanager.DepositmanagerTargetDepositDoesNotExist) { + s.statMu.Lock() + defer s.statMu.Unlock() + b := upd.Raw.Address.Hex() + c, ok := s.dmEventCounts.Get(b) + if !ok { + c = &DepositManagerEventCounts{Bidder: b} + } + c.TargetDepositDoesNotExist++ + _ = s.dmEventCounts.Add(b, c) + }, + ), + + events.NewEventHandler( + "WithdrawalRequestExists", + func(upd *depositmanager.DepositmanagerWithdrawalRequestExists) { + s.statMu.Lock() + defer s.statMu.Unlock() + b := upd.Raw.Address.Hex() + c, ok := s.dmEventCounts.Get(b) + if !ok { + c = &DepositManagerEventCounts{Bidder: b} + } + c.WithdrawalRequestExists++ + _ = s.dmEventCounts.Add(b, c) + }, + ), + events.NewEventHandler( + "TargetDepositSet", + func(upd *depositmanager.DepositmanagerTargetDepositSet) { + s.statMu.Lock() + defer s.statMu.Unlock() + b := upd.Raw.Address.Hex() + c, ok := s.dmEventCounts.Get(b) + if !ok { + c = &DepositManagerEventCounts{Bidder: b} + } + c.TargetDepositSet++ + _ = s.dmEventCounts.Add(b, c) + }, + ), } sub, err := s.evtMgr.Subscribe(handlers...) @@ -474,12 +593,14 @@ func (s *statHandler) getDashboard(page, limit int) *DashboardOut { providers := s.getProviders() blocks := s.getBlocks(page, limit) bidders := s.getBidders() + dmCounts := s.getDMCounts() return &DashboardOut{ Aggregate: agg, Providers: providers, Blocks: blocks, Bidders: bidders, + DMCounts: dmCounts, } } @@ -502,6 +623,12 @@ func (s *statHandler) getBidders() []*BidderDeposit { return all } +func (s *statHandler) getDMCounts() []*DepositManagerEventCounts { + s.statMu.RLock() + defer s.statMu.RUnlock() + return s.dmEventCounts.Values() +} + func (s *statHandler) getBlockStats(block uint64) *BlockStats { s.statMu.RLock() defer s.statMu.RUnlock() diff --git a/x/contracts/events/publisher/ws.go b/x/contracts/events/publisher/ws.go index 126f8ef86..802b65e37 100644 --- a/x/contracts/events/publisher/ws.go +++ b/x/contracts/events/publisher/ws.go @@ -4,6 +4,8 @@ import ( "context" "log/slog" "math/big" + "slices" + "sync" "time" "github.com/ethereum/go-ethereum" @@ -24,6 +26,10 @@ type wsPublisher struct { logger *slog.Logger evmClient WSEVMClient subscriber Subscriber + + mu sync.RWMutex + contracts []common.Address + updateCh chan struct{} } func NewWSPublisher(progressStore ProgressStore, logger *slog.Logger, evmClient WSEVMClient, subscriber Subscriber) *wsPublisher { @@ -32,16 +38,44 @@ func NewWSPublisher(progressStore ProgressStore, logger *slog.Logger, evmClient logger: logger, evmClient: evmClient, subscriber: subscriber, + contracts: make([]common.Address, 0), + updateCh: make(chan struct{}, 1), + } +} + +func (w *wsPublisher) AddContracts(addr ...common.Address) { + w.mu.Lock() + added := false + for _, a := range addr { + if !slices.Contains(w.contracts, a) { + w.contracts = append(w.contracts, a) + added = true + w.logger.Info("ws: added contract address", "address", a.Hex()) + } + } + w.mu.Unlock() + if added { + select { + case w.updateCh <- struct{}{}: + default: + } } } -func (w *wsPublisher) Start(ctx context.Context, contracts ...common.Address) <-chan struct{} { +func (w *wsPublisher) getContracts() []common.Address { + w.mu.RLock() + defer w.mu.RUnlock() + cp := make([]common.Address, len(w.contracts)) + copy(cp, w.contracts) + return cp +} + +func (w *wsPublisher) Start(ctx context.Context, contractAddr ...common.Address) <-chan struct{} { doneChan := make(chan struct{}) + w.AddContracts(contractAddr...) - if len(contracts) == 0 { - w.logger.Error("no contracts to listen to") - close(doneChan) - return doneChan + if len(contractAddr) == 0 { + w.logger.Info("ws: starting with no contracts; waiting for addresses to be added") } go func() { @@ -60,19 +94,29 @@ func (w *wsPublisher) Start(ctx context.Context, contracts ...common.Address) <- return } + addresses := w.getContracts() + if len(addresses) == 0 { + select { + case <-ctx.Done(): + return + case <-w.updateCh: + continue + case <-time.After(500 * time.Millisecond): + continue + } + } + q := ethereum.FilterQuery{ FromBlock: big.NewInt(int64(lastBlock + 1)), ToBlock: nil, - Addresses: contracts, + Addresses: addresses, } logChan := make(chan types.Log) - sub, err := w.evmClient.SubscribeFilterLogs(ctx, q, logChan) if err != nil { - // retry after 5 seconds w.logger.Warn("failed to subscribe to logs", "error", err) - time.Sleep(5 * time.Second) + time.Sleep(2 * time.Second) continue } @@ -82,16 +126,18 @@ func (w *wsPublisher) Start(ctx context.Context, contracts ...common.Address) <- case <-ctx.Done(): sub.Unsubscribe() return + case <-w.updateCh: + w.logger.Info("ws: contract set updated; resubscribing") + inactivityStart = time.Now() + sub.Unsubscribe() + break PROCESSING case err := <-sub.Err(): - // retry after 5 seconds w.logger.Warn("subscription error", "error", err) inactivityStart = time.Now() - time.Sleep(5 * time.Second) + time.Sleep(2 * time.Second) break PROCESSING case logMsg := <-logChan: - // process log w.subscriber.PublishLogEvent(ctx, logMsg) - if logMsg.BlockNumber > lastBlock { if err := w.progressStore.SetLastBlock(logMsg.BlockNumber); err != nil { w.logger.Error("failed to set last block", "error", err) From 60b499da749e50083fea20bdf1e68fc3dc3c052b Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 3 Sep 2025 19:20:24 -0700 Subject: [PATCH 2/7] Update stathandler.go --- tools/dashboard/stathandler.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/tools/dashboard/stathandler.go b/tools/dashboard/stathandler.go index bc9ba52a5..06eea3683 100644 --- a/tools/dashboard/stathandler.go +++ b/tools/dashboard/stathandler.go @@ -55,13 +55,14 @@ type ProviderBalances struct { type BidderDeposit struct { Bidder string `json:"bidder"` Provider string `json:"provider"` - Amount string `json:"amount"` + AvailableAmount string `json:"available_amount"` Refunds string `json:"refunds"` Settled string `json:"settled"` Withdrawn string `json:"withdrawn"` OpenCommitmentsCount uint64 `json:"open_commitments_count"` ReturnsCount uint64 `json:"returns_count"` SettledCount uint64 `json:"settled_count"` + DepositedCount uint64 `json:"deposited_count"` } type depositKey struct { @@ -379,17 +380,23 @@ func (s *statHandler) configureDashboard() error { existing = make([]*BidderDeposit, 0) } + updated := false for _, b := range existing { if b.Bidder == upd.Bidder.Hex() { - return + b.AvailableAmount = upd.NewAvailableAmount.String() + b.DepositedCount++ + updated = true + break } } - - existing = append(existing, &BidderDeposit{ - Bidder: upd.Bidder.Hex(), - Provider: upd.Provider.Hex(), - Amount: upd.DepositedAmount.String(), - }) + if !updated { + existing = append(existing, &BidderDeposit{ + Bidder: upd.Bidder.Hex(), + Provider: upd.Provider.Hex(), + AvailableAmount: upd.NewAvailableAmount.String(), + DepositedCount: 1, + }) + } _ = s.bidderDeposits.Add(depositKey{ bidder: upd.Bidder.Hex(), provider: upd.Provider.Hex(), @@ -445,6 +452,7 @@ func (s *statHandler) configureDashboard() error { for idx, b := range existing { if b.Bidder == upd.Bidder.Hex() { existing[idx].Withdrawn = upd.AmountWithdrawn.String() + existing[idx].AvailableAmount = "0" break } } From 4d3ae64de4d337bd4c2d78110fb037e68bd0f5b3 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 3 Sep 2025 19:26:06 -0700 Subject: [PATCH 3/7] Update ws.go --- x/contracts/events/publisher/ws.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x/contracts/events/publisher/ws.go b/x/contracts/events/publisher/ws.go index 802b65e37..6c19f753a 100644 --- a/x/contracts/events/publisher/ws.go +++ b/x/contracts/events/publisher/ws.go @@ -116,7 +116,7 @@ func (w *wsPublisher) Start(ctx context.Context, contractAddr ...common.Address) sub, err := w.evmClient.SubscribeFilterLogs(ctx, q, logChan) if err != nil { w.logger.Warn("failed to subscribe to logs", "error", err) - time.Sleep(2 * time.Second) + time.Sleep(5 * time.Second) continue } @@ -132,11 +132,13 @@ func (w *wsPublisher) Start(ctx context.Context, contractAddr ...common.Address) sub.Unsubscribe() break PROCESSING case err := <-sub.Err(): + // retry after 5 seconds w.logger.Warn("subscription error", "error", err) inactivityStart = time.Now() - time.Sleep(2 * time.Second) + time.Sleep(5 * time.Second) break PROCESSING case logMsg := <-logChan: + // process log w.subscriber.PublishLogEvent(ctx, logMsg) if logMsg.BlockNumber > lastBlock { if err := w.progressStore.SetLastBlock(logMsg.BlockNumber); err != nil { From ce5a3ca82fe542a7d950b29bfee90b4d54aca98b Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 3 Sep 2025 20:10:31 -0700 Subject: [PATCH 4/7] fix TestWSPublisher --- x/contracts/events/publisher/ws_test.go | 86 +++++++++++++++++-------- 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/x/contracts/events/publisher/ws_test.go b/x/contracts/events/publisher/ws_test.go index a3d57cddf..9c1da0688 100644 --- a/x/contracts/events/publisher/ws_test.go +++ b/x/contracts/events/publisher/ws_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "sync" "testing" "time" @@ -35,72 +36,80 @@ func TestWSPublisher(t *testing.T) { }, } + // First subscription should error, second should run errC := make(chan error, 1) errC <- errors.New("test error") + evmClient := &testWSEVMClient{ - subscribed: make(chan struct{}), - sub: &testSubscription{ - done: make(chan struct{}), - errC: errC, - }, + subscribed: make(chan struct{}, 3), + errC: errC, } progressStore := &testStore{} subscriber := &testSubscriber{ logs: make(chan types.Log), } - publisher := publisher.NewWSPublisher(progressStore, logger, evmClient, subscriber) - noContractsDone := publisher.Start(context.Background()) - select { - case <-noContractsDone: - case <-time.After(1 * time.Second): - t.Error("timed out waiting for doneChan") - } + p := publisher.NewWSPublisher(progressStore, logger, evmClient, subscriber) ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - doneChan := publisher.Start(ctx, common.Address{}) + doneChan := p.Start(ctx) + p.AddContracts(common.Address{}) - // first one will return error and restart - <-evmClient.subscribed - // second one will return logs - <-evmClient.subscribed + // Wait for first subscribe (will immediately error and cause resubscribe) + select { + case <-evmClient.subscribed: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for first subscribe") + } + // Wait for second subscribe (active) + select { + case <-evmClient.subscribed: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for second subscribe") + } - evmClient.logs <- logs[0] + // Send two logs and expect them to be forwarded + evmClient.SendLog(logs[0]) select { case log := <-subscriber.logs: if diff := cmp.Diff(log, logs[0]); diff != "" { t.Errorf("unexpected log (-got +want):\n%s", diff) } case <-time.After(1 * time.Second): - t.Error("timed out waiting for log") + t.Fatal("timed out waiting for first log") } - evmClient.logs <- logs[1] + evmClient.SendLog(logs[1]) select { case log := <-subscriber.logs: if diff := cmp.Diff(log, logs[1]); diff != "" { t.Errorf("unexpected log (-got +want):\n%s", diff) } case <-time.After(1 * time.Second): - t.Error("timed out waiting for log") + t.Fatal("timed out waiting for second log") } cancel() select { case <-doneChan: case <-time.After(1 * time.Second): - t.Error("timed out waiting for doneChan") + t.Fatal("timed out waiting for doneChan") } + // Ensure current subscription was unsubscribed + evmClient.mu.Lock() + sub := evmClient.sub + evmClient.mu.Unlock() select { - case <-evmClient.sub.done: + case <-sub.done: case <-time.After(1 * time.Second): - t.Error("timed out waiting for subscription to be unsubscribed") + t.Fatal("timed out waiting for subscription to be unsubscribed") } - if progressStore.blockNumber != 2 { - t.Errorf("expected block number 2, got %d", progressStore.blockNumber) + if bn, _ := progressStore.LastBlock(); bn != 2 { + t.Errorf("expected block number 2, got %d", bn) } } @@ -118,13 +127,36 @@ func (s *testSubscription) Err() <-chan error { } type testWSEVMClient struct { + mu sync.Mutex subscribed chan struct{} logs chan<- types.Log sub *testSubscription + errC chan error } func (c *testWSEVMClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, logs chan<- types.Log) (ethereum.Subscription, error) { - defer func() { c.subscribed <- struct{}{} }() + c.mu.Lock() c.logs = logs + var errCh chan error + if c.errC != nil { + errCh = c.errC + c.errC = nil + } else { + errCh = make(chan error) + } + c.sub = &testSubscription{ + done: make(chan struct{}), + errC: errCh, + } + c.mu.Unlock() + + c.subscribed <- struct{}{} return c.sub, nil } + +func (c *testWSEVMClient) SendLog(l types.Log) { + c.mu.Lock() + ch := c.logs + c.mu.Unlock() + ch <- l +} From dcd7ff7ea42a58c84d28dccc04c9cf17aaae4da5 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 3 Sep 2025 20:16:30 -0700 Subject: [PATCH 5/7] Add TestWSPublisher_AddContracts --- x/contracts/events/publisher/ws_test.go | 96 +++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/x/contracts/events/publisher/ws_test.go b/x/contracts/events/publisher/ws_test.go index 9c1da0688..607d4d369 100644 --- a/x/contracts/events/publisher/ws_test.go +++ b/x/contracts/events/publisher/ws_test.go @@ -113,6 +113,92 @@ func TestWSPublisher(t *testing.T) { } } +func TestWSPublisher_AddContracts(t *testing.T) { + t.Parallel() + + logger := util.NewTestLogger(io.Discard) + + evmClient := &testWSEVMClient{ + subscribed: make(chan struct{}, 10), + } + progressStore := &testStore{} + subscriber := &testSubscriber{ + logs: make(chan types.Log), + } + + p := publisher.NewWSPublisher(progressStore, logger, evmClient, subscriber) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + doneChan := p.Start(ctx) + + // No contracts yet, no subscribe + select { + case <-evmClient.subscribed: + t.Fatal("unexpected subscribe before adding contracts") + case <-time.After(150 * time.Millisecond): + } + + addr1 := common.HexToAddress("0x1") + addr2 := common.HexToAddress("0x2") + + // Add first address, expect subscribe + p.AddContracts(addr1) + select { + case <-evmClient.subscribed: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for initial subscribe after AddContracts(addr1)") + } + + // Add duplicate, no resubscribe + p.AddContracts(addr1) + select { + case <-evmClient.subscribed: + t.Fatal("unexpected resubscribe on duplicate address") + case <-time.After(250 * time.Millisecond): + } + + // Add second address, expect resubscribe and both addrs present + p.AddContracts(addr2) + select { + case <-evmClient.subscribed: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for resubscribe after AddContracts(addr2)") + } + + addrs := evmClient.LastAddrs() + if len(addrs) != 2 { + t.Fatalf("expected 2 addresses, got %d: %v", len(addrs), addrs) + } + seen := map[string]bool{} + for _, a := range addrs { + seen[a.Hex()] = true + } + if !seen[addr1.Hex()] || !seen[addr2.Hex()] { + t.Fatalf("expected both addresses present; got %v", addrs) + } + + cancel() + select { + case <-doneChan: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for doneChan") + } + + // Ensure current subscription was unsubscribed + sub := func() *testSubscription { + evmClient.mu.Lock() + defer evmClient.mu.Unlock() + return evmClient.sub + }() + select { + case <-sub.done: + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for subscription to be unsubscribed") + } +} + type testSubscription struct { done chan struct{} errC chan error @@ -132,11 +218,13 @@ type testWSEVMClient struct { logs chan<- types.Log sub *testSubscription errC chan error + lastAddrs []common.Address } func (c *testWSEVMClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, logs chan<- types.Log) (ethereum.Subscription, error) { c.mu.Lock() c.logs = logs + c.lastAddrs = append([]common.Address(nil), q.Addresses...) var errCh chan error if c.errC != nil { errCh = c.errC @@ -160,3 +248,11 @@ func (c *testWSEVMClient) SendLog(l types.Log) { c.mu.Unlock() ch <- l } + +func (c *testWSEVMClient) LastAddrs() []common.Address { + c.mu.Lock() + defer c.mu.Unlock() + cp := make([]common.Address, len(c.lastAddrs)) + copy(cp, c.lastAddrs) + return cp +} From 1464750426113020da70558144a40b3235fd6503 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 3 Sep 2025 20:18:09 -0700 Subject: [PATCH 6/7] Update ws.go --- x/contracts/events/publisher/ws.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x/contracts/events/publisher/ws.go b/x/contracts/events/publisher/ws.go index 6c19f753a..ebea660f6 100644 --- a/x/contracts/events/publisher/ws.go +++ b/x/contracts/events/publisher/ws.go @@ -113,8 +113,10 @@ func (w *wsPublisher) Start(ctx context.Context, contractAddr ...common.Address) } logChan := make(chan types.Log) + sub, err := w.evmClient.SubscribeFilterLogs(ctx, q, logChan) if err != nil { + // retry after 5 seconds w.logger.Warn("failed to subscribe to logs", "error", err) time.Sleep(5 * time.Second) continue @@ -140,6 +142,7 @@ func (w *wsPublisher) Start(ctx context.Context, contractAddr ...common.Address) case logMsg := <-logChan: // process log w.subscriber.PublishLogEvent(ctx, logMsg) + if logMsg.BlockNumber > lastBlock { if err := w.progressStore.SetLastBlock(logMsg.BlockNumber); err != nil { w.logger.Error("failed to set last block", "error", err) From 432e9b6dcf9a575937fbb342b3b09adc5d6e64a5 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Wed, 3 Sep 2025 20:32:10 -0700 Subject: [PATCH 7/7] Update ws_test.go --- x/contracts/events/publisher/ws_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/contracts/events/publisher/ws_test.go b/x/contracts/events/publisher/ws_test.go index 607d4d369..817da04ec 100644 --- a/x/contracts/events/publisher/ws_test.go +++ b/x/contracts/events/publisher/ws_test.go @@ -66,7 +66,7 @@ func TestWSPublisher(t *testing.T) { // Wait for second subscribe (active) select { case <-evmClient.subscribed: - case <-time.After(5 * time.Second): + case <-time.After(7 * time.Second): t.Fatal("timed out waiting for second subscribe") }