Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* [2305](https://github.com/zeta-chain/node/pull/2305) - add new messages `MsgAddAuthorization` and `MsgRemoveAuthorization` that can be used to update the authorization list
* [2313](https://github.com/zeta-chain/node/pull/2313) - add `CheckAuthorization` function to replace the `IsAuthorized` function. The new function uses the authorization list to verify the signer's authorization.
* [2312](https://github.com/zeta-chain/node/pull/2312) - add queries `ShowAuthorization` and `ListAuthorizations`
* [2325](https://github.com/zeta-chain/node/pull/2325) - revert telemetry server changes

### Refactor

Expand Down
5 changes: 3 additions & 2 deletions zetaclient/chains/bitcoin/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ func (ob *Observer) GetLastBlockHeight() int64 {

func (ob *Observer) SetLastBlockHeightScanned(height int64) {
atomic.StoreInt64(&ob.lastBlockScanned, height)
metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(height))
// #nosec G701 checked as positive
ob.ts.SetLastScannedBlockNumber(ob.chain, uint64(height))
}

func (ob *Observer) GetLastBlockHeightScanned() int64 {
Expand Down Expand Up @@ -594,7 +595,7 @@ func (ob *Observer) FetchUTXOs() error {
}

ob.Mu.Lock()
metrics.NumberOfUTXO.Set(float64(len(utxosFiltered)))
ob.ts.SetNumberOfUTXOs(len(utxosFiltered))
ob.utxos = utxosFiltered
ob.Mu.Unlock()
return nil
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/evm/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func (ob *Observer) CheckTxInclusion(tx *ethtypes.Transaction, receipt *ethtypes
// SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused)
func (ob *Observer) SetLastBlockHeightScanned(height uint64) {
atomic.StoreUint64(&ob.lastBlockScanned, height)
metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(height))
ob.ts.SetLastScannedBlockNumber(ob.chain, height)
}

// GetLastBlockHeightScanned get last block height scanned (not necessarily caught up with external block; could be slow/paused)
Expand Down
128 changes: 119 additions & 9 deletions zetaclient/metrics/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand All @@ -11,23 +12,33 @@ import (
"github.com/gorilla/mux"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/pkg/constant"
"github.com/zeta-chain/zetacore/zetaclient/types"
)

// TelemetryServer provide http endpoint for Tss server
// TelemetryServer provides http endpoint for Tss server
type TelemetryServer struct {
logger zerolog.Logger
s *http.Server
p2pid string
mu sync.Mutex
ipAddress string
HotKeyBurnRate *BurnRate
logger zerolog.Logger
s *http.Server
p2pid string
lastScannedBlockNumber map[int64]uint64 // chainID => block number
lastCoreBlockNumber int64
mu sync.Mutex
lastStartTimestamp time.Time
status types.Status
ipAddress string
HotKeyBurnRate *BurnRate
}

// NewTelemetryServer should only listen to the loopback
func NewTelemetryServer() *TelemetryServer {
hs := &TelemetryServer{
logger: log.With().Str("module", "http").Logger(),
HotKeyBurnRate: NewBurnRate(100),
logger: log.With().Str("module", "http").Logger(),
lastScannedBlockNumber: make(map[int64]uint64),
lastStartTimestamp: time.Now(),
HotKeyBurnRate: NewBurnRate(100),
}
s := &http.Server{
Addr: ":8123",
Expand Down Expand Up @@ -67,6 +78,51 @@ func (t *TelemetryServer) GetIPAddress() string {
return t.ipAddress
}

// GetLastStartTimestamp returns last start timestamp
func (t *TelemetryServer) GetLastStartTimestamp() time.Time {
t.mu.Lock()
defer t.mu.Unlock()
return t.lastStartTimestamp
}

// SetLastScannedBlockNumber last scanned block number for chain in telemetry and metrics
func (t *TelemetryServer) SetLastScannedBlockNumber(chain chains.Chain, blockNumber uint64) {
t.mu.Lock()
t.lastScannedBlockNumber[chain.ChainId] = blockNumber
LastScannedBlockNumber.WithLabelValues(chain.ChainName.String()).Set(float64(blockNumber))
t.mu.Unlock()
}

// GetLastScannedBlockNumber returns last scanned block number for chain
func (t *TelemetryServer) GetLastScannedBlockNumber(chainID int64) uint64 {
t.mu.Lock()
defer t.mu.Unlock()
return t.lastScannedBlockNumber[chainID]
}

// SetCoreBlockNumber sets core block number in telemetry and metrics
func (t *TelemetryServer) SetCoreBlockNumber(blockNumber int64) {
t.mu.Lock()
t.lastCoreBlockNumber = blockNumber
LastCoreBlockNumber.Set(float64(blockNumber))
t.mu.Unlock()
}

// GetCoreBlockNumber returns core block number
func (t *TelemetryServer) GetCoreBlockNumber() int64 {
t.mu.Lock()
defer t.mu.Unlock()
return t.lastCoreBlockNumber
}

// SetNumberOfUTXOs sets number of UTXOs in telemetry and metrics
func (t *TelemetryServer) SetNumberOfUTXOs(numberOfUTXOs int) {
t.mu.Lock()
t.status.BTCNumberOfUTXOs = numberOfUTXOs
NumberOfUTXO.Set(float64(numberOfUTXOs))
t.mu.Unlock()
}

// AddFeeEntry adds fee entry
func (t *TelemetryServer) AddFeeEntry(block int64, amount int64) {
t.mu.Lock()
Expand All @@ -82,6 +138,11 @@ func (t *TelemetryServer) Handlers() http.Handler {
router := mux.NewRouter()
router.Handle("/ping", http.HandlerFunc(t.pingHandler)).Methods(http.MethodGet)
router.Handle("/p2p", http.HandlerFunc(t.p2pHandler)).Methods(http.MethodGet)
router.Handle("/version", http.HandlerFunc(t.versionHandler)).Methods(http.MethodGet)
router.Handle("/lastscannedblock", http.HandlerFunc(t.lastScannedBlockHandler)).Methods(http.MethodGet)
router.Handle("/laststarttimestamp", http.HandlerFunc(t.lastStartTimestampHandler)).Methods(http.MethodGet)
router.Handle("/lastcoreblock", http.HandlerFunc(t.lastCoreBlockHandler)).Methods(http.MethodGet)
router.Handle("/status", http.HandlerFunc(t.statusHandler)).Methods(http.MethodGet)
router.Handle("/ip", http.HandlerFunc(t.ipHandler)).Methods(http.MethodGet)
router.Handle("/hotkeyburnrate", http.HandlerFunc(t.hotKeyFeeBurnRate)).Methods(http.MethodGet)

Expand All @@ -90,6 +151,7 @@ func (t *TelemetryServer) Handlers() http.Handler {
return router
}

// Start starts telemetry server
func (t *TelemetryServer) Start() error {
if t.s == nil {
return errors.New("invalid http server instance")
Expand All @@ -103,6 +165,7 @@ func (t *TelemetryServer) Start() error {
return nil
}

// Stop stops telemetry server
func (t *TelemetryServer) Stop() error {
c, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -131,6 +194,53 @@ func (t *TelemetryServer) ipHandler(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintf(w, "%s", t.ipAddress)
}

func (t *TelemetryServer) lastScannedBlockHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")

t.mu.Lock()
defer t.mu.Unlock()
// Convert map to JSON
jsonBytes, err := json.Marshal(t.lastScannedBlockNumber)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, err = w.Write(jsonBytes)
if err != nil {
t.logger.Error().Err(err).Msg("Failed to write response")
}
}

func (t *TelemetryServer) lastCoreBlockHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
t.mu.Lock()
defer t.mu.Unlock()
fmt.Fprintf(w, "%d", t.lastCoreBlockNumber)
}

func (t *TelemetryServer) statusHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
t.mu.Lock()
defer t.mu.Unlock()
s, err := json.MarshalIndent(t.status, "", "\t")
if err != nil {
t.logger.Error().Err(err).Msg("Failed to marshal status")
}
fmt.Fprintf(w, "%s", s)
}

func (t *TelemetryServer) versionHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "%s", constant.Version)
}

func (t *TelemetryServer) lastStartTimestampHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
t.mu.Lock()
defer t.mu.Unlock()
fmt.Fprintf(w, "%s", t.lastStartTimestamp.Format(time.RFC3339))
}

func (t *TelemetryServer) hotKeyFeeBurnRate(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
t.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (oc *Orchestrator) StartCctxScheduler(appContext *context.AppContext) {

// update last processed block number
lastBlockNum = bn
metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))
oc.ts.SetCoreBlockNumber(lastBlockNum)
}
}
}
Expand Down