Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tools/validators-monitor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Config struct {
EthereumRPCURL string `json:"ethereum_rpc_url"`
ValidatorOptInContract string `json:"contract_address"`
RelayURLs []string `json:"relay_urls"`
SlackWebhookURL string `json:"slack_webhook_url"`
WebhookURLs []string `json:"webhook_urls"`
DashboardApiUrl string `json:"dashboard_api_url"`
HealthPort int `json:"health_port"`
LaggardMode *big.Int `json:"laggard_mode"`
Expand Down
12 changes: 6 additions & 6 deletions tools/validators-monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ var (
),
}

optionSlackWebhook = &cli.StringFlag{
Name: "slack-webhook",
Usage: "Slack webhook URL for notifications",
EnvVars: []string{"SLACK_WEBHOOK_URL"},
optionWebhookUrls = &cli.StringSliceFlag{
Name: "webhooks",
Usage: "webhook URLs for notifications",
EnvVars: []string{"WEBHOOK_URLS"},
}

optionDashboardApiUrl = &cli.StringFlag{
Expand Down Expand Up @@ -187,7 +187,7 @@ func main() {
optionEthereumRpcUrl,
optionValidatorOptInContract,
optionTrackMissed,
optionSlackWebhook,
optionWebhookUrls,
optionDashboardApiUrl,
optionRelayUrls,
optionHealthPort,
Expand Down Expand Up @@ -220,7 +220,7 @@ func main() {
EthereumRPCURL: c.String(optionEthereumRpcUrl.Name),
ValidatorOptInContract: c.String(optionValidatorOptInContract.Name),
FetchIntervalSec: 12, // Use epoch duration
SlackWebhookURL: c.String(optionSlackWebhook.Name),
WebhookURLs: c.StringSlice(optionWebhookUrls.Name),
DashboardApiUrl: c.String(optionDashboardApiUrl.Name),
RelayURLs: c.StringSlice(optionRelayUrls.Name),
HealthPort: c.Int(optionHealthPort.Name),
Expand Down
2 changes: 1 addition & 1 deletion tools/validators-monitor/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func New(
beacon: beaconClient,
relay: api.NewRelayClient(cfg.RelayURLs, log, httpClient),
dashboard: dashboardClient,
notifier: notification.NewSlackNotifier(cfg.SlackWebhookURL, log),
notifier: notification.NewNotifier(cfg.WebhookURLs, log),
optChecker: optInChecker,
dutiesCache: make(map[uint64]cachedDuties),
processedBlocks: make(map[uint64]time.Time),
Expand Down
215 changes: 215 additions & 0 deletions tools/validators-monitor/notification/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package notification

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"math/big"
"net/http"
"time"

"github.com/ethereum/go-ethereum/params"
"github.com/primev/mev-commit/tools/validators-monitor/api"
)

// Message represents a notification message structure
// generalized to support different platforms.
type Message struct {
Text string `json:"text,omitempty"`
Attachments []Attachment `json:"attachments,omitempty"`
}

// Attachment represents a message attachment
type Attachment struct {
Color string `json:"color,omitempty"`
Title string `json:"title,omitempty"`
Text string `json:"text,omitempty"`
Fields []Field `json:"fields,omitempty"`
Footer string `json:"footer,omitempty"`
TS int64 `json:"ts,omitempty"`
MarkdownIn []string `json:"mrkdwn_in,omitempty"`
}

// Field represents a field in a message attachment
type Field struct {
Title string `json:"title"`
Value string `json:"value"`
Short bool `json:"short"`
}

// Notifier sends notifications to multiple webhook endpoints
type Notifier struct {
webhookURLs []string
client *http.Client
logger *slog.Logger
enabled bool
}

// NewNotifier creates a new notifier instance
func NewNotifier(webhookURLs []string, logger *slog.Logger) *Notifier {
enabled := len(webhookURLs) > 0

if !enabled {
logger.Warn("Notifications disabled - no webhook URLs provided")
} else {
logger.Info("Notifications enabled")
}

return &Notifier{
webhookURLs: webhookURLs,
client: &http.Client{
Timeout: 10 * time.Second,
},
logger: logger,
enabled: enabled,
}
}

// SendMessage sends a message to all configured webhook endpoints
func (n *Notifier) SendMessage(ctx context.Context, message Message) error {
if !n.enabled {
n.logger.Debug("Notification skipped (disabled)")
return nil
}

messageJSON, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}

var errs []error
for _, webhookURL := range n.webhookURLs {
req, err := http.NewRequestWithContext(ctx, "POST", webhookURL, bytes.NewBuffer(messageJSON))
if err != nil {
n.logger.Error("Failed to create request", "webhook", webhookURL, "error", err)
errs = append(errs, fmt.Errorf("create request (%s): %w", webhookURL, err))
continue
}

req.Header.Set("Content-Type", "application/json")

resp, err := n.client.Do(req)
if err != nil {
n.logger.Error("Failed to send notification", "webhook", webhookURL, "error", err)
errs = append(errs, fmt.Errorf("send notification (%s): %w", webhookURL, err))
continue
}

if _, err := io.Copy(io.Discard, resp.Body); err != nil {
n.logger.Error("Failed to read response body", "webhook", webhookURL, "error", err)
errs = append(errs, fmt.Errorf("read response body (%s): %w", webhookURL, err))
}

if err := resp.Body.Close(); err != nil {
n.logger.Error("Failed to close response body", "webhook", webhookURL, "error", err)
errs = append(errs, fmt.Errorf("close response body (%s): %w", webhookURL, err))
}

if resp.StatusCode != http.StatusOK {
n.logger.Error("Notification API returned non-200 status code", "webhook", webhookURL, "status", resp.StatusCode)
errs = append(errs, fmt.Errorf("non-200 status (%s): %d", webhookURL, resp.StatusCode))
continue
}
n.logger.Debug("Notification sent successfully", "webhook", webhookURL)
}

if len(errs) > 0 {
return fmt.Errorf("one or more notifications failed: %w", errors.Join(errs...))
}
return nil
}

// NotifyRelayData sends a notification about relay data for a validator
func (n *Notifier) NotifyRelayData(
ctx context.Context,
pubkey string,
validatorIndex,
blockNumber,
slot uint64,
mevReward *big.Int,
feeRecipient string,
relaysWithData,
allRelays []string,
dashboardInfo *api.DashboardResponse,
) error {
color := "#36a64f"
if len(relaysWithData) == 0 {
color = "#ff9900"
}

relaysWithDataStr := "None"
if len(relaysWithData) > 0 {
relaysWithDataStr = formatRelayList(relaysWithData)
}

fields := []Field{
{"Validator Index", fmt.Sprintf("%d", validatorIndex), true},
{"Slot", fmt.Sprintf("%d", slot), true},
{"Block Number", fmt.Sprintf("%d", blockNumber), true},
{"Validator Pubkey", pubkey, false},
{"Relays With Data", fmt.Sprintf("```%s```", relaysWithDataStr), false},
{"Data Availability", fmt.Sprintf("%d of %d relays have data", len(relaysWithData), len(allRelays)), false},
}

title := "Relay Data Available for Validator"
if len(relaysWithData) == 0 {
title = "No Relay Data Found for Validator"
}

message := Message{
Attachments: []Attachment{
{
Color: color,
Title: title,
Text: "Report on relay data for opted-in validator",
Fields: fields,
Footer: "Validator Monitor",
TS: time.Now().Unix(),
MarkdownIn: []string{"text", "fields"},
},
},
}

if dashboardInfo != nil {
attach := &message.Attachments[0]

attach.Fields = append(attach.Fields, Field{"Block Winner", dashboardInfo.Winner, false})
attach.Fields = append(attach.Fields, Field{"Commitments", fmt.Sprintf("%d (Rewards: %d, Slashes: %d)",
dashboardInfo.TotalOpenedCommitments,
dashboardInfo.TotalRewards,
dashboardInfo.TotalSlashes), false})

if dashboardInfo.TotalAmount != "" {
amountWei, ok := new(big.Int).SetString(dashboardInfo.TotalAmount, 10)
if ok {
attach.Fields = append(attach.Fields, Field{"Total Bid Amount", formatWeiToEth(amountWei), true})
} else {
attach.Fields = append(attach.Fields, Field{"Total Bid Amount (wei)", dashboardInfo.TotalAmount, true})
}
}
attach.Fields = append(attach.Fields, Field{"MEV Reward", formatWeiToEth(mevReward), true})
attach.Fields = append(attach.Fields, Field{"MEV Reward Recipient", feeRecipient, true})
}

return n.SendMessage(ctx, message)
}

func formatRelayList(relays []string) string {
if len(relays) == 0 {
return "None"
}
var result string
for _, r := range relays {
result += "- " + r + "\n"
}
return result
}

func formatWeiToEth(wei *big.Int) string {
ethValue := new(big.Float).Quo(new(big.Float).SetInt(wei), new(big.Float).SetFloat64(params.Ether))
return fmt.Sprintf("%.6f ETH", ethValue)
}
118 changes: 118 additions & 0 deletions tools/validators-monitor/notification/notifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package notification

import (
"context"
"encoding/json"
"io"
"log/slog"
"math/big"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)

func TestFormatRelayList(t *testing.T) {
if got := formatRelayList([]string{}); got != "None" {
t.Errorf("formatRelayList(empty) = %q; want \"None\"", got)
}
relays := []string{"relay1", "relay2"}
want := "- relay1\n- relay2\n"
if got := formatRelayList(relays); got != want {
t.Errorf("formatRelayList(%v) = %q; want %q", relays, got, want)
}
}

func TestFormatWeiToEth(t *testing.T) {
cases := []struct {
wei *big.Int
want string
}{
{big.NewInt(0), "0.000000 ETH"},
{big.NewInt(params.Ether), "1.000000 ETH"},
{big.NewInt(1234567890000000000), "1.234568 ETH"},
}
for _, c := range cases {
if got := formatWeiToEth(c.wei); got != c.want {
t.Errorf("formatWeiToEth(%v) = %q; want %q", c.wei, got, c.want)
}
}
}

func TestNewNotifier(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
n := NewNotifier([]string{}, logger)
if n.enabled {
t.Errorf("expected notifier disabled when webhookURLs are empty")
}
n2 := NewNotifier([]string{"http://example.com"}, logger)
if !n2.enabled {
t.Errorf("expected notifier enabled when webhookURLs provided")
}
}

func TestSendMessage_Disabled(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
n := NewNotifier([]string{}, logger)
if err := n.SendMessage(context.Background(), Message{Text: "hello"}); err != nil {
t.Errorf("SendMessage disabled = error %v; want nil", err)
}
}

func TestSendMessage_Success(t *testing.T) {
var received []byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
received = body
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

logger := slog.New(slog.NewTextHandler(io.Discard, nil))
n := NewNotifier([]string{server.URL}, logger)
msg := Message{Text: "test"}
require.NoError(t, n.SendMessage(context.Background(), msg))

var got Message
require.NoError(t, json.Unmarshal(received, &got))
require.Equal(t, msg.Text, got.Text)
}

func TestSendMessage_NonOK(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()

logger := slog.New(slog.NewTextHandler(io.Discard, nil))
n := NewNotifier([]string{server.URL}, logger)
err := n.SendMessage(context.Background(), Message{Text: "err"})
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "one or more notifications failed"))
}

func TestNotifyRelayData(t *testing.T) {
var payload Message
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := json.NewDecoder(r.Body).Decode(&payload)
require.NoError(t, err)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

logger := slog.New(slog.NewTextHandler(io.Discard, nil))
n := NewNotifier([]string{server.URL}, logger)
relays := []string{"relay1"}
allRelays := []string{"relay1", "relay2"}

err := n.NotifyRelayData(context.Background(), "0xabc", 123, 456, 789, big.NewInt(2e18), "0xfee", relays, allRelays, nil)
require.NoError(t, err)

require.Len(t, payload.Attachments, 1)
attachment := payload.Attachments[0]
require.Equal(t, "#36a64f", attachment.Color)
require.Equal(t, "Relay Data Available for Validator", attachment.Title)
}
Loading
Loading