From 9e7ab473a079b6a371066e9f8367ef1ce1170c78 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Mon, 30 Dec 2024 18:45:35 -0300 Subject: [PATCH 1/4] Refactor traces to use GenServer instead of Agent --- .../lib/telemetry_api/application.ex | 2 +- .../lib/telemetry_api/trace_store.ex | 36 -- telemetry_api/lib/telemetry_api/traces.ex | 556 +++++++++++------- 3 files changed, 357 insertions(+), 237 deletions(-) delete mode 100644 telemetry_api/lib/telemetry_api/trace_store.ex diff --git a/telemetry_api/lib/telemetry_api/application.ex b/telemetry_api/lib/telemetry_api/application.ex index 5472a0bb01..acebe3b50c 100644 --- a/telemetry_api/lib/telemetry_api/application.ex +++ b/telemetry_api/lib/telemetry_api/application.ex @@ -10,7 +10,7 @@ defmodule TelemetryApi.Application do TelemetryApi.MetricsExporter.setup() children = [ - TraceStore, + TelemetryApi.Traces, TelemetryApiWeb.Telemetry, TelemetryApi.Repo, {DNSCluster, query: Application.get_env(:telemetry_api, :dns_cluster_query) || :ignore}, diff --git a/telemetry_api/lib/telemetry_api/trace_store.ex b/telemetry_api/lib/telemetry_api/trace_store.ex deleted file mode 100644 index 0e0e43ffc7..0000000000 --- a/telemetry_api/lib/telemetry_api/trace_store.ex +++ /dev/null @@ -1,36 +0,0 @@ -defmodule TraceStore do - use Agent - - # Start the agent - def start_link(_opts) do - Agent.start_link(fn -> %{} end, name: __MODULE__) - end - - # Store the trace using the merkle_root as the key - def store_trace(merkle_root, trace) do - Agent.update(__MODULE__, fn state -> - Map.put(state, merkle_root, trace) - end) - end - - # Retrieve the trace by merkle_root - def get_trace(merkle_root) do - Agent.get(__MODULE__, fn state -> - case Map.get(state, merkle_root) do - nil -> - IO.inspect("Context not found for #{merkle_root}") - {:error, :not_found, "Context not found for #{merkle_root}"} - - trace -> - {:ok, trace} - end - end) - end - - # Delete the trace after it's used - def delete_trace(merkle_root) do - Agent.update(__MODULE__, fn state -> - Map.delete(state, merkle_root) - end) - end -end diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index bcea072299..2bd5ca65d4 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -12,10 +12,20 @@ defmodule TelemetryApi.Traces do alias OpenTelemetry.Tracer, as: Tracer alias OpenTelemetry.Ctx, as: Ctx + use GenServer + + ######################################## + ############## PUBLIC API ############## + ######################################## + + def start_link(_) do + GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + end + @doc """ Send the trace to OpenTelemetry - This function is responsible for creating a new span and storing the context in the Agent. + This function is responsible for creating a new span and storing the context. ## Examples @@ -24,30 +34,7 @@ defmodule TelemetryApi.Traces do :ok """ def create_task_trace(merkle_root) do - with {:ok, trace} <- set_current_trace(merkle_root) do - with {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do - aggregator_subspan_ctx = - Tracer.start_span( - "Aggregator", - %{ - attributes: [ - {:merkle_root, merkle_root}, - {:total_stake, total_stake} - ] - } - ) - - Tracer.set_current_span(aggregator_subspan_ctx) - Tracer.add_event("New task event received", []) - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.put(trace.subspans, :aggregator, aggregator_subspan_ctx) - }) - - :ok - end - end + GenServer.call(__MODULE__, {:create_task_trace, merkle_root}) end @doc """ @@ -61,46 +48,7 @@ defmodule TelemetryApi.Traces do :ok """ def register_operator_response(merkle_root, operator_id) do - with {:ok, operator} <- Operators.get_operator(%{id: operator_id}), - :ok <- validate_operator_registration(operator), - {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - operator_stake = Decimal.new(operator.stake) - new_stake = Decimal.add(trace.current_stake, operator_stake) - new_stake_fraction = Decimal.div(new_stake, trace.total_stake) - operator_stake_fraction = Decimal.div(operator_stake, trace.total_stake) - - Tracer.add_event( - "Operator Response: " <> operator.name, - [ - {:merkle_root, merkle_root}, - {:operator_id, operator_id}, - {:name, operator.name}, - {:address, operator.address}, - {:operator_stake, Decimal.to_string(operator_stake)}, - {:current_stake, Decimal.to_string(new_stake)}, - {:current_stake_fraction, Decimal.to_string(new_stake_fraction)}, - {:operator_stake_fraction, Decimal.to_string(operator_stake_fraction)} - ] - ) - - responses = trace.responses ++ [operator_id] - - TraceStore.store_trace(merkle_root, %{ - trace - | responses: responses, - current_stake: new_stake - }) - - PrometheusMetrics.operator_response( - operator.name <> " - " <> String.slice(operator.address, 0..7) - ) - - IO.inspect( - "Operator response included. merkle_root: #{inspect(merkle_root)} operator_id: #{inspect(operator_id)}" - ) - - :ok - end + GenServer.call(__MODULE__, {:register_operator_response, merkle_root, operator_id}) end @doc """ @@ -114,23 +62,7 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_creation_failed(merkle_root, error) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event( - "Batcher Task Creation Failed", - [ - {:error, error} - ] - ) - - Tracer.end_span() - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.delete(trace.subspans, :batcher) - }) - - :ok - end + GenServer.call(__MODULE__, {:batcher_task_creation_failed, merkle_root, error}) end @doc """ @@ -143,54 +75,7 @@ defmodule TelemetryApi.Traces do :ok """ def create_batcher_task_trace(merkle_root) do - root_span_ctx = - Tracer.start_span( - "Task: #{merkle_root}", - %{ - attributes: [ - {:merkle_root, merkle_root} - ] - } - ) - - {:ok, total_stake} = StakeRegistry.get_current_total_stake() - ctx = Ctx.get_current() - - TraceStore.store_trace(merkle_root, %Trace{ - parent_span: root_span_ctx, - context: ctx, - total_stake: total_stake, - current_stake: 0, - responses: [], - subspans: %{} - }) - - with {:ok, trace} <- set_current_trace(merkle_root) do - # This span ends inmediately after it's created just to set the correct title to the final task. - Tracer.with_span "Task: #{merkle_root}" do - Tracer.set_attributes(%{merkle_root: merkle_root}) - end - - batcher_subspan_ctx = - Tracer.start_span( - "Batcher", - %{ - attributes: [ - {:merkle_root, merkle_root} - ] - } - ) - - Tracer.set_current_span(batcher_subspan_ctx) - Tracer.add_event("New batch", [{:merkle_root, merkle_root}]) - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.put(trace.subspans, :batcher, batcher_subspan_ctx) - }) - - :ok - end + GenServer.call(__MODULE__, {:create_batcher_task_trace, merkle_root}) end @doc """ @@ -203,10 +88,7 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_uploaded_to_s3(merkle_root) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event("Batcher Task Uploaded to S3", []) - :ok - end + GenServer.call(__MODULE__, {:batcher_task_uploaded_to_s3, merkle_root}) end @doc """ @@ -220,18 +102,7 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_sent(merkle_root, tx_hash) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event("Batcher Task Sent to Ethereum", [{"tx_hash", tx_hash}]) - - Tracer.end_span() - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.delete(trace.subspans, :batcher) - }) - - :ok - end + GenServer.call(__MODULE__, {:batcher_task_sent, merkle_root, tx_hash}) end @doc """ @@ -244,16 +115,7 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_started(merkle_root, fee_per_proof, total_proofs) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - IO.inspect("fee_per_proof: #{fee_per_proof}") - - Tracer.add_event("Batcher Task being created", - fee_per_proof: fee_per_proof, - total_proofs: total_proofs - ) - - :ok - end + GenServer.call(__MODULE__, {:batcher_task_started, merkle_root, fee_per_proof, total_proofs}) end @doc """ @@ -266,11 +128,7 @@ defmodule TelemetryApi.Traces do :ok """ def quorum_reached(merkle_root) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Quorum Reached", []) - IO.inspect("Reached quorum registered. merkle_root: #{merkle_root}") - :ok - end + GenServer.call(__MODULE__, {:quorum_reached, merkle_root}) end @doc """ @@ -284,18 +142,7 @@ defmodule TelemetryApi.Traces do :ok """ def task_error(merkle_root, error) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event( - "Batch verification failed", - [ - {:status, "error"}, - {:error, error} - ] - ) - - IO.inspect("Task error registered. merkle_root: #{IO.inspect(merkle_root)}") - :ok - end + GenServer.call(__MODULE__, {:task_error, merkle_root, error}) end @doc """ @@ -309,10 +156,7 @@ defmodule TelemetryApi.Traces do :ok """ def aggregator_task_set_gas_price(merkle_root, gas_price) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Gas price set", [{"gas_price", gas_price}]) - :ok - end + GenServer.call(__MODULE__, {:aggregator_task_set_gas_price, merkle_root, gas_price}) end @doc """ @@ -326,12 +170,10 @@ defmodule TelemetryApi.Traces do :ok """ def aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}, {"effective_gas_price", effective_gas_price}]) - :ok - end + GenServer.call(__MODULE__, {:aggregator_task_sent, merkle_root, tx_hash, effective_gas_price}) end + @doc """ Finish the task trace @@ -344,7 +186,285 @@ defmodule TelemetryApi.Traces do :ok """ def finish_task_trace(merkle_root) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do + GenServer.call(__MODULE__, {:finish_task_trace, merkle_root}) + end + + + ######################################## + ############### HANDLERS ############### + ######################################## + + @impl true + def init(:ok) do + {:ok, %{}} + end + + @impl true + def handle_call({:create_task_trace, merkle_root}, _from, traces) do + with {:ok, trace} <- set_current_trace(traces, merkle_root), + {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do + aggregator_subspan_ctx = + Tracer.start_span( + "Aggregator", + %{ + attributes: [ + {:merkle_root, merkle_root}, + {:total_stake, total_stake} + ] + } + ) + + Tracer.set_current_span(aggregator_subspan_ctx) + Tracer.add_event("New task event received", []) + + traces = store_trace(traces, merkle_root, %{ + trace + | subspans: Map.put(trace.subspans, :aggregator, aggregator_subspan_ctx) + }) + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:register_operator_response, merkle_root, operator_id}, _from, traces) do + with {:ok, operator} <- Operators.get_operator(%{id: operator_id}), + :ok <- validate_operator_registration(operator), + {:ok, trace} <- set_current_trace_with_subspan(traces, merkle_root, :aggregator) do + + operator_stake = Decimal.new(operator.stake) + new_stake = Decimal.add(trace.current_stake, operator_stake) + new_stake_fraction = Decimal.div(new_stake, trace.total_stake) + operator_stake_fraction = Decimal.div(operator_stake, trace.total_stake) + + Tracer.add_event( + "Operator Response: " <> operator.name, + [ + {:merkle_root, merkle_root}, + {:operator_id, operator_id}, + {:name, operator.name}, + {:address, operator.address}, + {:operator_stake, Decimal.to_string(operator_stake)}, + {:current_stake, Decimal.to_string(new_stake)}, + {:current_stake_fraction, Decimal.to_string(new_stake_fraction)}, + {:operator_stake_fraction, Decimal.to_string(operator_stake_fraction)} + ] + ) + + responses = trace.responses ++ [operator_id] + + traces = store_trace(traces, merkle_root, %{ + trace + | responses: responses, + current_stake: new_stake + }) + + PrometheusMetrics.operator_response( + operator.name <> " - " <> String.slice(operator.address, 0..7) + ) + + IO.inspect( + "Operator response included. merkle_root: #{inspect(merkle_root)} operator_id: #{inspect(operator_id)}" + ) + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:batcher_task_creation_failed, merkle_root, error}, _from, traces) do + with {:ok, trace} <- set_current_trace_with_subspan(traces, merkle_root, :batcher) do + Tracer.add_event( + "Batcher Task Creation Failed", + [ + {:error, error} + ] + ) + + Tracer.end_span() + + traces = store_trace(traces, merkle_root, %{ + trace + | subspans: Map.delete(trace.subspans, :batcher) + }) + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:create_batcher_task_trace, merkle_root}, _from, traces) do + with {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do + root_span_ctx = + Tracer.start_span( + "Task: #{merkle_root}", + %{ + attributes: [ + {:merkle_root, merkle_root} + ] + } + ) + + # IO.inspect("DEBUG: Root span #{root_span_ctx}") + ctx = Ctx.get_current() + # IO.inspect("DEBUG: Current context: #{ctx}") + + traces = store_trace(traces, merkle_root, %Trace{ + parent_span: root_span_ctx, + context: ctx, + total_stake: total_stake, + current_stake: 0, + responses: [], + subspans: %{} + }) + + with {:ok, trace} <- set_current_trace(traces, merkle_root) do + # This span ends inmediately after it's created just to set the correct title to the final task. + Tracer.with_span "Task: #{merkle_root}" do + Tracer.set_attributes(%{merkle_root: merkle_root}) + end + + batcher_subspan_ctx = + Tracer.start_span( + "Batcher", + %{ + attributes: [ + {:merkle_root, merkle_root} + ] + } + ) + + Tracer.set_current_span(batcher_subspan_ctx) + Tracer.add_event("New batch", [{:merkle_root, merkle_root}]) + + traces = store_trace(traces, merkle_root, %{ + trace + | subspans: Map.put(trace.subspans, :batcher, batcher_subspan_ctx) + }) + + {:reply, :ok, traces} + end + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:batcher_task_uploaded_to_s3, merkle_root}, _from, traces) do + with {:ok, _trace} <- set_current_trace_with_subspan(traces, merkle_root, :batcher) do + Tracer.add_event("Batcher Task Uploaded to S3", []) + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:batcher_task_sent, merkle_root, tx_hash}, _from, traces) do + with {:ok, trace} <- set_current_trace_with_subspan(traces, merkle_root, :batcher) do + Tracer.add_event("Batcher Task Sent to Ethereum", [{"tx_hash", tx_hash}]) + Tracer.end_span() + traces = store_trace(traces, merkle_root, %{ + trace + | subspans: Map.delete(trace.subspans, :batcher) + }) + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + + @impl true + def handle_call({:batcher_task_started, merkle_root, fee_per_proof, total_proofs}, _from, traces) do + with {:ok, _trace} <- set_current_trace_with_subspan(traces, merkle_root, :batcher) do + Tracer.add_event("Batcher Task being created", + fee_per_proof: fee_per_proof, + total_proofs: total_proofs + ) + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:quorum_reached, merkle_root}, _from, traces) do + with {:ok, _trace} <- set_current_trace_with_subspan(traces, merkle_root, :aggregator) do + Tracer.add_event("Quorum Reached", []) + IO.inspect("Reached quorum registered. merkle_root: #{merkle_root}") + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:task_error, merkle_root, error}, _from, traces) do + with {:ok, _trace} <- set_current_trace_with_subspan(traces, merkle_root, :aggregator) do + Tracer.add_event( + "Batch verification failed", + [ + {:status, "error"}, + {:error, error} + ] + ) + + IO.inspect("Task error registered. merkle_root: #{IO.inspect(merkle_root)}") + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:aggregator_task_set_gas_price, merkle_root, gas_price}, _from, traces) do + with {:ok, _trace} <- set_current_trace_with_subspan(traces, merkle_root, :aggregator) do + Tracer.add_event("Gas price set", [{"gas_price", gas_price}]) + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:aggregator_task_sent, merkle_root, tx_hash, effective_gas_price}, _from, traces) do + with {:ok, _trace} <- set_current_trace_with_subspan(traces, merkle_root, :aggregator) do + Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}, {"effective_gas_price", effective_gas_price}]) + + {:reply, :ok, traces} + else + {:error, information} -> + {:reply, {:error, information}, traces} + end + end + + @impl true + def handle_call({:finish_task_trace, merkle_root}, _from, traces) do + IO.inspect("DEBUG: Finish task trace called with merkle_root #{merkle_root}") + with {:ok, trace} <- set_current_trace_with_subspan(traces, merkle_root, :aggregator) do + IO.inspect("DEBUG: Setting current trace for trace of merkle root #{merkle_root}") missing_operators = Operators.list_operators() |> Enum.filter(fn o -> o.id not in trace.responses and Operators.is_registered?(o) end) @@ -353,19 +473,30 @@ defmodule TelemetryApi.Traces do Tracer.end_span() - with {:ok, _trace} <- set_current_trace(merkle_root) do + # Clean up the context + with {:ok, _trace} <- set_current_trace(traces, merkle_root) do + IO.inspect("Finished task trace with merkle_root: #{merkle_root}.") Tracer.end_span() - TraceStore.delete_trace(merkle_root) - end + traces = delete_trace(traces, merkle_root) - # Clean up the context from the Agent - IO.inspect("Finished task trace with merkle_root: #{merkle_root}.") - :ok + {:reply, :ok, traces} + end + else + {:error, information} -> + IO.inspect("DEBUG: Something failed with error #{information} and merkle root #{merkle_root}") + {:reply, {:error, information}, traces} end end + + ######################################### + ########## AUXILIARY FUNCTIONS ########## + ######################################### + defp add_missing_operators([]), do: :ok + # Updates the missing operator metric and adds a "Missing operator" trace event + # for operators missing on the provided missing_operator parameter defp add_missing_operators(missing_operators) do # Concatenate name + address missing_operators = @@ -382,27 +513,52 @@ defmodule TelemetryApi.Traces do Tracer.add_event("Missing Operators", [{:operators, missing_operators}]) end - defp set_current_trace(merkle_root) do - with {:ok, trace} <- TraceStore.get_trace(merkle_root) do + # Validates that provided operator is registered + defp validate_operator_registration(operator) do + if Operators.is_registered?(operator) do + :ok + else + {:error, :bad_request, "Operator not registered"} + end + end + + # Store the trace using the merkle_root as the key + defp store_trace(traces, merkle_root, trace) do + Map.put(traces, merkle_root, trace) + end + + # Retrieve the trace by merkle_root + defp get_trace(traces, merkle_root) do + case Map.get(traces, merkle_root) do + nil -> + IO.inspect("Context not found for #{merkle_root}") + {:error, :not_found, "Context not found for #{merkle_root}"} + + trace -> + {:ok, trace} + end + end + + # Delete the trace after it's used + defp delete_trace(traces, merkle_root) do + Map.delete(traces, merkle_root) + end + + # Sets the trace corresponding to the provided merkle_root + defp set_current_trace(traces, merkle_root) do + with {:ok, trace} <- get_trace(traces, merkle_root) do Ctx.attach(trace.context) Tracer.set_current_span(trace.parent_span) {:ok, trace} end end - defp set_current_trace_with_subspan(merkle_root, span_name) do - with {:ok, trace} <- TraceStore.get_trace(merkle_root) do + # Sets the trace and subspan corresponding to the provided merkle_root and span_name + defp set_current_trace_with_subspan(traces, merkle_root, span_name) do + with {:ok, trace} <- get_trace(traces, merkle_root) do Ctx.attach(trace.context) Tracer.set_current_span(trace.subspans[span_name]) {:ok, trace} end end - - defp validate_operator_registration(operator) do - if Operators.is_registered?(operator) do - :ok - else - {:error, :bad_request, "Operator not registered"} - end - end end From d1c527fb61d3a529262b7f8c68434e7b6859da29 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Thu, 2 Jan 2025 11:58:50 -0300 Subject: [PATCH 2/4] Remove debug logs, improve error handling --- telemetry_api/lib/telemetry_api/traces.ex | 77 ++++++++++++++++++----- 1 file changed, 60 insertions(+), 17 deletions(-) diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index 2bd5ca65d4..475da790d6 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -224,8 +224,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -272,8 +276,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -296,8 +304,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -314,9 +326,7 @@ defmodule TelemetryApi.Traces do } ) - # IO.inspect("DEBUG: Root span #{root_span_ctx}") ctx = Ctx.get_current() - # IO.inspect("DEBUG: Current context: #{ctx}") traces = store_trace(traces, merkle_root, %Trace{ parent_span: root_span_ctx, @@ -354,8 +364,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} end else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -365,8 +379,12 @@ defmodule TelemetryApi.Traces do Tracer.add_event("Batcher Task Uploaded to S3", []) {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -382,8 +400,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -398,8 +420,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -411,8 +437,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -431,8 +461,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -443,8 +477,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @@ -455,16 +493,18 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end @impl true def handle_call({:finish_task_trace, merkle_root}, _from, traces) do - IO.inspect("DEBUG: Finish task trace called with merkle_root #{merkle_root}") with {:ok, trace} <- set_current_trace_with_subspan(traces, merkle_root, :aggregator) do - IO.inspect("DEBUG: Setting current trace for trace of merkle root #{merkle_root}") missing_operators = Operators.list_operators() |> Enum.filter(fn o -> o.id not in trace.responses and Operators.is_registered?(o) end) @@ -482,9 +522,12 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} end else - {:error, information} -> - IO.inspect("DEBUG: Something failed with error #{information} and merkle root #{merkle_root}") + {:error, information} -> {:reply, {:error, information}, traces} + {:error, error_type, message} -> + {:reply, {:error, error_type, message}, traces} + unknown -> + {:reply, {:error, unknown}, traces} end end From 97d6a760d018c504a6043dae419cb2dd41c8b8f9 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Thu, 2 Jan 2025 12:01:33 -0300 Subject: [PATCH 3/4] Add context clear when new trace is created --- telemetry_api/lib/telemetry_api/traces.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index 475da790d6..83e5622f16 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -316,6 +316,8 @@ defmodule TelemetryApi.Traces do @impl true def handle_call({:create_batcher_task_trace, merkle_root}, _from, traces) do with {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do + Ctx.clear() + root_span_ctx = Tracer.start_span( "Task: #{merkle_root}", From 6844b6e2f4a69deefd3e08acb0d0435f559a9f54 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Fri, 3 Jan 2025 12:07:28 -0300 Subject: [PATCH 4/4] Refactor error handling to make use of an auxiliary function --- telemetry_api/lib/telemetry_api/traces.ex | 93 +++++------------------ 1 file changed, 21 insertions(+), 72 deletions(-) diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index 83e5622f16..25494d2d68 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -224,12 +224,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -276,12 +271,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -304,12 +294,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -366,12 +351,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} end else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -381,12 +361,7 @@ defmodule TelemetryApi.Traces do Tracer.add_event("Batcher Task Uploaded to S3", []) {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -402,12 +377,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -422,12 +392,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -439,12 +404,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -463,12 +423,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -479,12 +434,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -495,12 +445,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -524,12 +469,7 @@ defmodule TelemetryApi.Traces do {:reply, :ok, traces} end else - {:error, information} -> - {:reply, {:error, information}, traces} - {:error, error_type, message} -> - {:reply, {:error, error_type, message}, traces} - unknown -> - {:reply, {:error, unknown}, traces} + error -> {:reply, handle_error(error), traces} end end @@ -606,4 +546,13 @@ defmodule TelemetryApi.Traces do {:ok, trace} end end + + # Handles different types of errors that may be returned on the GenServer + defp handle_error(error) do + case error do + {:error, information} -> {:error, information} + {:error, error_type, message} -> {:error, error_type, message} + unknown -> {:error, unknown} + end + end end