From e7ebb7a5ea4108ebcafb3bbcd43e4cd351389d6f Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Tue, 4 Apr 2023 21:49:52 +0200 Subject: [PATCH 1/4] Add handling multiple beacon clients --- builder/beacon_client.go | 258 ++++++++++++++++++++++++++++----------- builder/config.go | 4 +- builder/service.go | 8 +- cmd/geth/main.go | 2 +- cmd/utils/flags.go | 10 +- 5 files changed, 203 insertions(+), 79 deletions(-) diff --git a/builder/beacon_client.go b/builder/beacon_client.go index aaa584fdd4..8bb82a9530 100644 --- a/builder/beacon_client.go +++ b/builder/beacon_client.go @@ -1,6 +1,7 @@ package builder import ( + "context" "encoding/json" "errors" "fmt" @@ -16,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/r3labs/sse" + "golang.org/x/exp/slices" ) type IBeaconClient interface { @@ -64,6 +66,123 @@ func (b *NilBeaconClient) Start() error { return nil } func (b *NilBeaconClient) Stop() {} +type MultiBeaconClient struct { + clients []*BeaconClient + closeCh chan struct{} +} + +func NewMultiBeaconClient(endpoints []string, slotsInEpoch uint64, secondsInSlot uint64) *MultiBeaconClient { + clients := []*BeaconClient{} + for _, endpoint := range endpoints { + client := NewBeaconClient(endpoint, slotsInEpoch, secondsInSlot) + clients = append(clients, client) + } + + return &MultiBeaconClient{ + clients: clients, + closeCh: make(chan struct{}), + } +} + +func (m *MultiBeaconClient) isValidator(pubkey PubkeyHex) bool { + for _, c := range m.clients { + // Pick the first one, always true + return c.isValidator(pubkey) + } + + return false +} + +func (m *MultiBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) { + var allErrs error + for _, c := range m.clients { + pk, err := c.getProposerForNextSlot(requestedSlot) + if err != nil { + allErrs = errors.Join(allErrs, err) + continue + } + + return pk, nil + } + return PubkeyHex(""), allErrs +} + +func payloadAttributesMatch(l types.BuilderPayloadAttributes, r types.BuilderPayloadAttributes) bool { + if l.Timestamp != r.Timestamp || + l.Random != r.Random || + l.SuggestedFeeRecipient != r.SuggestedFeeRecipient || + l.Slot != r.Slot || + l.HeadHash != r.HeadHash || + l.GasLimit != r.GasLimit { + return false + } + + if !slices.Equal(l.Withdrawals, r.Withdrawals) { + return false + } + + return true +} + +func (m *MultiBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) { + clientsChan := make(chan types.BuilderPayloadAttributes, len(m.clients)) + for _, c := range m.clients { + go c.SubscribeToPayloadAttributesEvents(clientsChan) + } + + currentSlot := uint64(0) + currentSlotPayloadAttributes := []types.BuilderPayloadAttributes{} + + for { + select { + case <-m.closeCh: + return + case payloadAttrs := <-clientsChan: + if payloadAttrs.Slot < currentSlot { + continue + } else if payloadAttrs.Slot == currentSlot { + shouldSkip := false + for _, previousPayloadAttrs := range currentSlotPayloadAttributes { + if payloadAttributesMatch(previousPayloadAttrs, payloadAttrs) { + shouldSkip = true + break + } + } + if shouldSkip { + continue + } + + payloadAttrC <- payloadAttrs + currentSlotPayloadAttributes = append(currentSlotPayloadAttributes, payloadAttrs) + } else if payloadAttrs.Slot > currentSlot { + currentSlot = payloadAttrs.Slot + payloadAttrC <- payloadAttrs + currentSlotPayloadAttributes = []types.BuilderPayloadAttributes{payloadAttrs} + } + + } + } +} + +func (m *MultiBeaconClient) Start() error { + var allErrs error + for _, c := range m.clients { + err := c.Start() + if err != nil { + allErrs = errors.Join(allErrs, err) + } + } + return allErrs +} + +func (m *MultiBeaconClient) Stop() { + for _, c := range m.clients { + c.Stop() + } + + close(m.closeCh) +} + type BeaconClient struct { endpoint string slotsInEpoch uint64 @@ -72,21 +191,24 @@ type BeaconClient struct { mu sync.Mutex slotProposerMap map[uint64]PubkeyHex - closeCh chan struct{} + ctx context.Context + cancelFn context.CancelFunc } func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient { + ctx, cancelFn := context.WithCancel(context.Background()) return &BeaconClient{ endpoint: endpoint, slotsInEpoch: slotsInEpoch, secondsInSlot: secondsInSlot, slotProposerMap: make(map[uint64]PubkeyHex), - closeCh: make(chan struct{}), + ctx: ctx, + cancelFn: cancelFn, } } func (b *BeaconClient) Stop() { - close(b.closeCh) + b.cancelFn() } func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool { @@ -139,7 +261,7 @@ func (b *BeaconClient) UpdateValidatorMapForever() { defer timer.Stop() for true { select { - case <-b.closeCh: + case <-b.ctx.Done(): return case <-timer.C: } @@ -184,6 +306,70 @@ func (b *BeaconClient) UpdateValidatorMapForever() { } } +// PayloadAttributesEvent represents the data of a payload_attributes event +// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}} +type PayloadAttributesEvent struct { + Version string `json:"version"` + Data PayloadAttributesEventData `json:"data"` +} + +type PayloadAttributesEventData struct { + ProposalSlot uint64 `json:"proposal_slot,string"` + ParentBlockHash common.Hash `json:"parent_block_hash"` + PayloadAttributes PayloadAttributes `json:"payload_attributes"` +} + +type PayloadAttributes struct { + Timestamp uint64 `json:"timestamp,string"` + PrevRandao common.Hash `json:"prev_randao"` + SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"` + Withdrawals []*capella.Withdrawal `json:"withdrawals"` +} + +// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals +func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) { + payloadAttributesResp := new(PayloadAttributesEvent) + + eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint) + log.Info("subscribing to payload_attributes events") + + for { + client := sse.NewClient(eventsURL) + err := client.SubscribeRawWithContext(b.ctx, func(msg *sse.Event) { + err := json.Unmarshal(msg.Data, payloadAttributesResp) + if err != nil { + log.Error("could not unmarshal payload_attributes event", "err", err) + } else { + // convert capella.Withdrawal to types.Withdrawal + withdrawals := make([]*types.Withdrawal, len(payloadAttributesResp.Data.PayloadAttributes.Withdrawals)) + for i, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals { + withdrawals[i] = &types.Withdrawal{ + Index: uint64(w.Index), + Validator: uint64(w.ValidatorIndex), + Address: common.Address(w.Address), + Amount: uint64(w.Amount), + } + } + + data := types.BuilderPayloadAttributes{ + Slot: payloadAttributesResp.Data.ProposalSlot, + HeadHash: payloadAttributesResp.Data.ParentBlockHash, + Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp), + Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao, + SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient, + Withdrawals: withdrawals, + } + payloadAttrC <- data + } + }) + if err != nil { + log.Error("failed to subscribe to payload_attributes events", "err", err) + time.Sleep(1 * time.Second) + } + log.Warn("beaconclient SubscribeRaw ended, reconnecting") + } +} + func fetchCurrentSlot(endpoint string) (uint64, error) { headerRes := &struct { Data []struct { @@ -286,67 +472,3 @@ func fetchBeacon(url string, dst any) error { log.Info("fetched", "url", url, "res", dst) return nil } - -// PayloadAttributesEvent represents the data of a payload_attributes event -// {"version": "capella", "data": {"proposer_index": "123", "proposal_slot": "10", "parent_block_number": "9", "parent_block_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "parent_block_hash": "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "payload_attributes": {"timestamp": "123456", "prev_randao": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", "suggested_fee_recipient": "0x0000000000000000000000000000000000000000", "withdrawals": [{"index": "5", "validator_index": "10", "address": "0x0000000000000000000000000000000000000000", "amount": "15640"}]}}} -type PayloadAttributesEvent struct { - Version string `json:"version"` - Data PayloadAttributesEventData `json:"data"` -} - -type PayloadAttributesEventData struct { - ProposalSlot uint64 `json:"proposal_slot,string"` - ParentBlockHash common.Hash `json:"parent_block_hash"` - PayloadAttributes PayloadAttributes `json:"payload_attributes"` -} - -type PayloadAttributes struct { - Timestamp uint64 `json:"timestamp,string"` - PrevRandao common.Hash `json:"prev_randao"` - SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"` - Withdrawals []*capella.Withdrawal `json:"withdrawals"` -} - -// SubscribeToPayloadAttributesEvents subscribes to payload attributes events to validate fields such as prevrandao and withdrawals -func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) { - payloadAttributesResp := new(PayloadAttributesEvent) - - eventsURL := fmt.Sprintf("%s/eth/v1/events?topics=payload_attributes", b.endpoint) - log.Info("subscribing to payload_attributes events") - - for { - client := sse.NewClient(eventsURL) - err := client.SubscribeRaw(func(msg *sse.Event) { - err := json.Unmarshal(msg.Data, payloadAttributesResp) - if err != nil { - log.Error("could not unmarshal payload_attributes event", "err", err) - } else { - // convert capella.Withdrawal to types.Withdrawal - withdrawals := make([]*types.Withdrawal, len(payloadAttributesResp.Data.PayloadAttributes.Withdrawals)) - for i, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals { - withdrawals[i] = &types.Withdrawal{ - Index: uint64(w.Index), - Validator: uint64(w.ValidatorIndex), - Address: common.Address(w.Address), - Amount: uint64(w.Amount), - } - } - - data := types.BuilderPayloadAttributes{ - Slot: payloadAttributesResp.Data.ProposalSlot, - HeadHash: payloadAttributesResp.Data.ParentBlockHash, - Timestamp: hexutil.Uint64(payloadAttributesResp.Data.PayloadAttributes.Timestamp), - Random: payloadAttributesResp.Data.PayloadAttributes.PrevRandao, - SuggestedFeeRecipient: payloadAttributesResp.Data.PayloadAttributes.SuggestedFeeRecipient, - Withdrawals: withdrawals, - } - payloadAttrC <- data - } - }) - if err != nil { - log.Error("failed to subscribe to payload_attributes events", "err", err) - time.Sleep(1 * time.Second) - } - log.Warn("beaconclient SubscribeRaw ended, reconnecting") - } -} diff --git a/builder/config.go b/builder/config.go index cc445d5efc..f9f2bba5b3 100644 --- a/builder/config.go +++ b/builder/config.go @@ -14,7 +14,7 @@ type Config struct { GenesisForkVersion string `toml:",omitempty"` BellatrixForkVersion string `toml:",omitempty"` GenesisValidatorsRoot string `toml:",omitempty"` - BeaconEndpoint string `toml:",omitempty"` + BeaconEndpoints []string `toml:",omitempty"` RemoteRelayEndpoint string `toml:",omitempty"` SecondaryRemoteRelayEndpoints []string `toml:",omitempty"` ValidationBlocklist string `toml:",omitempty"` @@ -35,7 +35,7 @@ var DefaultConfig = Config{ GenesisForkVersion: "0x00000000", BellatrixForkVersion: "0x02000000", GenesisValidatorsRoot: "0x0000000000000000000000000000000000000000000000000000000000000000", - BeaconEndpoint: "http://127.0.0.1:5052", + BeaconEndpoints: []string{"http://127.0.0.1:5052"}, RemoteRelayEndpoint: "", SecondaryRemoteRelayEndpoints: nil, ValidationBlocklist: "", diff --git a/builder/service.go b/builder/service.go index 32d3799dbf..a315553212 100644 --- a/builder/service.go +++ b/builder/service.go @@ -121,10 +121,12 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error { proposerSigningDomain := boostTypes.ComputeDomain(boostTypes.DomainTypeBeaconProposer, bellatrixForkVersion, genesisValidatorsRoot) var beaconClient IBeaconClient - if cfg.BeaconEndpoint != "" { - beaconClient = NewBeaconClient(cfg.BeaconEndpoint, cfg.SlotsInEpoch, cfg.SecondsInSlot) - } else { + if len(cfg.BeaconEndpoints) == 0 { beaconClient = &NilBeaconClient{} + } else if len(cfg.BeaconEndpoints) == 1 { + beaconClient = NewBeaconClient(cfg.BeaconEndpoints[0], cfg.SlotsInEpoch, cfg.SecondsInSlot) + } else { + beaconClient = NewMultiBeaconClient(cfg.BeaconEndpoints, cfg.SlotsInEpoch, cfg.SecondsInSlot) } var localRelay *LocalRelay diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 5c592ee09d..24a9395cf0 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -171,7 +171,7 @@ var ( utils.BuilderGenesisForkVersion, utils.BuilderBellatrixForkVersion, utils.BuilderGenesisValidatorsRoot, - utils.BuilderBeaconEndpoint, + utils.BuilderBeaconEndpoints, utils.BuilderRemoteRelayEndpoint, utils.BuilderSecondaryRemoteRelayEndpoints, } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 2c318ecb20..973580de76 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -778,10 +778,10 @@ var ( Value: "0x0000000000000000000000000000000000000000000000000000000000000000", Category: flags.BuilderCategory, } - BuilderBeaconEndpoint = &cli.StringFlag{ - Name: "builder.beacon_endpoint", - Usage: "Beacon endpoint to connect to for beacon chain data", - EnvVars: []string{"BUILDER_BEACON_ENDPOINT"}, + BuilderBeaconEndpoints = &cli.StringFlag{ + Name: "builder.beacon_endpoints", + Usage: "Comma separated list of beacon endpoints to connect to for beacon chain data", + EnvVars: []string{"BUILDER_BEACON_ENDPOINTS"}, Value: "http://127.0.0.1:5052", Category: flags.BuilderCategory, } @@ -1608,7 +1608,7 @@ func SetBuilderConfig(ctx *cli.Context, cfg *builder.Config) { cfg.GenesisForkVersion = ctx.String(BuilderGenesisForkVersion.Name) cfg.BellatrixForkVersion = ctx.String(BuilderBellatrixForkVersion.Name) cfg.GenesisValidatorsRoot = ctx.String(BuilderGenesisValidatorsRoot.Name) - cfg.BeaconEndpoint = ctx.String(BuilderBeaconEndpoint.Name) + cfg.BeaconEndpoints = strings.Split(ctx.String(BuilderBeaconEndpoints.Name), ",") cfg.RemoteRelayEndpoint = ctx.String(BuilderRemoteRelayEndpoint.Name) cfg.SecondaryRemoteRelayEndpoints = strings.Split(ctx.String(BuilderSecondaryRemoteRelayEndpoints.Name), ",") cfg.ValidationBlocklist = ctx.String(BuilderBlockValidationBlacklistSourceFilePath.Name) From f00b7353b1440f2ce4a928ee3868b7e825e25e74 Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Thu, 6 Apr 2023 19:43:36 +0200 Subject: [PATCH 2/4] Initialize stop channel in builder.Builder --- builder/builder.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/builder/builder.go b/builder/builder.go index b7c86c40ff..42a15beeea 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -94,6 +94,8 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe slot: 0, slotCtx: slotCtx, slotCtxCancel: slotCtxCancel, + + stop: make(chan struct{}, 1), } } @@ -102,7 +104,7 @@ func (b *Builder) Start() error { go func() { c := make(chan types.BuilderPayloadAttributes) go b.beaconClient.SubscribeToPayloadAttributesEvents(c) - beacon_loop: + beacon_loop: for { select { case <-b.stop: From adf5e040b7d22006ff44cecefafedf96cdd87ca4 Mon Sep 17 00:00:00 2001 From: avalonche Date: Fri, 7 Apr 2023 14:11:24 +1000 Subject: [PATCH 3/4] fix withdrawals array pointer --- builder/beacon_client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/builder/beacon_client.go b/builder/beacon_client.go index 8bb82a9530..458d81b19f 100644 --- a/builder/beacon_client.go +++ b/builder/beacon_client.go @@ -341,14 +341,14 @@ func (b *BeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan type log.Error("could not unmarshal payload_attributes event", "err", err) } else { // convert capella.Withdrawal to types.Withdrawal - withdrawals := make([]*types.Withdrawal, len(payloadAttributesResp.Data.PayloadAttributes.Withdrawals)) - for i, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals { - withdrawals[i] = &types.Withdrawal{ + var withdrawals []*types.Withdrawal + for _, w := range payloadAttributesResp.Data.PayloadAttributes.Withdrawals { + withdrawals = append(withdrawals, &types.Withdrawal{ Index: uint64(w.Index), Validator: uint64(w.ValidatorIndex), Address: common.Address(w.Address), Amount: uint64(w.Amount), - } + }) } data := types.BuilderPayloadAttributes{ From 840b3a8a9f0610bfb0b2675f6b315f73853d7ba1 Mon Sep 17 00:00:00 2001 From: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> Date: Mon, 10 Apr 2023 11:22:44 +0200 Subject: [PATCH 4/4] Build on a single head (#59) * Build on a single head * Forcibly stop building process for old sse events --- builder/beacon_client.go | 36 +----------------------------------- builder/builder.go | 36 ++++++++++++++++++++++++------------ 2 files changed, 25 insertions(+), 47 deletions(-) diff --git a/builder/beacon_client.go b/builder/beacon_client.go index 458d81b19f..f3122e2758 100644 --- a/builder/beacon_client.go +++ b/builder/beacon_client.go @@ -125,42 +125,8 @@ func payloadAttributesMatch(l types.BuilderPayloadAttributes, r types.BuilderPay } func (m *MultiBeaconClient) SubscribeToPayloadAttributesEvents(payloadAttrC chan types.BuilderPayloadAttributes) { - clientsChan := make(chan types.BuilderPayloadAttributes, len(m.clients)) for _, c := range m.clients { - go c.SubscribeToPayloadAttributesEvents(clientsChan) - } - - currentSlot := uint64(0) - currentSlotPayloadAttributes := []types.BuilderPayloadAttributes{} - - for { - select { - case <-m.closeCh: - return - case payloadAttrs := <-clientsChan: - if payloadAttrs.Slot < currentSlot { - continue - } else if payloadAttrs.Slot == currentSlot { - shouldSkip := false - for _, previousPayloadAttrs := range currentSlotPayloadAttributes { - if payloadAttributesMatch(previousPayloadAttrs, payloadAttrs) { - shouldSkip = true - break - } - } - if shouldSkip { - continue - } - - payloadAttrC <- payloadAttrs - currentSlotPayloadAttributes = append(currentSlotPayloadAttributes, payloadAttrs) - } else if payloadAttrs.Slot > currentSlot { - currentSlot = payloadAttrs.Slot - payloadAttrC <- payloadAttrs - currentSlotPayloadAttributes = []types.BuilderPayloadAttributes{payloadAttrs} - } - - } + go c.SubscribeToPayloadAttributesEvents(payloadAttrC) } } diff --git a/builder/builder.go b/builder/builder.go index 42a15beeea..52d87141c5 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -104,16 +104,27 @@ func (b *Builder) Start() error { go func() { c := make(chan types.BuilderPayloadAttributes) go b.beaconClient.SubscribeToPayloadAttributesEvents(c) - beacon_loop: + + currentSlot := uint64(0) + for { select { case <-b.stop: - break beacon_loop + return case payloadAttributes := <-c: - b.OnPayloadAttribute(&payloadAttributes) + // Right now we are building only on a single head. This might change in the future! + if payloadAttributes.Slot < currentSlot { + continue + } else if payloadAttributes.Slot == currentSlot { + b.OnPayloadAttribute(&payloadAttributes) + } else if payloadAttributes.Slot > currentSlot { + currentSlot = payloadAttributes.Slot + b.OnPayloadAttribute(&payloadAttributes) + } } } }() + return b.relay.Start() } @@ -291,18 +302,19 @@ func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) erro b.slotMu.Lock() defer b.slotMu.Unlock() - if b.slot != attrs.Slot { - if b.slotCtxCancel != nil { - b.slotCtxCancel() - } + // Forcibly cancel previous building job, build on top of reorgable blocks as this is the behaviour relays expect. + // This will change in the future - slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second) - b.slot = attrs.Slot - b.slotAttrs = nil - b.slotCtx = slotCtx - b.slotCtxCancel = slotCtxCancel + if b.slotCtxCancel != nil { + b.slotCtxCancel() } + slotCtx, slotCtxCancel := context.WithTimeout(context.Background(), 12*time.Second) + b.slot = attrs.Slot + b.slotAttrs = nil + b.slotCtx = slotCtx + b.slotCtxCancel = slotCtxCancel + for _, currentAttrs := range b.slotAttrs { if attrs.Equal(¤tAttrs) { log.Debug("ignoring known payload attribute", "slot", attrs.Slot, "hash", attrs.HeadHash)