From f88618e4f3e80df1471ed93a6fc6a70b4add4380 Mon Sep 17 00:00:00 2001 From: Shawn <44221603+shaspitz@users.noreply.github.com> Date: Fri, 5 Sep 2025 11:52:53 -0700 Subject: [PATCH] fix: ws publisher and its flaky test --- x/contracts/events/publisher/ws.go | 24 ++++++++++++++++-------- x/contracts/events/publisher/ws_test.go | 3 +-- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/x/contracts/events/publisher/ws.go b/x/contracts/events/publisher/ws.go index ebea660f6..5ee0c9ff0 100644 --- a/x/contracts/events/publisher/ws.go +++ b/x/contracts/events/publisher/ws.go @@ -44,7 +44,18 @@ func NewWSPublisher(progressStore ProgressStore, logger *slog.Logger, evmClient } func (w *wsPublisher) AddContracts(addr ...common.Address) { + added := w.addContracts(addr...) + if added { + select { + case w.updateCh <- struct{}{}: + default: + } + } +} + +func (w *wsPublisher) addContracts(addr ...common.Address) bool { w.mu.Lock() + defer w.mu.Unlock() added := false for _, a := range addr { if !slices.Contains(w.contracts, a) { @@ -53,13 +64,7 @@ func (w *wsPublisher) AddContracts(addr ...common.Address) { w.logger.Info("ws: added contract address", "address", a.Hex()) } } - w.mu.Unlock() - if added { - select { - case w.updateCh <- struct{}{}: - default: - } - } + return added } func (w *wsPublisher) getContracts() []common.Address { @@ -72,7 +77,10 @@ func (w *wsPublisher) getContracts() []common.Address { func (w *wsPublisher) Start(ctx context.Context, contractAddr ...common.Address) <-chan struct{} { doneChan := make(chan struct{}) - w.AddContracts(contractAddr...) + added := w.addContracts(contractAddr...) + if !added { + w.logger.Warn("contracts were added before starting the publisher") + } if len(contractAddr) == 0 { w.logger.Info("ws: starting with no contracts; waiting for addresses to be added") diff --git a/x/contracts/events/publisher/ws_test.go b/x/contracts/events/publisher/ws_test.go index 817da04ec..305e6adc9 100644 --- a/x/contracts/events/publisher/ws_test.go +++ b/x/contracts/events/publisher/ws_test.go @@ -54,8 +54,7 @@ func TestWSPublisher(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - doneChan := p.Start(ctx) - p.AddContracts(common.Address{}) + doneChan := p.Start(ctx, common.Address{}) // Wait for first subscribe (will immediately error and cause resubscribe) select {