From 7384a0179a38d1dac637a2aa6a8ebbc18fd559cd Mon Sep 17 00:00:00 2001 From: Russoul Date: Tue, 24 Mar 2026 23:06:07 +0400 Subject: [PATCH] bench: Integrate cardano-timeseries-io into cardano-tracer --- cabal.project | 7 + cardano-tracer/cardano-tracer.cabal | 4 + .../src/Cardano/Tracer/Acceptors/Client.hs | 9 +- .../src/Cardano/Tracer/Acceptors/Server.hs | 15 +- .../src/Cardano/Tracer/Acceptors/Utils.hs | 49 ++++-- .../src/Cardano/Tracer/Configuration.hs | 1 + .../src/Cardano/Tracer/Environment.hs | 2 + .../Tracer/Handlers/Metrics/Servers.hs | 13 +- .../Handlers/Metrics/TimeseriesServer.hs | 150 ++++++++++++++++++ .../src/Cardano/Tracer/MetaTrace.hs | 81 +++++++--- cardano-tracer/src/Cardano/Tracer/Run.hs | 34 ++-- cardano-tracer/src/Cardano/Tracer/Time.hs | 12 ++ .../test/Cardano/Tracer/Test/Acceptor.hs | 6 +- .../Cardano/Tracer/Test/DataPoint/Tests.hs | 4 +- .../test/Cardano/Tracer/Test/Logs/Tests.hs | 15 +- .../test/Cardano/Tracer/Test/Restart/Tests.hs | 3 +- .../cardano-tracer-service-workbench.nix | 5 + nix/workbench/service/tracer.nix | 4 + 18 files changed, 345 insertions(+), 69 deletions(-) create mode 100644 cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/TimeseriesServer.hs create mode 100644 cardano-tracer/src/Cardano/Tracer/Time.hs diff --git a/cabal.project b/cabal.project index fe404e60aa3..5d373a4a81e 100644 --- a/cabal.project +++ b/cabal.project @@ -101,3 +101,10 @@ if impl(ghc >= 9.12) proto-lens-tests proto-lens +source-repository-package + type: git + location: https://github.com/input-output-hk/ekg-forward.git + tag: c72c9a29045431df7484b665bed33c12ea71d0ac + --sha256: sha256-b87qt8RMI4gNPF8QTrRjfS5KK2/JhbxUp5ijscn2Vf8= + subdir: + . diff --git a/cardano-tracer/cardano-tracer.cabal b/cardano-tracer/cardano-tracer.cabal index 33f5cd51571..c4d637bd731 100644 --- a/cardano-tracer/cardano-tracer.cabal +++ b/cardano-tracer/cardano-tracer.cabal @@ -123,6 +123,7 @@ library Cardano.Tracer.Handlers.Metrics.Monitoring Cardano.Tracer.Handlers.Metrics.Prometheus Cardano.Tracer.Handlers.Metrics.Servers + Cardano.Tracer.Handlers.Metrics.TimeseriesServer Cardano.Tracer.Handlers.Metrics.Utils Cardano.Tracer.Handlers.Notifications.Check @@ -147,6 +148,8 @@ library other-modules: Cardano.Tracer.Handlers.Logs.Journal.NoSystemd Cardano.Tracer.Handlers.Notifications.Timer + Cardano.Tracer.Time + Paths_cardano_tracer autogen-modules: Paths_cardano_tracer @@ -202,6 +205,7 @@ library , warp ^>= 3.4 , warp-tls , yaml + , cardano-timeseries-io if os(windows) build-depends: Win32 diff --git a/cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs b/cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs index 3211e877808..d40ad44cad6 100644 --- a/cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs +++ b/cardano-tracer/src/Cardano/Tracer/Acceptors/Client.hs @@ -1,4 +1,6 @@ {-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} module Cardano.Tracer.Acceptors.Client ( runAcceptorsClient @@ -41,7 +43,7 @@ import Data.Word (Word32) import qualified Network.Mux as Mux import qualified Network.Socket as Socket import qualified System.Metrics.Configuration as EKGF -import System.Metrics.Network.Acceptor (acceptEKGMetricsInit) +import System.Metrics.Network.Acceptor (acceptMetricsInit) import qualified Trace.Forward.Configuration.DataPoint as DPF import qualified Trace.Forward.Configuration.TraceObject as TF @@ -192,10 +194,11 @@ runEKGAcceptorInit respoinderCtx LBS.ByteString IO () Void runEKGAcceptorInit tracerEnv ekgConfig errorHandler = - acceptEKGMetricsInit + acceptMetricsInit ekgConfig (prepareMetricsStores tracerEnv . micConnectionId) - (errorHandler . micConnectionId) + (store tracerEnv . connIdToNodeId . micConnectionId) + (errorHandler . micConnectionId) where runTraceObjectsAcceptorInit :: Show addr diff --git a/cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs b/cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs index 37c0470c7e2..a05d900fd5b 100644 --- a/cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs +++ b/cardano-tracer/src/Cardano/Tracer/Acceptors/Server.hs @@ -5,8 +5,6 @@ module Cardano.Tracer.Acceptors.Server ( runAcceptorsServer ) where -import "contra-tracer" Control.Tracer (nullTracer) - import Cardano.Logging (TraceObject) import qualified Cardano.Logging.Types as Net import Cardano.Tracer.Acceptors.Utils @@ -25,25 +23,25 @@ import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (. miniProtocolNum, miniProtocolRun) import Ouroboros.Network.Protocol.Handshake (Handshake, HandshakeArguments (..)) import qualified Ouroboros.Network.Protocol.Handshake as Handshake +import qualified Ouroboros.Network.Server.Simple as Server import Ouroboros.Network.Snocket (LocalAddress, LocalSocket, Snocket, localAddressFromPath, localSnocket, makeLocalBearer, makeSocketBearer, socketSnocket) -import Ouroboros.Network.Socket (ConnectionId (..), - SomeResponderApplication (..)) -import qualified Ouroboros.Network.Server.Simple as Server +import Ouroboros.Network.Socket (ConnectionId (..), SomeResponderApplication (..)) import Codec.CBOR.Term (Term) import Control.Concurrent.Async (wait) +import "contra-tracer" Control.Tracer (nullTracer) import qualified Data.ByteString.Lazy as LBS +import Data.Functor (void) import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Text as Text -import Data.Functor (void) import Data.Void (Void) import Data.Word (Word32) import qualified Network.Mux as Mux import qualified Network.Socket as Socket import qualified System.Metrics.Configuration as EKGF -import System.Metrics.Network.Acceptor (acceptEKGMetricsResp) +import System.Metrics.Network.Acceptor (acceptMetricsResp) import qualified Trace.Forward.Configuration.DataPoint as DPF import qualified Trace.Forward.Configuration.TraceObject as TF @@ -180,9 +178,10 @@ runEKGAcceptor -> (ConnectionId addr -> IO ()) -> RunMiniProtocol 'Mux.ResponderMode initiatorCtx (ResponderContext addr) LBS.ByteString IO Void () runEKGAcceptor tracerEnv ekgConfig errorHandler = - acceptEKGMetricsResp + acceptMetricsResp ekgConfig (prepareMetricsStores tracerEnv . rcConnectionId) + (store tracerEnv . connIdToNodeId . rcConnectionId) (errorHandler . rcConnectionId) runTraceObjectsAcceptor diff --git a/cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs b/cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs index 79989e4a9d1..c5796928ba1 100644 --- a/cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs +++ b/cardano-tracer/src/Cardano/Tracer/Acceptors/Utils.hs @@ -3,20 +3,27 @@ {-# LANGUAGE TupleSections #-} {-# OPTIONS_GHC -Wno-redundant-constraints #-} +{-# LANGUAGE ViewPatterns #-} module Cardano.Tracer.Acceptors.Utils ( prepareDataPointRequestor , prepareMetricsStores , removeDisconnectedNode , notifyAboutNodeDisconnected + , store ) where #if RTVIEW import Cardano.Logging (SeverityS (..)) +#endif +import qualified Cardano.Timeseries.Component as Timeseries +import Cardano.Timeseries.Domain.Types (MetricIdentifier) +import Cardano.Tracer.Environment +#if RTVIEW import Cardano.Tracer.Handlers.Notifications.Types import Cardano.Tracer.Handlers.Notifications.Utils #endif -import Cardano.Tracer.Environment +import Cardano.Tracer.Time (getTimeMs) import Cardano.Tracer.Types import Cardano.Tracer.Utils import Ouroboros.Network.Socket (ConnectionId (..)) @@ -24,14 +31,17 @@ import Ouroboros.Network.Socket (ConnectionId (..)) import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TVar (TVar, modifyTVar', newTVarIO) import qualified Data.Bimap as BM +import Data.Foldable import qualified Data.Map.Strict as M +import Data.Maybe (mapMaybe) import qualified Data.Set as S -import Data.Time.Clock.POSIX (getPOSIXTime) #if RTVIEW import Data.Time.Clock.System (getSystemTime, systemToUTCTime) #endif import qualified System.Metrics as EKG -import System.Metrics.Store.Acceptor (MetricsLocalStore, emptyMetricsLocalStore) +import System.Metrics.ReqResp +import System.Metrics.Store.Acceptor (MetricsLocalStore, emptyMetricsLocalStore, + storeMetrics) import Trace.Forward.Utils.DataPoint (DataPointRequestor, initDataPointRequestor) @@ -54,10 +64,10 @@ prepareMetricsStores -> IO (EKG.Store, TVar MetricsLocalStore) prepareMetricsStores TracerEnv{teConnectedNodes, teAcceptedMetrics} connId = do addConnectedNode teConnectedNodes connId - store <- EKG.newStore + st <- EKG.newStore - EKG.registerCounter "ekg.server_timestamp_ms" getTimeMs store - storesForNewNode <- (store ,) <$> newTVarIO emptyMetricsLocalStore + EKG.registerCounter "ekg.server_timestamp_ms" getTimeMs st + storesForNewNode <- (st ,) <$> newTVarIO emptyMetricsLocalStore atomically do modifyTVar' teAcceptedMetrics do @@ -65,15 +75,6 @@ prepareMetricsStores TracerEnv{teConnectedNodes, teAcceptedMetrics} connId = do return storesForNewNode - where - -- forkServer definition of `getTimeMs'. The ekg frontend relies - -- on the "ekg.server_timestamp_ms" metric being in every - -- store. While forkServer adds that that automatically we must - -- manually add it. - -- url - -- + https://github.com/tvh/ekg-wai/blob/master/System/Remote/Monitoring/Wai.hs#L237-L238 - getTimeMs = (round . (* 1000)) `fmap` getPOSIXTime - addConnectedNode :: Show addr => ConnectedNodes @@ -115,3 +116,21 @@ notifyAboutNodeDisconnected TracerEnvRTView{teEventsQueues} connId = do #else notifyAboutNodeDisconnected _ _ = pure () #endif + +store :: TracerEnv -> NodeId -> (EKG.Store, TVar MetricsLocalStore) -> Response -> IO () +store tracerEnv (NodeId nodeId) (ekgStore, localStore) resp@(ResponseMetrics ms) = do + storeMetrics resp ekgStore localStore + ts <- getTimeMs + for_ (teTimeseriesHandle tracerEnv) $ \h -> + Timeseries.insert h "node_id" nodeId (fromIntegral ts) (mapMaybe parseMetric ms) + + where + numeralOnly :: MetricValue -> Maybe Double + numeralOnly (GaugeValue x) = Just (fromIntegral x) + numeralOnly (CounterValue x) = Just (fromIntegral x) + numeralOnly (LabelValue _) = Nothing + + parseMetric :: (MetricName, MetricValue) -> Maybe (MetricIdentifier, Double) + parseMetric (k, numeralOnly -> Just v) = Just (k, v) + parseMetric _ = Nothing + diff --git a/cardano-tracer/src/Cardano/Tracer/Configuration.hs b/cardano-tracer/src/Cardano/Tracer/Configuration.hs index 75d6f09932e..daee2dbc080 100644 --- a/cardano-tracer/src/Cardano/Tracer/Configuration.hs +++ b/cardano-tracer/src/Cardano/Tracer/Configuration.hs @@ -164,6 +164,7 @@ data TracerConfig = TracerConfig , hasEKG :: !(Maybe Endpoint) -- ^ Endpoint for EKG web-page. , hasPrometheus :: !(Maybe Endpoint) -- ^ Endpoint for Prometheus web-page. , hasRTView :: !(Maybe Endpoint) -- ^ Endpoint for RTView web-page. + , hasTimeseries :: !(Maybe Endpoint) , tlsCertificate :: !(Maybe Certificate) -- | Socket for tracer's to reforward on. Second member of the triplet is the list of prefixes to reforward. -- Third member of the triplet is the forwarder config. diff --git a/cardano-tracer/src/Cardano/Tracer/Environment.hs b/cardano-tracer/src/Cardano/Tracer/Environment.hs index 3daf1d0f4d3..d5cbcce3d8e 100644 --- a/cardano-tracer/src/Cardano/Tracer/Environment.hs +++ b/cardano-tracer/src/Cardano/Tracer/Environment.hs @@ -6,6 +6,7 @@ module Cardano.Tracer.Environment ) where import Cardano.Logging.Types +import Cardano.Timeseries.Component (TimeseriesHandle) import Cardano.Tracer.Configuration #if RTVIEW import Cardano.Tracer.Handlers.Notifications.Types @@ -36,6 +37,7 @@ data TracerEnv = TracerEnv , teRegistry :: !HandleRegistry , teStateDir :: !(Maybe FilePath) , teMetricsHelp :: ![(Text, Builder)] + , teTimeseriesHandle :: !(Maybe TimeseriesHandle) } #if RTVIEW diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs index 8425350a635..31398659024 100644 --- a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/Servers.hs @@ -1,6 +1,6 @@ -{-# LANGUAGE NumericUnderscores #-} -{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE CPP #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} module Cardano.Tracer.Handlers.Metrics.Servers ( runMetricsServers @@ -10,17 +10,19 @@ import Cardano.Tracer.Configuration import Cardano.Tracer.Environment import Cardano.Tracer.Handlers.Metrics.Monitoring import Cardano.Tracer.Handlers.Metrics.Prometheus +import Cardano.Tracer.Handlers.Metrics.TimeseriesServer (runTimeseriesServer) import qualified Cardano.Tracer.Handlers.Metrics.Utils as Utils import Cardano.Tracer.Utils (sequenceConcurrently_) import Control.AutoUpdate -import Data.Maybe (catMaybes) import Control.Monad (unless) +import Data.Maybe (catMaybes) -- | Runs metrics servers if needed: -- -- 1. Prometheus exporter. -- 2. EKG monitoring web-page. +-- 3. Timeseries query server. -- runMetricsServers :: TracerEnv @@ -44,8 +46,11 @@ runMetricsServers tracerEnv = do servers = catMaybes [ runPrometheusServer tracerEnv <$> hasPrometheus , runMonitoringServer tracerEnv <$> hasEKG + , const <$> (runTimeseriesServer teTracer cfg <$> hasTimeseries <*> teTimeseriesHandle) ] TracerEnv - { teConfig = TracerConfig { hasPrometheus, hasEKG } + { teConfig = cfg@TracerConfig { hasPrometheus, hasEKG, hasTimeseries }, + teTracer, + teTimeseriesHandle } = tracerEnv diff --git a/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/TimeseriesServer.hs b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/TimeseriesServer.hs new file mode 100644 index 00000000000..3f995caef4f --- /dev/null +++ b/cardano-tracer/src/Cardano/Tracer/Handlers/Metrics/TimeseriesServer.hs @@ -0,0 +1,150 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ViewPatterns #-} + +module Cardano.Tracer.Handlers.Metrics.TimeseriesServer(runTimeseriesServer) where +import Cardano.Timeseries.AsText +import Cardano.Timeseries.Component +import Cardano.Tracer.Configuration (Certificate (..), Endpoint, TracerConfig (..), + epForceSSL, setEndpoint) +import Cardano.Tracer.Handlers.Metrics.Utils (contentHdrUtf8Text) +import Cardano.Tracer.MetaTrace +import Cardano.Tracer.Time (getTimeMs) + +import qualified Data.ByteString.Lazy as BL +import Data.Maybe (fromMaybe) +import Data.Text (Text) +import qualified Data.Text as Text +import Data.Text.Encoding (decodeUtf8Lenient) +import qualified Data.Text.Encoding as T +import Network.HTTP.Types +import Network.Wai +import Network.Wai.Handler.Warp hiding (run) +import Network.Wai.Handler.WarpTLS +import System.Time.Extra (sleep) + +ok :: Response +ok = responseLBS status204 [] "" + +malformed :: Response +malformed = responseLBS status400 contentHdrUtf8Text "Malformed input" + +notFound :: Response +notFound = responseLBS status404 [] "" + +expectEmptyQuery :: Request + -> (Response -> IO ResponseReceived) + -> IO ResponseReceived + -> IO ResponseReceived +expectEmptyQuery request send kont = + case queryToQueryText request.queryString of + [] -> kont + _ -> send malformed + +expectOneOptionalItemQuery :: Read item + => Request + -> (Response -> IO ResponseReceived) + -> Text + -> (Maybe item -> IO ResponseReceived) + -> IO ResponseReceived +expectOneOptionalItemQuery request send key kont = + case queryToQueryText request.queryString of + [(key', Just (read . Text.unpack -> Just !v))] | key' == key -> kont (Just v) + [] -> kont Nothing + _ -> send malformed + +expectOneItemQuery :: Read item + => Request + -> (Response -> IO ResponseReceived) + -> Text + -> (item -> IO ResponseReceived) + -> IO ResponseReceived +expectOneItemQuery request send key kont = + case queryToQueryText request.queryString of + [(key', Just !(read . Text.unpack -> Just !v))] | key' == key -> kont v + _ -> send malformed + +encodeUtf8 :: Text -> BL.ByteString +encodeUtf8 = BL.fromStrict . T.encodeUtf8 + +timeseriesApp :: TimeseriesHandle -> Application +timeseriesApp handle request send | + request.pathInfo == ["timeseries", "query"] + && request.requestMethod == methodPost = + expectEmptyQuery request send $ do + bs <- consumeRequestBodyStrict request + let query = decodeUtf8Lenient (BL.toStrict bs) + at <- getTimeMs + execute handle (fromIntegral at) query >>= \case + Left err -> send $ + responseLBS status400 contentHdrUtf8Text (encodeUtf8 (asText err)) + where + Right v -> send $ responseLBS status200 contentHdrUtf8Text (encodeUtf8 (showT v)) +timeseriesApp handle request send | + request.pathInfo == ["timeseries", "prune"] + && request.requestMethod == methodPost = + expectEmptyQuery request send $ do + prune handle + send ok +timeseriesApp handle request send | + request.pathInfo == ["timeseries", "config", "retention"] + && request.requestMethod == methodPost = do + expectOneItemQuery request send "value" $ \v -> do + modifyConfig handle (\cfg -> Just cfg{retentionMillis = v}) + send ok +timeseriesApp handle request send | + request.pathInfo == ["timeseries", "config", "pruning"] + && request.requestMethod == methodPost = do + expectOneOptionalItemQuery request send "value" \v -> do + modifyConfig handle (\cfg -> Just cfg{pruningPeriodSec = v}) + send ok +timeseriesApp handle request send | + request.pathInfo == ["timeseries", "config", "retention"] + && request.requestMethod == methodGet = + expectEmptyQuery request send $ do + v <- (.retentionMillis) <$> readConfig handle + send $ responseLBS status200 contentHdrUtf8Text (encodeUtf8 (showT v)) +timeseriesApp handle request send | + request.pathInfo == ["timeseries", "config", "pruning"] + && request.requestMethod == methodGet = + expectEmptyQuery request send $ do + v <- (.pruningPeriodSec) <$> readConfig handle + send $ responseLBS status200 contentHdrUtf8Text (encodeUtf8 (showT v)) +timeseriesApp _ _ send = send notFound + +runTimeseriesServer :: Trace IO TracerTrace -> TracerConfig -> Endpoint -> TimeseriesHandle -> IO () +runTimeseriesServer tr tracerConfig endpoint handle = do + + -- Pause to prevent collision between "Listening"-notifications from servers. + sleep 0.1 + + traceWith tr TracerStartedTimeseries + { ttTimeseriesEndpoint = endpoint + } + + + let + settings :: Settings + settings = setEndpoint endpoint defaultSettings + + tls_settings :: Certificate -> TLSSettings + tls_settings Certificate {..} = + tlsSettingsChain certificateFile (fromMaybe [] certificateChain) certificateKeyFile + + application :: Application + application = timeseriesApp handle + + run :: IO () + run | Just True <- epForceSSL endpoint , Just cert <- tlsCertificate tracerConfig + = runTLS (tls_settings cert) settings application + -- Trace, if we expect SSL without getting certificates. + | Just True <- epForceSSL endpoint + = do traceWith tr TracerMissingCertificate + { ttMissingCertificateEndpoint = endpoint } + runSettings settings application + | otherwise + = runSettings settings application + run diff --git a/cardano-tracer/src/Cardano/Tracer/MetaTrace.hs b/cardano-tracer/src/Cardano/Tracer/MetaTrace.hs index b40195fa228..c4f87f90a1a 100644 --- a/cardano-tracer/src/Cardano/Tracer/MetaTrace.hs +++ b/cardano-tracer/src/Cardano/Tracer/MetaTrace.hs @@ -21,11 +21,13 @@ module Cardano.Tracer.MetaTrace import Cardano.Logging import Cardano.Logging.Resources +import Cardano.Timeseries.Component.Trace (TimeseriesTrace) import Cardano.Tracer.Configuration import Cardano.Tracer.Types (NodeId (..), NodeName) import Data.Aeson hiding (Error) import qualified Data.Aeson as AE +import Data.Functor (($>), (<&>)) import qualified Data.Map.Strict as Map import Data.Text as T (Text, pack) import qualified System.IO as Sys @@ -58,6 +60,9 @@ data TracerTrace | TracerStartedPrometheus { ttPrometheusEndpoint :: Endpoint } + | TracerStartedTimeseries + { ttTimeseriesEndpoint :: Endpoint + } | TracerStartedMonitoring { ttMonitoringEndpoint :: Endpoint , ttMonitoringType :: Text @@ -90,6 +95,13 @@ data TracerTrace } deriving Show +-- | A bundle of domain-split tracers used in the application. +data TraceBundle = TraceBundle{ + -- | A tracer used to trace all kinds of things happening in the application. + assorted :: !(Trace IO TracerTrace), + -- | A tracer that has to do only with timeseries storing/querying/pruning etc. + timeseries :: !(Trace IO TimeseriesTrace) +} instance LogFormatting TracerTrace where forHuman t@TracerConfigIs{ttWarnRTViewMissing = True} = @@ -136,6 +148,10 @@ instance LogFormatting TracerTrace where [ "kind" .= AE.String "TracerStartedPrometheus" , "endpoint" .= ttPrometheusEndpoint ] + TracerStartedTimeseries{..} -> mconcat + [ "kind" .= AE.String "TracerStartedTimeseries" + , "endpoint" .= ttTimeseriesEndpoint + ] TracerStartedMonitoring{..} -> mconcat [ "kind" .= AE.String "TracerStartedMonitoring" , "endpoint" .= ttMonitoringEndpoint @@ -202,6 +218,7 @@ instance MetaTrace TracerTrace where namespaceFor TracerAddNewNodeIdMapping {} = Namespace [] ["AddNewNodeIdMapping"] namespaceFor TracerStartedLogRotator = Namespace [] ["StartedLogRotator"] namespaceFor TracerStartedPrometheus{} = Namespace [] ["StartedPrometheus"] + namespaceFor TracerStartedTimeseries{} = Namespace [] ["StartedTimeseriers"] namespaceFor TracerStartedMonitoring{} = Namespace [] ["StartedMonitoring"] namespaceFor TracerStartedAcceptors {} = Namespace [] ["StartedAcceptors"] namespaceFor TracerStartedRTView = Namespace [] ["StartedRTView"] @@ -227,6 +244,7 @@ instance MetaTrace TracerTrace where severityFor (Namespace _ ["AddNewNodeIdMapping"]) _ = Just Info severityFor (Namespace _ ["StartedLogRotator"]) _ = Just Info severityFor (Namespace _ ["StartedPrometheus"]) _ = Just Info + severityFor (Namespace _ ["StartedTimeseries"]) _ = Just Info severityFor (Namespace _ ["StartedMonitoring"]) _ = Just Info severityFor (Namespace _ ["StartedAcceptors"]) _ = Just Info severityFor (Namespace _ ["StartedRTView"]) _ = Just Info @@ -256,6 +274,7 @@ instance MetaTrace TracerTrace where , Namespace [] ["AddNewNodeIdMapping"] , Namespace [] ["StartedLogRotator"] , Namespace [] ["StartedPrometheus"] + , Namespace [] ["StartedTimeseries"] , Namespace [] ["StartedMonitoring"] , Namespace [] ["StartedAcceptors"] , Namespace [] ["StartedRTView"] @@ -273,34 +292,56 @@ instance MetaTrace TracerTrace where , Namespace [] ["ForwardingInterrupted"] ] -stderrShowTracer :: Trace IO TracerTrace +stderrShowTracer :: Show a => Trace IO a stderrShowTracer = contramapM' (either (const $ pure ()) (Sys.hPrint Sys.stderr) . snd) -mkTracerTracer :: SeverityF -> IO (Trace IO TracerTrace) -mkTracerTracer defSeverity = do - standardTracer - >>= machineFormatter +mkTracerTracer :: Trace IO FormattedMessage -> SeverityF -> IO (Trace IO TracerTrace) +mkTracerTracer std defSeverity = + machineFormatter std >>= filterSeverityFromConfig >>= \t -> let finalTracer = withNames ["Tracer"] (withSeverity t) - in configTracerTracer defSeverity finalTracer >> pure finalTracer + in configTracerTracer finalTracer $> finalTracer + where + configTracerTracer :: Trace IO TracerTrace -> IO () + configTracerTracer tr = do + configReflection <- emptyConfigReflection + configureTracers configReflection initialTraceConfig [tr] + where + initialTraceConfig :: TraceConfig + initialTraceConfig = + TraceConfig + { tcForwarder = Nothing + , tcNodeName = Nothing + , tcResourceFrequency = Nothing + , tcLedgerMetricsFrequency = Nothing + , tcMetricsPrefix = Nothing + , tcOptions = Map.fromList + [ ([], [ConfSeverity defSeverity]) + , (["Tracer"], [ConfDetail DMaximum]) + ] + } -configTracerTracer :: SeverityF -> Trace IO TracerTrace -> IO () -configTracerTracer defSeverity tr = do - configReflection <- emptyConfigReflection - configureTracers configReflection initialTraceConfig [tr] +mkTimeseriesTracer :: Trace IO FormattedMessage -> IO (Trace IO TimeseriesTrace) +mkTimeseriesTracer std = do + tr <- machineFormatter std >>= filterSeverityFromConfig <&> withNames ["Tracer"] . withSeverity + configReflection <- emptyConfigReflection + configureTracers configReflection cfg [tr] + pure tr where - initialTraceConfig :: TraceConfig - initialTraceConfig = + cfg :: TraceConfig + cfg = TraceConfig - { tcForwarder = Nothing - , tcNodeName = Nothing - , tcResourceFrequency = Nothing + { tcForwarder = Nothing + , tcNodeName = Nothing + , tcResourceFrequency = Nothing , tcLedgerMetricsFrequency = Nothing - , tcMetricsPrefix = Nothing - , tcOptions = Map.fromList - [ ([], [ConfSeverity defSeverity]) - , (["Tracer"], [ConfDetail DMaximum]) - ] + , tcMetricsPrefix = Nothing + , tcOptions = Map.fromList [([], [ConfSeverity (SeverityF (Just Info))])] } + +mkTraceBundle :: SeverityF -> IO TraceBundle +mkTraceBundle sev = do + std <- standardTracer + TraceBundle <$> mkTracerTracer std sev <*> mkTimeseriesTracer std diff --git a/cardano-tracer/src/Cardano/Tracer/Run.hs b/cardano-tracer/src/Cardano/Tracer/Run.hs index 712ce1224ea..aa4e15351bf 100644 --- a/cardano-tracer/src/Cardano/Tracer/Run.hs +++ b/cardano-tracer/src/Cardano/Tracer/Run.hs @@ -2,6 +2,8 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE OverloadedRecordDot #-} -- | This top-level module is used by 'cardano-tracer' app. module Cardano.Tracer.Run @@ -25,6 +27,8 @@ import Cardano.Tracer.Handlers.RTView.Run import Cardano.Tracer.MetaTrace import Cardano.Tracer.Types import Cardano.Tracer.Utils +import Cardano.Timeseries.API (Tree) +import qualified Cardano.Timeseries.Component as Timeseries import Control.Applicative import Control.Concurrent (threadDelay) @@ -37,6 +41,7 @@ import Control.Exception (SomeException, try) import Control.Monad import Data.Aeson (decodeFileStrict') import Data.Foldable (for_) +import Data.Traversable (for) import Data.Maybe (fromMaybe) import qualified Data.Map.Strict as M (Map, empty, filter, toList) import Data.Text as T (Text, null) @@ -46,22 +51,22 @@ import Data.Text.Lazy.Builder as TB (Builder, fromText) -- | Top-level run function, called by 'cardano-tracer' app. runCardanoTracer :: TracerParams -> IO () runCardanoTracer TracerParams{tracerConfig, stateDir, logSeverity} = do - tr <- mkTracerTracer $ SeverityF $ logSeverity <|> Just Info -- default severity filter to Info - traceWith tr TracerBuildInfo + tr <- mkTraceBundle $ SeverityF $ logSeverity <|> Just Info -- default severity filter to Info + traceWith tr.assorted TracerBuildInfo #if RTVIEW { ttBuiltWithRTView = True #else { ttBuiltWithRTView = False #endif } - traceWith tr TracerParamsAre + traceWith tr.assorted TracerParamsAre { ttConfigPath = tracerConfig , ttStateDir = stateDir , ttMinLogSeverity = logSeverity } config <- readTracerConfig tracerConfig - traceWith tr TracerConfigIs + traceWith tr.assorted TracerConfigIs { ttConfig = config #if RTVIEW , ttWarnRTViewMissing = False @@ -75,7 +80,7 @@ runCardanoTracer TracerParams{tracerConfig, stateDir, logSeverity} = do forever do mbrs <- readResourceStats for_ mbrs \resourceStat -> - traceWith tr (TracerResource resourceStat) + traceWith tr.assorted (TracerResource resourceStat) threadDelay (1_000 * msInterval) -- Delay in seconds, given milliseconds link threadId @@ -87,12 +92,12 @@ runCardanoTracer TracerParams{tracerConfig, stateDir, logSeverity} = do doRunCardanoTracer :: TracerConfig -- ^ Tracer's configuration. -> Maybe FilePath -- ^ Path to RTView's internal state files. - -> Trace IO TracerTrace + -> TraceBundle -> ProtocolsBrake -- ^ The flag we use to stop all the protocols. -> DataPointRequestors -- ^ The DataPointRequestors to ask 'DataPoint's. -> IO () doRunCardanoTracer config rtViewStateDir tr protocolsBrake dpRequestors = do - traceWith tr TracerInitStarted + traceWith tr.assorted TracerInitStarted connectedNodes <- initConnectedNodes connectedNodesNames <- initConnectedNodesNames acceptedMetrics <- initAcceptedMetrics @@ -109,16 +114,18 @@ doRunCardanoTracer config rtViewStateDir tr protocolsBrake dpRequestors = do currentLogLock <- newLock currentDPLock <- newLock - traceWith tr TracerInitEventQueues + traceWith tr.assorted TracerInitEventQueues #if RTVIEW eventsQueues <- initEventsQueues tr rtViewStateDir connectedNodesNames dpRequestors currentDPLock rtViewPageOpened <- newTVarIO False #endif - (reforwardTraceObject,_trDataPoint) <- initReForwarder config tr + (reforwardTraceObject,_trDataPoint) <- initReForwarder config tr.assorted registry <- newRegistry + !timeseriesHandle <- for (hasTimeseries config) (const $ Timeseries.create @(Tree _) tr.timeseries Nothing) + -- Environment for all following functions. let tracerEnv :: TracerEnv tracerEnv = TracerEnv @@ -130,11 +137,12 @@ doRunCardanoTracer config rtViewStateDir tr protocolsBrake dpRequestors = do , teCurrentDPLock = currentDPLock , teDPRequestors = dpRequestors , teProtocolsBrake = protocolsBrake - , teTracer = tr + , teTracer = tr.assorted , teReforwardTraceObjects = reforwardTraceObject , teRegistry = registry , teStateDir = rtViewStateDir , teMetricsHelp = mHelp + , teTimeseriesHandle = timeseriesHandle } tracerEnvRTView :: TracerEnvRTView @@ -151,15 +159,15 @@ doRunCardanoTracer config rtViewStateDir tr protocolsBrake dpRequestors = do -- Specify what should be done before 'cardano-tracer' stops. beforeProgramStops $ do - traceWith tr TracerShutdownInitiated + traceWith tr.assorted TracerShutdownInitiated #if RTVIEW backupAllHistory tracerEnv tracerEnvRTView traceWith tr TracerShutdownHistBackup #endif applyBrake (teProtocolsBrake tracerEnv) - traceWith tr TracerShutdownComplete + traceWith tr.assorted TracerShutdownComplete - traceWith tr TracerInitDone + traceWith tr.assorted TracerInitDone sequenceConcurrently_ [ runLogsRotator tracerEnv , runMetricsServers tracerEnv diff --git a/cardano-tracer/src/Cardano/Tracer/Time.hs b/cardano-tracer/src/Cardano/Tracer/Time.hs new file mode 100644 index 00000000000..95d2078f657 --- /dev/null +++ b/cardano-tracer/src/Cardano/Tracer/Time.hs @@ -0,0 +1,12 @@ +module Cardano.Tracer.Time(getTimeMs) where +import Data.Int (Int64) +import Data.Time.Clock.POSIX (getPOSIXTime) + +-- forkServer definition of `getTimeMs'. The ekg frontend relies +-- on the "ekg.server_timestamp_ms" metric being in every +-- store. While forkServer adds that that automatically we must +-- manually add it. +-- url +-- + https://github.com/tvh/ekg-wai/blob/master/System/Remote/Monitoring/Wai.hs#L237-L238 +getTimeMs :: IO Int64 +getTimeMs = (round . (* 1000)) `fmap` getPOSIXTime diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Acceptor.hs b/cardano-tracer/test/Cardano/Tracer/Test/Acceptor.hs index d7b08e1ee58..ae5873a97b7 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Acceptor.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Acceptor.hs @@ -33,6 +33,7 @@ import qualified Data.Text as T import System.Time.Extra (sleep) import Trace.Forward.Utils.DataPoint +import Cardano.Logging (standardTracer) data AcceptorsMode = Initiator | Responder @@ -53,7 +54,8 @@ launchAcceptorsSimple mode localSock dpName = do currentLogLock <- newLock currentDPLock <- newLock - tr <- mkTracerTracer $ SeverityF $ Just Warning + std <- standardTracer + tr <- mkTracerTracer std $ SeverityF $ Just Warning #if RTVIEW eventsQueues <- initEventsQueues tr Nothing connectedNodesNames dpRequestors currentDPLock @@ -82,6 +84,7 @@ launchAcceptorsSimple mode localSock dpName = do , teRegistry = registry , teStateDir = Nothing , teMetricsHelp = [] + , teTimeseriesHandle = Nothing } tracerEnvRTView :: TracerEnvRTView @@ -111,6 +114,7 @@ launchAcceptorsSimple mode localSock dpName = do , hasEKG = Nothing , hasPrometheus = Nothing , hasRTView = Nothing + , hasTimeseries = Nothing , tlsCertificate = Nothing , logging = NE.fromList [LoggingParams "/tmp/demo-acceptor" FileMode ForHuman] , rotation = Nothing diff --git a/cardano-tracer/test/Cardano/Tracer/Test/DataPoint/Tests.hs b/cardano-tracer/test/Cardano/Tracer/Test/DataPoint/Tests.hs index 8abb7a2d72c..6ab547fa798 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/DataPoint/Tests.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/DataPoint/Tests.hs @@ -39,7 +39,8 @@ propDataPoint ts@TestSetup{..} rootDir localSock = do stopProtocols <- initProtocolsBrake dpRequestors <- initDataPointRequestors savedDPValues :: TVar DataPointValues <- newTVarIO [] - withAsync (doRunCardanoTracer config (Just $ rootDir <> "/../state") stderrShowTracer stopProtocols dpRequestors) \_ -> do + withAsync (doRunCardanoTracer config (Just $ rootDir <> "/../state") + (TraceBundle stderrShowTracer stderrShowTracer) stopProtocols dpRequestors) \_ -> do sleep 1.0 withAsync (launchForwardersSimple ts Initiator (Net.LocalPipe localSock) 10000) \_ -> do sleep 1.5 @@ -88,6 +89,7 @@ propDataPoint ts@TestSetup{..} rootDir localSock = do , hasEKG = Nothing , hasPrometheus = Nothing , hasRTView = Nothing + , hasTimeseries = Nothing , tlsCertificate = Nothing , logging = NE.fromList [LoggingParams rootDir FileMode ForHuman] , rotation = Nothing diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Logs/Tests.hs b/cardano-tracer/test/Cardano/Tracer/Test/Logs/Tests.hs index 72d597fce35..4b3b70f8f98 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Logs/Tests.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Logs/Tests.hs @@ -48,7 +48,9 @@ propLogs ts@TestSetup{..} format logRotLimitBytes logRotMaxAgeMinutes rootDir lo lock <- newLock stopProtocols <- initProtocolsBrake dpRequestors <- initDataPointRequestors - withAsync (doRunCardanoTracer (acceptConfig rootDir) (Just $ rootDir <> "/../state") stderrShowTracer stopProtocols dpRequestors) \async1 -> do + withAsync (doRunCardanoTracer (acceptConfig rootDir) + (Just $ rootDir <> "/../state") + (TraceBundle stderrShowTracer stderrShowTracer) stopProtocols dpRequestors) \async1 -> do link async1 sleep 1.0 withAsync (launchForwardersSimple ts Initiator (Net.LocalPipe localSock) 10000) \async2 -> do @@ -68,6 +70,7 @@ propLogs ts@TestSetup{..} format logRotLimitBytes logRotMaxAgeMinutes rootDir lo , ekgRequestFreq = Just 1.0 , hasEKG = Nothing , hasPrometheus = Nothing + , hasTimeseries = Nothing , hasRTView = Nothing , logging = LoggingParams root FileMode format :| [] , rotation = Just $ RotationParams @@ -91,7 +94,9 @@ propMultiInit ts@TestSetup{..} format rootDir howToConnect1 howToConnect2 = do lock <- newLock stopProtocols <- initProtocolsBrake dpRequestors <- initDataPointRequestors - withAsync (doRunCardanoTracer initConfig (Just $ rootDir <> "/../state") stderrShowTracer stopProtocols dpRequestors) \async1 -> do + withAsync (doRunCardanoTracer initConfig + (Just $ rootDir <> "/../state") + (TraceBundle stderrShowTracer stderrShowTracer) stopProtocols dpRequestors) \async1 -> do link async1 sleep 1.0 withAsync (launchForwardersSimple ts Responder howToConnect1 10000) \async2 -> do @@ -115,6 +120,7 @@ propMultiInit ts@TestSetup{..} format rootDir howToConnect1 howToConnect2 = do , hasEKG = Nothing , hasPrometheus = Nothing , hasRTView = Nothing + , hasTimeseries = Nothing , tlsCertificate = Nothing , logging = LoggingParams rootDir FileMode format :| [] , rotation = Nothing @@ -133,7 +139,9 @@ propMultiResp ts@TestSetup{..} format rootDir howToConnect = do lock <- newLock stopProtocols <- initProtocolsBrake dpRequestors <- initDataPointRequestors - withAsync (doRunCardanoTracer respConfig (Just $ rootDir <> "/../state") stderrShowTracer stopProtocols dpRequestors) \async1 -> do + withAsync (doRunCardanoTracer respConfig + (Just $ rootDir <> "/../state") + (TraceBundle stderrShowTracer stderrShowTracer) stopProtocols dpRequestors) \async1 -> do link async1 sleep 1.0 -- withAsync (launchForwardersSimple ts Initiator howToConnect 10000) \async2 -> do @@ -159,6 +167,7 @@ propMultiResp ts@TestSetup{..} format rootDir howToConnect = do , hasEKG = Nothing , hasPrometheus = Nothing , hasRTView = Nothing + , hasTimeseries = Nothing , tlsCertificate = Nothing , logging = LoggingParams rootDir FileMode format :| [] , rotation = Nothing diff --git a/cardano-tracer/test/Cardano/Tracer/Test/Restart/Tests.hs b/cardano-tracer/test/Cardano/Tracer/Test/Restart/Tests.hs index 1f586c76e43..827489af9a4 100644 --- a/cardano-tracer/test/Cardano/Tracer/Test/Restart/Tests.hs +++ b/cardano-tracer/test/Cardano/Tracer/Test/Restart/Tests.hs @@ -42,7 +42,7 @@ propNetworkForwarder ts rootDir localSock = do dpRequestors <- initDataPointRequestors propNetwork' ts rootDir ( launchForwardersSimple ts Initiator (Net.LocalPipe localSock) 10000 - , doRunCardanoTracer config (Just $ rootDir <> "/../state") stderrShowTracer brake dpRequestors + , doRunCardanoTracer config (Just $ rootDir <> "/../state") (TraceBundle stderrShowTracer stderrShowTracer) brake dpRequestors ) propNetwork' @@ -94,6 +94,7 @@ mkConfig TestSetup{..} rootDir p = TracerConfig , hasEKG = Nothing , hasPrometheus = Nothing , hasRTView = Nothing + , hasTimeseries = Nothing , tlsCertificate = Nothing , logging = NE.fromList [LoggingParams rootDir FileMode ForMachine] , rotation = Nothing diff --git a/nix/nixos/cardano-tracer-service-workbench.nix b/nix/nixos/cardano-tracer-service-workbench.nix index a330c3ff98a..b2f5ec0131e 100644 --- a/nix/nixos/cardano-tracer-service-workbench.nix +++ b/nix/nixos/cardano-tracer-service-workbench.nix @@ -46,6 +46,10 @@ let serviceConfigToJSON = epHost = "127.0.0.1"; epPort = 3200; ## supervisord.portShiftPrometheus } // (cfg.prometheus or {}); + hasTimeseries = { + epHost = "127.0.0.1"; + epPort = 3300; ## supervisord.portShiftPrometheus + } // (cfg.prometheus or {}); # Just an example for metrics compatibility mapping. # An entry means the first entry has the second entry as alias. # The Metrics is then available, both with the original and the mapped name. @@ -80,6 +84,7 @@ in pkgs.commonLib.defServiceModule logRoot = opt str null "Log storage root directory."; rotation = opt attrs {} "Log rotation overrides: see cardano-tracer documentation."; RTView = opt attrs {} "RTView config overrides: see cardano-tracer documentation."; + hasTimeseries = opt attrs {} ""; # FIXME: (@russoul) probably shouldn't be here (provisional) ekgPortBase = opt int 3100 "EKG port base."; ekgRequestFreq = opt int 1 "EKG request frequency"; prometheus = opt attrs {} "Prometheus overrides: see cardano-tracer documentation."; diff --git a/nix/workbench/service/tracer.nix b/nix/workbench/service/tracer.nix index cfbf1c1bec3..9a7e5e5c755 100644 --- a/nix/workbench/service/tracer.nix +++ b/nix/workbench/service/tracer.nix @@ -27,6 +27,10 @@ let networkMagic = profile.genesis.network_magic; configFile = "config.json"; metricsHelp = "../../../cardano-tracer/configuration/metrics_help.json"; + hasTimeseries = { # FIXME: (@russoul) for testing only + epHost = "127.0.0.1"; + epPort = 3300; + }; # Decide where the executable comes from: ######################################### } // optionalAttrs (!backend.useCabalRun) {