From bf94b422dd000f2a0ad33383d6746c0b76c90c74 Mon Sep 17 00:00:00 2001 From: syntrust Date: Fri, 9 Jan 2026 12:11:17 +0800 Subject: [PATCH 1/7] fast polling --- ethstorage/node/node.go | 44 ++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/ethstorage/node/node.go b/ethstorage/node/node.go index 180ba628..bcae2d6e 100644 --- a/ethstorage/node/node.go +++ b/ethstorage/node/node.go @@ -147,7 +147,7 @@ func (n *EsNode) initL2(ctx context.Context, cfg *Config) error { } func (n *EsNode) initL1(ctx context.Context, cfg *Config) error { - client, err := eth.Dial(cfg.L1.L1NodeAddr, cfg.Storage.L1Contract, cfg.L1.L1BlockTime, n.lg) + client, err := eth.Dial(cfg.L1.L1NodeAddr, cfg.Storage.L1Contract, 1, n.lg) if err != nil { return fmt.Errorf("failed to create L1 source: %w", err) } @@ -166,7 +166,7 @@ func (n *EsNode) initL1(ctx context.Context, cfg *Config) error { return fmt.Errorf("no L1 beacon or DA URL provided") } if cfg.RandaoSourceURL != "" { - rc, err := eth.DialRandaoSource(ctx, cfg.RandaoSourceURL, cfg.L1.L1NodeAddr, cfg.L1.L1BlockTime, n.lg) + rc, err := eth.DialRandaoSource(ctx, cfg.RandaoSourceURL, cfg.L1.L1NodeAddr, 1, n.lg) if err != nil { return fmt.Errorf("failed to create randao source: %w", err) } @@ -191,26 +191,26 @@ func (n *EsNode) startL1(cfg *Config) { } n.lg.Error("L1 heads subscription error", "err", err) }() - - // Keep subscribed to the randao heads, which helps miner to get proper random seeds - n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { - if err != nil { - n.lg.Warn("Resubscribing after failed randao head subscription", "err", err) - } - if n.randaoSource != nil { - return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead) - } else { - return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead) - } - }) - go func() { - err, ok := <-n.randaoHeadsSub.Err() - if !ok { - return - } - n.lg.Error("Randao heads subscription error", "err", err) - }() - + if n.miner != nil { + // Keep subscribed to the randao heads, which helps miner to get proper random seeds + n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + n.lg.Warn("Resubscribing after failed randao head subscription", "err", err) + } + if n.randaoSource != nil { + return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead) + } else { + return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead) + } + }) + go func() { + err, ok := <-n.randaoHeadsSub.Err() + if !ok { + return + } + n.lg.Error("Randao heads subscription error", "err", err) + }() + } // Poll for the safe L1 block and finalized block, // which only change once per epoch at most and may be delayed. n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.lg, n.l1Source, n.OnNewL1Safe, ethRPC.SafeBlockNumber, From c6315f8336da39be443a7c0c6210bd0363bce028 Mon Sep 17 00:00:00 2001 From: syntrust Date: Fri, 16 Jan 2026 17:51:16 +0800 Subject: [PATCH 2/7] better polling --- ethstorage/eth/polling_client.go | 72 ++++++++++++++++++++++++-------- ethstorage/node/node.go | 4 +- 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index 0d1591b0..f58c2b41 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -147,45 +147,43 @@ func (w *PollingClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.H } func (w *PollingClient) pollHeads() { - // To prevent polls from stacking up in case HTTP requests - // are slow, use a similar model to the driver in which - // polls are requested manually after each header is fetched. - reqPollAfter := func() { - if w.pollRate == 0 { - return - } - time.AfterFunc(w.pollRate, w.reqPoll) - } - reqPollAfter() + w.reqPoll() defer close(w.closedCh) for { select { case <-w.pollReqCh: - // We don't need backoff here because we'll just try again - // after the pollRate elapses. + pollStart := time.Now() head, err := w.queryHeader() + queryDur := time.Since(pollStart) + if err != nil { w.lg.Info("Error getting latest header", "err", err) - reqPollAfter() + w.scheduleNextPoll(nil, queryDur, false) continue } if w.currHead != nil && w.currHead.Hash() == head.Hash() { w.lg.Trace("No change in head, skipping notifications") - reqPollAfter() + w.scheduleNextPoll(head, queryDur, false) continue } - w.lg.Trace("Notifying subscribers of new head", "head", head.Hash()) + headTime := time.Unix(int64(head.Time), 0) + w.lg.Trace( + "Notifying subscribers of new head", + "height", head.Number, + "headTime", headTime.Format("15:04:05"), + "head", head.Hash(), + ) w.currHead = head w.mtx.RLock() for _, sub := range w.subs { sub <- head } w.mtx.RUnlock() - reqPollAfter() + w.scheduleNextPoll(head, queryDur, true) case <-w.ctx.Done(): w.Client.Close() return @@ -199,8 +197,48 @@ func (w *PollingClient) getLatestHeader() (*types.Header, error) { return w.HeaderByNumber(ctx, big.NewInt(rpc.LatestBlockNumber.Int64())) } +// scheduleNextPoll decides the next poll time based on queryHeader duration and head.Time: +// - Goal: align the next request to head.Time + pollRate (close to when the next block appears) +func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Duration, changed bool) { + if w.pollRate == 0 { + return + } + + const ( + minDelay = 200 * time.Millisecond + minDelayNoChange = 1 * time.Second + ) + + // Retry on failure + if head == nil { + time.AfterFunc(minDelay, w.reqPoll) + return + } + + // Align next poll to headTime + pollRate with a lead time. + target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(-queryDur + minDelay) + delay := time.Until(target) + + if delay < minDelay { + if changed { + delay = minDelay + } else { + delay = minDelayNoChange + } + } + if delay > w.pollRate { + delay = w.pollRate + } + w.lg.Trace("Scheduled next poll", "delay", delay) + time.AfterFunc(delay, w.reqPoll) +} + func (w *PollingClient) reqPoll() { - w.pollReqCh <- struct{}{} + // non-blocking send + select { + case w.pollReqCh <- struct{}{}: + default: + } } func (w *PollingClient) FilterLogsByBlockRange(start *big.Int, end *big.Int, eventSig string) ([]types.Log, error) { diff --git a/ethstorage/node/node.go b/ethstorage/node/node.go index bcae2d6e..e7aa2dd3 100644 --- a/ethstorage/node/node.go +++ b/ethstorage/node/node.go @@ -147,7 +147,7 @@ func (n *EsNode) initL2(ctx context.Context, cfg *Config) error { } func (n *EsNode) initL1(ctx context.Context, cfg *Config) error { - client, err := eth.Dial(cfg.L1.L1NodeAddr, cfg.Storage.L1Contract, 1, n.lg) + client, err := eth.Dial(cfg.L1.L1NodeAddr, cfg.Storage.L1Contract, cfg.L1.L1BlockTime, n.lg) if err != nil { return fmt.Errorf("failed to create L1 source: %w", err) } @@ -166,7 +166,7 @@ func (n *EsNode) initL1(ctx context.Context, cfg *Config) error { return fmt.Errorf("no L1 beacon or DA URL provided") } if cfg.RandaoSourceURL != "" { - rc, err := eth.DialRandaoSource(ctx, cfg.RandaoSourceURL, cfg.L1.L1NodeAddr, 1, n.lg) + rc, err := eth.DialRandaoSource(ctx, cfg.RandaoSourceURL, cfg.L1.L1NodeAddr, cfg.L1.L1BlockTime, n.lg) if err != nil { return fmt.Errorf("failed to create randao source: %w", err) } From 906888941af091feb83ba0c98d50b97be588ae71 Mon Sep 17 00:00:00 2001 From: syntrust Date: Mon, 19 Jan 2026 10:22:03 +0800 Subject: [PATCH 3/7] minor --- ethstorage/eth/polling_client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index f58c2b41..38150330 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -198,7 +198,6 @@ func (w *PollingClient) getLatestHeader() (*types.Header, error) { } // scheduleNextPoll decides the next poll time based on queryHeader duration and head.Time: -// - Goal: align the next request to head.Time + pollRate (close to when the next block appears) func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Duration, changed bool) { if w.pollRate == 0 { return @@ -229,7 +228,6 @@ func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Durat if delay > w.pollRate { delay = w.pollRate } - w.lg.Trace("Scheduled next poll", "delay", delay) time.AfterFunc(delay, w.reqPoll) } From fc84cbc1f190c5bef2877dfe0f0d345888a45721 Mon Sep 17 00:00:00 2001 From: syntrust Date: Tue, 20 Jan 2026 11:22:18 +0800 Subject: [PATCH 4/7] fix comments --- ethstorage/eth/polling_client.go | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index 38150330..e8f4684b 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -161,12 +161,12 @@ func (w *PollingClient) pollHeads() { if err != nil { w.lg.Info("Error getting latest header", "err", err) - w.scheduleNextPoll(nil, queryDur, false) + w.scheduleNextPoll(nil, queryDur) continue } if w.currHead != nil && w.currHead.Hash() == head.Hash() { w.lg.Trace("No change in head, skipping notifications") - w.scheduleNextPoll(head, queryDur, false) + w.scheduleNextPoll(head, queryDur) continue } @@ -183,7 +183,7 @@ func (w *PollingClient) pollHeads() { sub <- head } w.mtx.RUnlock() - w.scheduleNextPoll(head, queryDur, true) + w.scheduleNextPoll(head, queryDur) case <-w.ctx.Done(): w.Client.Close() return @@ -198,15 +198,12 @@ func (w *PollingClient) getLatestHeader() (*types.Header, error) { } // scheduleNextPoll decides the next poll time based on queryHeader duration and head.Time: -func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Duration, changed bool) { +func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Duration) { if w.pollRate == 0 { return } - const ( - minDelay = 200 * time.Millisecond - minDelayNoChange = 1 * time.Second - ) + const minDelay = 200 * time.Millisecond // Retry on failure if head == nil { @@ -214,20 +211,13 @@ func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Durat return } - // Align next poll to headTime + pollRate with a lead time. - target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(-queryDur + minDelay) - delay := time.Until(target) + // Align next poll to headTime + pollRate with queryDur+minDelay as network delay estimation. + target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(queryDur + minDelay) + // bound the delay between minDelay and pollRate + delay := min(max(time.Until(target), minDelay), w.pollRate) + + w.lg.Trace("Scheduled next poll", "queryDur", queryDur, "delay", delay) - if delay < minDelay { - if changed { - delay = minDelay - } else { - delay = minDelayNoChange - } - } - if delay > w.pollRate { - delay = w.pollRate - } time.AfterFunc(delay, w.reqPoll) } From c55618a6237589462bd2a76d4ba62e9c2fabeaa1 Mon Sep 17 00:00:00 2001 From: syntrust Date: Tue, 20 Jan 2026 17:05:45 +0800 Subject: [PATCH 5/7] fix comment --- ethstorage/eth/polling_client.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index e8f4684b..65f7e15d 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -155,23 +155,21 @@ func (w *PollingClient) pollHeads() { for { select { case <-w.pollReqCh: - pollStart := time.Now() head, err := w.queryHeader() - queryDur := time.Since(pollStart) if err != nil { w.lg.Info("Error getting latest header", "err", err) - w.scheduleNextPoll(nil, queryDur) + w.scheduleNextPoll(nil) continue } if w.currHead != nil && w.currHead.Hash() == head.Hash() { w.lg.Trace("No change in head, skipping notifications") - w.scheduleNextPoll(head, queryDur) + w.scheduleNextPoll(head) continue } headTime := time.Unix(int64(head.Time), 0) - w.lg.Trace( + w.lg.Warn( "Notifying subscribers of new head", "height", head.Number, "headTime", headTime.Format("15:04:05"), @@ -183,7 +181,7 @@ func (w *PollingClient) pollHeads() { sub <- head } w.mtx.RUnlock() - w.scheduleNextPoll(head, queryDur) + w.scheduleNextPoll(head) case <-w.ctx.Done(): w.Client.Close() return @@ -197,12 +195,11 @@ func (w *PollingClient) getLatestHeader() (*types.Header, error) { return w.HeaderByNumber(ctx, big.NewInt(rpc.LatestBlockNumber.Int64())) } -// scheduleNextPoll decides the next poll time based on queryHeader duration and head.Time: -func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Duration) { +// scheduleNextPoll decides the next poll time based on next head.Time: +func (w *PollingClient) scheduleNextPoll(head *types.Header) { if w.pollRate == 0 { return } - const minDelay = 200 * time.Millisecond // Retry on failure @@ -210,13 +207,14 @@ func (w *PollingClient) scheduleNextPoll(head *types.Header, queryDur time.Durat time.AfterFunc(minDelay, w.reqPoll) return } - - // Align next poll to headTime + pollRate with queryDur+minDelay as network delay estimation. - target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(queryDur + minDelay) + // A heuristic estimation of p2p network delay to balance timely polling and request frequency + const heuristicDelay = 700 * time.Millisecond + // Align next poll to headTime + pollRate + heuristic p2p network delay. + target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(heuristicDelay) // bound the delay between minDelay and pollRate delay := min(max(time.Until(target), minDelay), w.pollRate) - w.lg.Trace("Scheduled next poll", "queryDur", queryDur, "delay", delay) + w.lg.Warn("Scheduled next poll", "delay", delay) time.AfterFunc(delay, w.reqPoll) } From eb31152d7e2c01ee7befbb7aa3cbd734328a993b Mon Sep 17 00:00:00 2001 From: syntrust Date: Tue, 20 Jan 2026 17:20:53 +0800 Subject: [PATCH 6/7] debug --- ethstorage/eth/polling_client.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index 65f7e15d..a13152de 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -200,17 +200,16 @@ func (w *PollingClient) scheduleNextPoll(head *types.Header) { if w.pollRate == 0 { return } - const minDelay = 200 * time.Millisecond + // A heuristic estimation of p2p network delay to balance timely polling and request frequency + const minDelay = 700 * time.Millisecond // Retry on failure if head == nil { time.AfterFunc(minDelay, w.reqPoll) return } - // A heuristic estimation of p2p network delay to balance timely polling and request frequency - const heuristicDelay = 700 * time.Millisecond - // Align next poll to headTime + pollRate + heuristic p2p network delay. - target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(heuristicDelay) + // Align next poll to headTime + pollRate + slack. + target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(minDelay) // bound the delay between minDelay and pollRate delay := min(max(time.Until(target), minDelay), w.pollRate) From 4e5add3e2aaaa53356358c2bc63d45644ca22f35 Mon Sep 17 00:00:00 2001 From: syntrust Date: Tue, 20 Jan 2026 18:06:21 +0800 Subject: [PATCH 7/7] debug --- ethstorage/eth/polling_client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index a13152de..5099099b 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -169,7 +169,7 @@ func (w *PollingClient) pollHeads() { } headTime := time.Unix(int64(head.Time), 0) - w.lg.Warn( + w.lg.Trace( "Notifying subscribers of new head", "height", head.Number, "headTime", headTime.Format("15:04:05"), @@ -201,7 +201,7 @@ func (w *PollingClient) scheduleNextPoll(head *types.Header) { return } // A heuristic estimation of p2p network delay to balance timely polling and request frequency - const minDelay = 700 * time.Millisecond + const minDelay = 1000 * time.Millisecond // Retry on failure if head == nil { @@ -213,7 +213,7 @@ func (w *PollingClient) scheduleNextPoll(head *types.Header) { // bound the delay between minDelay and pollRate delay := min(max(time.Until(target), minDelay), w.pollRate) - w.lg.Warn("Scheduled next poll", "delay", delay) + w.lg.Trace("Scheduled next poll", "delay", delay) time.AfterFunc(delay, w.reqPoll) }