Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/yetanotherco/aligned_layer/metrics"

"github.com/Layr-Labs/eigensdk-go/chainio/clients"
sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
Expand Down Expand Up @@ -86,8 +85,12 @@ type Aggregator struct {

logger logging.Logger

// Metrics
metricsReg *prometheus.Registry
metrics *metrics.Metrics

// Telemetry
telemetry *Telemetry
}

func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error) {
Expand Down Expand Up @@ -125,7 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
aggregatorPrivateKey := aggregatorConfig.EcdsaConfig.PrivateKey

logger := aggregatorConfig.BaseConfig.Logger
clients, err := clients.BuildAll(chainioConfig, aggregatorPrivateKey, logger)
clients, err := sdkclients.BuildAll(chainioConfig, aggregatorPrivateKey, logger)
if err != nil {
logger.Errorf("Cannot create sdk clients", "err", err)
return nil, err
Expand Down Expand Up @@ -154,6 +157,9 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
reg := prometheus.NewRegistry()
aggregatorMetrics := metrics.NewMetrics(aggregatorConfig.Aggregator.MetricsIpPortAddress, reg, logger)

// Telemetry
aggregatorTelemetry := NewTelemetry(aggregatorConfig.Aggregator.TelemetryIpPortAddress, logger)

nextBatchIndex := uint32(0)

aggregator := Aggregator{
Expand All @@ -176,6 +182,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
logger: logger,
metricsReg: reg,
metrics: aggregatorMetrics,
telemetry: aggregatorTelemetry,
}

return &aggregator, nil
Expand Down Expand Up @@ -216,9 +223,20 @@ func (agg *Aggregator) Start(ctx context.Context) error {
const MaxSentTxRetries = 5

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")

// Finish task trace once the task is processed (either successfully or not)
defer agg.telemetry.FinishTrace(batchData.BatchMerkleRoot)

if blsAggServiceResp.Err != nil {
agg.telemetry.LogTaskError(batchData.BatchMerkleRoot, blsAggServiceResp.Err)
Comment thread
JuArce marked this conversation as resolved.
agg.taskMutex.Lock()
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
agg.logger.Error("BlsAggregationServiceResponse contains an error", "err", blsAggServiceResp.Err, "batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
agg.logger.Info("- Locking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)

Expand Down Expand Up @@ -249,18 +267,18 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
NonSignerStakeIndices: blsAggServiceResp.NonSignerStakeIndices,
}

agg.logger.Info("- Locking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)
agg.taskMutex.Lock()
Comment thread
JuArce marked this conversation as resolved.
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching merkle root")
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]

// Delete the task from the map
delete(agg.operatorRespondedBatch, blsAggServiceResp.TaskIndex)

agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching merkle root")
agg.logger.Info("- Unlocking task mutex: Delete task from operator map", "taskIndex", blsAggServiceResp.TaskIndex)

agg.taskMutex.Unlock()

agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)

agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

Expand All @@ -282,7 +300,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

Expand Down Expand Up @@ -312,6 +330,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
agg.telemetry.LogTaskError(batchMerkleRoot, err)
return nil, err
}

Expand All @@ -330,6 +349,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
}

func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
agg.telemetry.InitNewTrace(batchMerkleRoot)
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))

Expand Down
1 change: 1 addition & 0 deletions aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
"BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId)

taskIndex := uint32(0)
ok := false
Expand Down
134 changes: 134 additions & 0 deletions aggregator/internal/pkg/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package pkg

import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
)

// TraceMessage is the JSON body that identifies a task trace by its batch
// merkle root. In this file it is posted when a trace is initialized and when
// it is finished.
type TraceMessage struct {
	// MerkleRoot is serialized under the "merkle_root" key.
	MerkleRoot string `json:"merkle_root"`
}

// OperatorResponseMessage is the JSON body reporting that an operator
// responded for a given batch.
type OperatorResponseMessage struct {
	// MerkleRoot is serialized under the "merkle_root" key.
	MerkleRoot string `json:"merkle_root"`
	// OperatorId is serialized under the "operator_id" key.
	OperatorId string `json:"operator_id"`
}
// QuorumReachedMessage is the JSON body reporting that the quorum was reached
// for a given batch.
type QuorumReachedMessage struct {
	// MerkleRoot is serialized under the "merkle_root" key.
	MerkleRoot string `json:"merkle_root"`
}

