diff --git a/cmd/main.go b/cmd/main.go index 1c66164..fbf13c0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -47,9 +47,9 @@ func main() { defer cancel() var wg sync.WaitGroup - // Start the attestation checker service in a goroutine - logger.Info("Starting attestation checker for %d validators", len(indices)) - checker := &services.AttestationChecker{ + // Start the duties checker service in a goroutine + logger.Info("Starting duties checker for %d validators", len(indices)) + dutiesChecker := &services.DutiesChecker{ BeaconAdapter: adapter, Web3SignerAdapter: web3Signer, PollInterval: 1 * time.Minute, @@ -58,7 +58,7 @@ func main() { wg.Add(1) go func() { defer wg.Done() - checker.Run(ctx) + dutiesChecker.Run(ctx) }() // Handle graceful shutdown diff --git a/internal/adapters/attestantclient_adapter.go b/internal/adapters/attestantclient_adapter.go index 45e5ebe..4061840 100644 --- a/internal/adapters/attestantclient_adapter.go +++ b/internal/adapters/attestantclient_adapter.go @@ -185,3 +185,44 @@ func (b *beaconAttestantClient) GetValidatorIndicesByPubkeys(ctx context.Context } return indices, nil } + +// GetProposerDuties retrieves proposer duties for the given epoch and validator indices. +func (b *beaconAttestantClient) GetProposerDuties(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) ([]domain.ProposerDuty, error) { + var beaconIndices []phase0.ValidatorIndex + for _, idx := range indices { + beaconIndices = append(beaconIndices, phase0.ValidatorIndex(idx)) + } + + resp, err := b.client.ProposerDuties(ctx, &api.ProposerDutiesOpts{ + Epoch: phase0.Epoch(epoch), + Indices: beaconIndices, + }) + if err != nil { + return nil, err + } + + var duties []domain.ProposerDuty + for _, d := range resp.Data { + duties = append(duties, domain.ProposerDuty{ + Slot: domain.Slot(d.Slot), + ValidatorIndex: domain.ValidatorIndex(d.ValidatorIndex), + }) + } + return duties, nil +} + +// DidProposeBlock checks a given slot includes a block proposed +func (b *beaconAttestantClient) DidProposeBlock(ctx context.Context, slot domain.Slot) (bool, error) { + block, err := b.client.SignedBeaconBlock(ctx, &api.SignedBeaconBlockOpts{ + Block: fmt.Sprintf("%d", slot), + }) + if err != nil { + // TODO: are we sure we can assume that a 404 means the block was not proposed? + // What error code is returned in all consensus if the block is not in their state? + if apiErr, ok := err.(*api.Error); ok && apiErr.StatusCode == 404 { + return false, nil // Block was not proposed + } + return false, err // Real error + } + return block != nil && block.Data != nil, nil +} diff --git a/internal/application/domain/validator.go b/internal/application/domain/validator.go index 29fea9a..f2369ad 100644 --- a/internal/application/domain/validator.go +++ b/internal/application/domain/validator.go @@ -1,10 +1,15 @@ package domain +// -------------------------------------------------------- + +// Domain types used for anything related to validators type Epoch uint64 type Slot uint64 type ValidatorIndex uint64 -type CommitteeIndex uint64 +// -------------------------------------------------------- + +// Attestation-related types type ValidatorDuty struct { Slot Slot CommitteeIndex CommitteeIndex @@ -12,10 +17,19 @@ type ValidatorDuty struct { ValidatorIndex ValidatorIndex } -type CommitteeSizeMap map[CommitteeIndex]int - type Attestation struct { DataSlot Slot CommitteeBits []byte AggregationBits []byte } + +type CommitteeSizeMap map[CommitteeIndex]int +type CommitteeIndex uint64 + +// -------------------------------------------------------- + +// Proposer-related types +type ProposerDuty struct { + Slot Slot + ValidatorIndex ValidatorIndex +} diff --git a/internal/application/ports/beaconchain.go b/internal/application/ports/beaconchain.go index 2201f37..b897f6f 100644 --- a/internal/application/ports/beaconchain.go +++ b/internal/application/ports/beaconchain.go @@ -13,4 +13,7 @@ type BeaconChainAdapter interface { GetCommitteeSizeMap(ctx context.Context, slot domain.Slot) (domain.CommitteeSizeMap, error) GetBlockAttestations(ctx context.Context, slot domain.Slot) ([]domain.Attestation, error) GetValidatorIndicesByPubkeys(ctx context.Context, pubkeys []string) ([]domain.ValidatorIndex, error) + + GetProposerDuties(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) ([]domain.ProposerDuty, error) + DidProposeBlock(ctx context.Context, slot domain.Slot) (bool, error) } diff --git a/internal/application/services/attestation_oracle.go b/internal/application/services/dutieschecker_service.go similarity index 77% rename from internal/application/services/attestation_oracle.go rename to internal/application/services/dutieschecker_service.go index 75af6c4..c276567 100644 --- a/internal/application/services/attestation_oracle.go +++ b/internal/application/services/dutieschecker_service.go @@ -9,7 +9,7 @@ import ( "github.com/dappnode/validator-tracker/internal/logger" ) -type AttestationChecker struct { +type DutiesChecker struct { BeaconAdapter ports.BeaconChainAdapter Web3SignerAdapter ports.Web3SignerAdapter PollInterval time.Duration @@ -19,7 +19,7 @@ type AttestationChecker struct { } // If at interval, ticker ticks but check has not ended, we wont start a new check, we will just wait for the next tick. -func (a *AttestationChecker) Run(ctx context.Context) { +func (a *DutiesChecker) Run(ctx context.Context) { ticker := time.NewTicker(a.PollInterval) defer ticker.Stop() @@ -33,7 +33,7 @@ func (a *AttestationChecker) Run(ctx context.Context) { } } -func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) { +func (a *DutiesChecker) checkLatestFinalizedEpoch(ctx context.Context) { finalizedEpoch, err := a.BeaconAdapter.GetFinalizedEpoch(ctx) if err != nil { logger.Error("Error fetching finalized epoch: %v", err) @@ -43,7 +43,6 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) { logger.Debug("Finalized epoch %d unchanged, skipping check.", finalizedEpoch) return } - a.lastFinalizedEpoch = finalizedEpoch logger.Info("New finalized epoch %d detected.", finalizedEpoch) @@ -62,9 +61,50 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) { validatorIndices := a.getValidatorsToCheck(indices, finalizedEpoch) if len(validatorIndices) == 0 { + logger.Debug("No validators left to check for epoch %d", finalizedEpoch) + return + } + + // Split proposal vs attestation logic + a.checkProposals(ctx, finalizedEpoch, validatorIndices) + a.checkAttestations(ctx, finalizedEpoch, validatorIndices) +} + +func (a *DutiesChecker) checkProposals( + ctx context.Context, + finalizedEpoch domain.Epoch, + indices []domain.ValidatorIndex, +) { + proposerDuties, err := a.BeaconAdapter.GetProposerDuties(ctx, finalizedEpoch, indices) + if err != nil { + logger.Error("Error fetching proposer duties: %v", err) return } + if len(proposerDuties) == 0 { + logger.Warn("No proposer duties found for finalized epoch %d.", finalizedEpoch) + return + } + + for _, duty := range proposerDuties { + didPropose, err := a.BeaconAdapter.DidProposeBlock(ctx, duty.Slot) + if err != nil { + logger.Warn("⚠️ Could not determine if block was proposed at slot %d: %v", duty.Slot, err) + continue + } + if didPropose { + logger.Info("✅ Validator %d successfully proposed a block at slot %d", duty.ValidatorIndex, duty.Slot) + } else { + logger.Warn("❌ Validator %d was scheduled to propose at slot %d but did not", duty.ValidatorIndex, duty.Slot) + } + } +} + +func (a *DutiesChecker) checkAttestations( + ctx context.Context, + finalizedEpoch domain.Epoch, + validatorIndices []domain.ValidatorIndex, +) { duties, err := a.BeaconAdapter.GetValidatorDutiesBatch(ctx, finalizedEpoch, validatorIndices) if err != nil { logger.Error("Error fetching validator duties: %v", err) @@ -75,11 +115,10 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) { return } - minSlot, maxSlot := getSlotRangeForDuties(duties) // Maximum of 32 + 31 = 63 slots max. Will only happen with users with lots of validators. - slotAttestations := preloadSlotAttestations(ctx, a.BeaconAdapter, minSlot, maxSlot) // get all attestations for the needed slots + minSlot, maxSlot := getSlotRangeForDuties(duties) + slotAttestations := preloadSlotAttestations(ctx, a.BeaconAdapter, minSlot, maxSlot) committeeSizeCache := make(map[domain.Slot]domain.CommitteeSizeMap) - // Fore each duty (one x validator), check if there is an attestation for the duty slot in the next 32 slots. for _, duty := range duties { attestationFound := a.checkDutyAttestation(ctx, duty, slotAttestations, committeeSizeCache) if !attestationFound { @@ -90,7 +129,7 @@ func (a *AttestationChecker) checkLatestFinalizedEpoch(ctx context.Context) { } } -func (a *AttestationChecker) getValidatorsToCheck(indices []domain.ValidatorIndex, epoch domain.Epoch) []domain.ValidatorIndex { +func (a *DutiesChecker) getValidatorsToCheck(indices []domain.ValidatorIndex, epoch domain.Epoch) []domain.ValidatorIndex { var result []domain.ValidatorIndex for _, index := range indices { if a.wasCheckedThisEpoch(index, epoch) { @@ -101,11 +140,11 @@ func (a *AttestationChecker) getValidatorsToCheck(indices []domain.ValidatorInde return result } -func (a *AttestationChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool { +func (a *DutiesChecker) wasCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) bool { return a.CheckedEpochs[index] == epoch } -func (a *AttestationChecker) markCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) { +func (a *DutiesChecker) markCheckedThisEpoch(index domain.ValidatorIndex, epoch domain.Epoch) { if a.CheckedEpochs == nil { a.CheckedEpochs = make(map[domain.ValidatorIndex]domain.Epoch) } @@ -141,7 +180,7 @@ func preloadSlotAttestations(ctx context.Context, beacon ports.BeaconChainAdapte // checkDutyAttestation checks if there is an attestation for the given duty in the next 32 slots. // It uses the committee size cache to avoid fetching committee sizes for every duty in repeated slots. -func (a *AttestationChecker) checkDutyAttestation( +func (a *DutiesChecker) checkDutyAttestation( ctx context.Context, duty domain.ValidatorDuty, slotAttestations map[domain.Slot][]domain.Attestation,