Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,10 @@ if impl(ghc >= 9.12)
proto-lens-tests
proto-lens

source-repository-package
type: git
location: https://github.com/input-output-hk/ekg-forward.git
tag: c72c9a29045431df7484b665bed33c12ea71d0ac
--sha256: sha256-b87qt8RMI4gNPF8QTrRjfS5KK2/JhbxUp5ijscn2Vf8=
subdir:
.
4 changes: 4 additions & 0 deletions cardano-tracer/cardano-tracer.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ library
Cardano.Tracer.Handlers.Metrics.Monitoring
Cardano.Tracer.Handlers.Metrics.Prometheus
Cardano.Tracer.Handlers.Metrics.Servers
Cardano.Tracer.Handlers.Metrics.TimeseriesServer
Cardano.Tracer.Handlers.Metrics.Utils

Cardano.Tracer.Handlers.Notifications.Check
Expand All @@ -147,6 +148,8 @@ library

other-modules: Cardano.Tracer.Handlers.Logs.Journal.NoSystemd
Cardano.Tracer.Handlers.Notifications.Timer
Cardano.Tracer.Time

Paths_cardano_tracer

autogen-modules: Paths_cardano_tracer
Expand Down Expand Up @@ -202,6 +205,7 @@ library
, warp ^>= 3.4
, warp-tls
, yaml
, cardano-timeseries-io

if os(windows)
build-depends: Win32
Expand Down
9 changes: 6 additions & 3 deletions cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}

module Cardano.Tracer.Acceptors.Client
( runAcceptorsClient
Expand Down Expand Up @@ -41,7 +43,7 @@ import Data.Word (Word32)
import qualified Network.Mux as Mux
import qualified Network.Socket as Socket
import qualified System.Metrics.Configuration as EKGF
import System.Metrics.Network.Acceptor (acceptEKGMetricsInit)
import System.Metrics.Network.Acceptor (acceptMetricsInit)

import qualified Trace.Forward.Configuration.DataPoint as DPF
import qualified Trace.Forward.Configuration.TraceObject as TF
Expand Down Expand Up @@ -192,10 +194,11 @@ runEKGAcceptorInit
respoinderCtx
LBS.ByteString IO () Void
runEKGAcceptorInit tracerEnv ekgConfig errorHandler =
acceptEKGMetricsInit
acceptMetricsInit
ekgConfig
(prepareMetricsStores tracerEnv . micConnectionId)
(errorHandler . micConnectionId)
(store tracerEnv . connIdToNodeId . micConnectionId)
(errorHandler . micConnectionId) where

runTraceObjectsAcceptorInit
:: Show addr
Expand Down
15 changes: 7 additions & 8 deletions cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ module Cardano.Tracer.Acceptors.Server
( runAcceptorsServer
) where

import "contra-tracer" Control.Tracer (nullTracer)

import Cardano.Logging (TraceObject)
import qualified Cardano.Logging.Types as Net
import Cardano.Tracer.Acceptors.Utils
Expand All @@ -25,25 +23,25 @@ import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (.
miniProtocolNum, miniProtocolRun)
import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..))
import qualified Ouroboros.Network.Protocol.Handshake as Handshake
import qualified Ouroboros.Network.Server.Simple as Server
import Ouroboros.Network.Snocket (LocalAddress, LocalSocket, Snocket,
localAddressFromPath, localSnocket, makeLocalBearer, makeSocketBearer,
socketSnocket)
import Ouroboros.Network.Socket (ConnectionId (..),
SomeResponderApplication (..))
import qualified Ouroboros.Network.Server.Simple as Server
import Ouroboros.Network.Socket (ConnectionId (..), SomeResponderApplication (..))

import Codec.CBOR.Term (Term)
import Control.Concurrent.Async (wait)
import "contra-tracer" Control.Tracer (nullTracer)
import qualified Data.ByteString.Lazy as LBS
import Data.Functor (void)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Text as Text
import Data.Functor (void)
import Data.Void (Void)
import Data.Word (Word32)
import qualified Network.Mux as Mux
import qualified Network.Socket as Socket
import qualified System.Metrics.Configuration as EKGF
import System.Metrics.Network.Acceptor (acceptEKGMetricsResp)
import System.Metrics.Network.Acceptor (acceptMetricsResp)