// TaskErrorMessage is the JSON body reporting an error that occurred while
// processing a given batch.
type TaskErrorMessage struct {
	// MerkleRoot is serialized under the "merkle_root" key.
	MerkleRoot string `json:"merkle_root"`
	// TaskError is the error text, serialized under the "error" key.
	TaskError string `json:"error"`
}

// Telemetry is an HTTP client for the telemetry server. Its methods POST
// JSON-encoded events describing the progress of a task.
type Telemetry struct {
	// client performs every request sent to the telemetry server.
	client *http.Client
	// baseURL holds the server's scheme and host; endpoint paths are resolved
	// against it when a message is sent.
	baseURL *url.URL
	// logger records outgoing messages, responses and send failures.
	logger logging.Logger
}

// NewTelemetry builds a Telemetry client that talks plain HTTP to
// serverAddress (a "host:port" string) and logs through logger.
//
// No connection is opened here; the address is only used when a message is
// sent.
func NewTelemetry(serverAddress string, logger logging.Logger) *Telemetry {
	// Every Telemetry method except FinishTrace sends its request
	// synchronously, and http.Client waits forever by default. Bound each
	// request so an unresponsive telemetry server cannot stall the caller.
	const requestTimeout = 5 * time.Second

	client := &http.Client{Timeout: requestTimeout}

	baseURL := &url.URL{
		Scheme: "http",
		Host:   serverAddress,
	}
	logger.Info("[Telemetry] Starting Telemetry client.", "server_address",
		serverAddress)

	return &Telemetry{
		client:  client,
		baseURL: baseURL,
		logger:  logger,
	}
}

// InitNewTrace posts a TraceMessage for batchMerkleRoot to /api/initTaskTrace.
// The merkle root is sent as a 0x-prefixed hex string. A failed send is only
// logged; nothing is returned to the caller.
func (t *Telemetry) InitNewTrace(batchMerkleRoot [32]byte) {
	merkleRootHex := "0x" + hex.EncodeToString(batchMerkleRoot[:])
	payload := TraceMessage{MerkleRoot: merkleRootHex}

	err := t.sendTelemetryMessage("/api/initTaskTrace", payload)
	if err == nil {
		return
	}
	t.logger.Error("[Telemetry] Error in InitNewTrace", "error", err)
}

// LogOperatorResponse posts an OperatorResponseMessage for batchMerkleRoot and
// operatorId to /api/operatorResponse. Both values are sent as 0x-prefixed hex
// strings. A failed send is only logged; nothing is returned to the caller.
func (t *Telemetry) LogOperatorResponse(batchMerkleRoot [32]byte, operatorId [32]byte) {
	merkleRootHex := "0x" + hex.EncodeToString(batchMerkleRoot[:])
	operatorHex := "0x" + hex.EncodeToString(operatorId[:])
	payload := OperatorResponseMessage{
		MerkleRoot: merkleRootHex,
		OperatorId: operatorHex,
	}

	err := t.sendTelemetryMessage("/api/operatorResponse", payload)
	if err == nil {
		return
	}
	t.logger.Error("[Telemetry] Error in LogOperatorResponse", "error", err)
}

// LogQuorumReached posts a QuorumReachedMessage for batchMerkleRoot to
// /api/quorumReached. The merkle root is sent as a 0x-prefixed hex string.
// A failed send is only logged; nothing is returned to the caller.
func (t *Telemetry) LogQuorumReached(batchMerkleRoot [32]byte) {
	merkleRootHex := "0x" + hex.EncodeToString(batchMerkleRoot[:])
	payload := QuorumReachedMessage{MerkleRoot: merkleRootHex}

	err := t.sendTelemetryMessage("/api/quorumReached", payload)
	if err == nil {
		return
	}
	t.logger.Error("[Telemetry] Error in LogQuorumReached", "error", err)
}

// LogTaskError posts a TaskErrorMessage carrying batchMerkleRoot and the text
// of taskError to /api/taskError. The merkle root is sent as a 0x-prefixed hex
// string. A failed send is only logged; nothing is returned to the caller.
//
// taskError must be non-nil: its Error method is called unconditionally.
func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
	merkleRootHex := "0x" + hex.EncodeToString(batchMerkleRoot[:])
	payload := TaskErrorMessage{
		MerkleRoot: merkleRootHex,
		TaskError:  taskError.Error(),
	}

	err := t.sendTelemetryMessage("/api/taskError", payload)
	if err == nil {
		return
	}
	t.logger.Error("[Telemetry] Error in LogTaskError", "error", err)
}

func (t *Telemetry) FinishTrace(batchMerkleRoot [32]byte) {
// In order to wait for all operator responses, even if the quorum is reached, this function has a delayed execution
go func() {
time.Sleep(10 * time.Second)
Comment thread
JuArce marked this conversation as resolved.
body := TraceMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
}
if err := t.sendTelemetryMessage("/api/finishTaskTrace", body); err != nil {
t.logger.Error("[Telemetry] Error in FinishTrace", "error", err)
}
}()
}

// sendTelemetryMessage JSON-encodes message and POSTs it to endpoint, a path
// resolved against the telemetry server's base URL.
//
// It returns an error when the message cannot be encoded, the request fails,
// the response body cannot be read, or the server answers with a non-2xx
// status. Failures are returned rather than logged here, so that each one is
// reported once by the caller instead of twice.
func (t *Telemetry) sendTelemetryMessage(endpoint string, message interface{}) error {
	encodedBody, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("error marshalling JSON: %w", err)
	}

	t.logger.Info("[Telemetry] Sending message.", "endpoint", endpoint, "message", message)

	fullURL := t.baseURL.ResolveReference(&url.URL{Path: endpoint})

	resp, err := t.client.Post(fullURL.String(), "application/json", bytes.NewReader(encodedBody))
	if err != nil {
		return fmt.Errorf("error making POST request: %w", err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("error reading response body: %w", err)
	}

	// Client.Post reports only transport-level failures; an HTTP error status
	// comes back with a nil error, so it has to be checked explicitly or a
	// rejected message would be treated as delivered.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("unexpected response status %q from %s: %s", resp.Status, endpoint, string(respBody))
	}

	t.logger.Info("[Telemetry] Response received", "status", resp.Status, "response_body", string(respBody))

	return nil
}
1 change: 1 addition & 0 deletions config-files/config-aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ aggregator:
avs_service_manager_address: 0xc3e53F4d16Ae77Db1c982e75a937B9f60FE63690
enable_metrics: true
metrics_ip_port_address: localhost:9091
telemetry_ip_port_address: localhost:4000
## Operator Configurations
# operator:
# aggregator_rpc_server_ip_port_address: localhost:8090
Expand Down
8 changes: 6 additions & 2 deletions core/config/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package config

import (
"errors"
sdkutils "github.com/Layr-Labs/eigensdk-go/utils"
"github.com/ethereum/go-ethereum/common"
"log"
"os"

sdkutils "github.com/Layr-Labs/eigensdk-go/utils"
"github.com/ethereum/go-ethereum/common"
)

type AggregatorConfig struct {
Expand All @@ -18,6 +19,7 @@ type AggregatorConfig struct {
AvsServiceManagerAddress common.Address
EnableMetrics bool
MetricsIpPortAddress string
TelemetryIpPortAddress string
}
}

Expand All @@ -28,6 +30,7 @@ type AggregatorConfigFromYaml struct {
AvsServiceManagerAddress common.Address `yaml:"avs_service_manager_address"`
EnableMetrics bool `yaml:"enable_metrics"`
MetricsIpPortAddress string `yaml:"metrics_ip_port_address"`
TelemetryIpPortAddress string `yaml:"telemetry_ip_port_address"`
} `yaml:"aggregator"`
}

Expand Down Expand Up @@ -68,6 +71,7 @@ func NewAggregatorConfig(configFilePath string) *AggregatorConfig {
AvsServiceManagerAddress common.Address
EnableMetrics bool
MetricsIpPortAddress string
TelemetryIpPortAddress string
}(aggregatorConfigFromYaml.Aggregator),
}
}
Loading