import qualified Trace.Forward.Configuration.DataPoint as DPF
import qualified Trace.Forward.Configuration.TraceObject as TF
Expand Down Expand Up @@ -180,9 +178,10 @@ runEKGAcceptor
-> (ConnectionId addr -> IO ())
-> RunMiniProtocol 'Mux.ResponderMode initiatorCtx (ResponderContext addr) LBS.ByteString IO Void ()
runEKGAcceptor tracerEnv ekgConfig errorHandler =
acceptEKGMetricsResp
acceptMetricsResp
ekgConfig
(prepareMetricsStores tracerEnv . rcConnectionId)
(store tracerEnv . connIdToNodeId . rcConnectionId)
(errorHandler . rcConnectionId)

runTraceObjectsAcceptor
Expand Down
49 changes: 34 additions & 15 deletions cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,45 @@
{-# LANGUAGE TupleSections #-}

{-# OPTIONS_GHC -Wno-redundant-constraints #-}
{-# LANGUAGE ViewPatterns #-}

module Cardano.Tracer.Acceptors.Utils
( prepareDataPointRequestor
, prepareMetricsStores
, removeDisconnectedNode
, notifyAboutNodeDisconnected
, store
) where

#if RTVIEW
import Cardano.Logging (SeverityS (..))
#endif
import qualified Cardano.Timeseries.Component as Timeseries
import Cardano.Timeseries.Domain.Types (MetricIdentifier)
import Cardano.Tracer.Environment
#if RTVIEW
import Cardano.Tracer.Handlers.Notifications.Types
import Cardano.Tracer.Handlers.Notifications.Utils
#endif
import Cardano.Tracer.Environment
import Cardano.Tracer.Time (getTimeMs)
import Cardano.Tracer.Types
import Cardano.Tracer.Utils
import Ouroboros.Network.Socket (ConnectionId (..))

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO)
import qualified Data.Bimap as BM
import Data.Foldable
import qualified Data.Map.Strict as M
import Data.Maybe (mapMaybe)
import qualified Data.Set as S
import Data.Time.Clock.POSIX (getPOSIXTime)
#if RTVIEW
import Data.Time.Clock.System (getSystemTime, systemToUTCTime)
#endif
import qualified System.Metrics as EKG
import System.Metrics.Store.Acceptor (MetricsLocalStore, emptyMetricsLocalStore)
import System.Metrics.ReqResp
import System.Metrics.Store.Acceptor (MetricsLocalStore, emptyMetricsLocalStore,
storeMetrics)

import Trace.Forward.Utils.DataPoint (DataPointRequestor, initDataPointRequestor)

Expand All @@ -54,26 +64,17 @@ prepareMetricsStores
-> IO (EKG.Store, TVar MetricsLocalStore)
prepareMetricsStores TracerEnv{teConnectedNodes, teAcceptedMetrics} connId = do
addConnectedNode teConnectedNodes connId
store <- EKG.newStore
st <- EKG.newStore

EKG.registerCounter "ekg.server_timestamp_ms" getTimeMs store
storesForNewNode <- (store ,) <$> newTVarIO emptyMetricsLocalStore
EKG.registerCounter "ekg.server_timestamp_ms" getTimeMs st
storesForNewNode <- (st ,) <$> newTVarIO emptyMetricsLocalStore

atomically do
modifyTVar' teAcceptedMetrics do
M.insert (connIdToNodeId connId) storesForNewNode

return storesForNewNode

where
-- forkServer definition of `getTimeMs'. The ekg frontend relies
-- on the "ekg.server_timestamp_ms" metric being in every
-- store. While forkServer adds that that automatically we must
-- manually add it.
-- url
-- + https://github.com/tvh/ekg-wai/blob/master/System/Remote/Monitoring/Wai.hs#L237-L238
getTimeMs = (round . (* 1000)) `fmap` getPOSIXTime

addConnectedNode
:: Show addr
=> ConnectedNodes
Expand Down Expand Up @@ -115,3 +116,21 @@ notifyAboutNodeDisconnected TracerEnvRTView{teEventsQueues} connId = do
#else
notifyAboutNodeDisconnected _ _ = pure ()
#endif

store :: TracerEnv -> NodeId -> (EKG.Store, TVar MetricsLocalStore) -> Response -> IO ()
store tracerEnv (NodeId nodeId) (ekgStore, localStore) resp@(ResponseMetrics ms) = do
storeMetrics resp ekgStore localStore
ts <- getTimeMs
for_ (teTimeseriesHandle tracerEnv) $ \h ->
Timeseries.insert h "node_id" nodeId (fromIntegral ts) (mapMaybe parseMetric ms)

where
numeralOnly :: MetricValue -> Maybe Double
numeralOnly (GaugeValue x) = Just (fromIntegral x)
numeralOnly (CounterValue x) = Just (fromIntegral x)
numeralOnly (LabelValue _) = Nothing

parseMetric :: (MetricName, MetricValue) -> Maybe (MetricIdentifier, Double)
parseMetric (k, numeralOnly -> Just v) = Just (k, v)
parseMetric _ = Nothing

1 change: 1 addition & 0 deletions cardano-tracer/src/Cardano/Tracer/Configuration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ data TracerConfig = TracerConfig
, hasEKG :: !(Maybe Endpoint) -- ^ Endpoint for EKG web-page.
, hasPrometheus :: !(Maybe Endpoint) -- ^ Endpoint for Prometheus web-page.
, hasRTView :: !(Maybe Endpoint) -- ^ Endpoint for RTView web-page.
, hasTimeseries :: !(Maybe Endpoint)
, tlsCertificate :: !(Maybe Certificate)
-- | Socket for tracer's to reforward on. Second member of the triplet is the list of prefixes to reforward.
-- Third member of the triplet is the forwarder config.
Expand Down
2 changes: 2 additions & 0 deletions cardano-tracer/src/Cardano/Tracer/Environment.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Cardano.Tracer.Environment
) where

import Cardano.Logging.Types
import Cardano.Timeseries.Component (TimeseriesHandle)
import Cardano.Tracer.Configuration
#if RTVIEW
import Cardano.Tracer.Handlers.Notifications.Types
Expand Down Expand Up @@ -36,6 +37,7 @@ data TracerEnv = TracerEnv
, teRegistry :: !HandleRegistry
, teStateDir :: !(Maybe FilePath)
, teMetricsHelp :: ![(Text, Builder)]
, teTimeseriesHandle :: !(Maybe TimeseriesHandle)
}

#if RTVIEW
Expand Down
13 changes: 9 additions & 4 deletions cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}

module Cardano.Tracer.Handlers.Metrics.Servers
( runMetricsServers
Expand All @@ -10,17 +10,19 @@ import Cardano.Tracer.Configuration
import Cardano.Tracer.Environment
import Cardano.Tracer.Handlers.Metrics.Monitoring
import Cardano.Tracer.Handlers.Metrics.Prometheus
import Cardano.Tracer.Handlers.Metrics.TimeseriesServer (runTimeseriesServer)
import qualified Cardano.Tracer.Handlers.Metrics.Utils as Utils
import Cardano.Tracer.Utils (sequenceConcurrently_)

import Control.AutoUpdate
import Data.Maybe (catMaybes)
import Control.Monad (unless)
import Data.Maybe (catMaybes)

-- | Runs metrics servers if needed:
--
-- 1. Prometheus exporter.
-- 2. EKG monitoring web-page.
-- 3. Timeseries query server.
--
runMetricsServers
:: TracerEnv
Expand All @@ -44,8 +46,11 @@ runMetricsServers tracerEnv = do
servers = catMaybes
[ runPrometheusServer tracerEnv <$> hasPrometheus
, runMonitoringServer tracerEnv <$> hasEKG
, const <$> (runTimeseriesServer teTracer cfg <$> hasTimeseries <*> teTimeseriesHandle)
]

TracerEnv
{ teConfig = TracerConfig { hasPrometheus, hasEKG }
{ teConfig = cfg@TracerConfig { hasPrometheus, hasEKG, hasTimeseries },
teTracer,
teTimeseriesHandle
} = tracerEnv
Loading