diff --git a/telemetry_api/lib/telemetry_api/contract_managers/registry_coordinator_manager.ex b/telemetry_api/lib/telemetry_api/contract_managers/registry_coordinator_manager.ex new file mode 100644 index 0000000000..06f7dd65a8 --- /dev/null +++ b/telemetry_api/lib/telemetry_api/contract_managers/registry_coordinator_manager.ex @@ -0,0 +1,40 @@ +defmodule TelemetryApi.ContractManagers.RegistryCoordinatorManager do + alias TelemetryApi.ContractManagers.RegistryCoordinatorManager + + require Logger + + @aligned_config_file System.get_env("ALIGNED_CONFIG_FILE") + + config_file_path = + case @aligned_config_file do + nil -> raise("ALIGNED_CONFIG_FILE not set in .env") + file -> file + end + + {status, config_json_string} = File.read(config_file_path) + + case status do + :ok -> + Logger.debug("Aligned deployment file read successfully") + + :error -> + raise("Config file not read successfully") + end + + @registry_coordinator_address Jason.decode!(config_json_string) + |> Map.get("addresses") + |> Map.get("registryCoordinator") + + use Ethers.Contract, + abi_file: "priv/abi/IRegistryCoordinator.json", + default_address: @registry_coordinator_address + + def get_registry_coordinator_address() do + @registry_coordinator_address + end + + def fetch_operator_status(operator_address) do + RegistryCoordinatorManager.get_operator_status(operator_address) + |> Ethers.call() + end +end diff --git a/telemetry_api/lib/telemetry_api/operators.ex b/telemetry_api/lib/telemetry_api/operators.ex index e5bcbe1a59..c324f93ac1 100644 --- a/telemetry_api/lib/telemetry_api/operators.ex +++ b/telemetry_api/lib/telemetry_api/operators.ex @@ -28,13 +28,13 @@ defmodule TelemetryApi.Operators do ## Examples - iex> get_operator(%Operator{id: some_id}) + iex> get_operator(%{id: some_id}) {:ok, %Operator{}} - iex> get_operator(%Operator{address: some_address}) + iex> get_operator(%{address: some_address}) {:ok, %Operator{}} - iex> get_operator(%Operator{address: non_existent_address}) + iex> get_operator(%{address: non_existent_address}) {:error, :not_found, "Operator not found for address: non_existent_address"} """ def get_operator(%{address: address}) do @@ -188,4 +188,20 @@ defmodule TelemetryApi.Operators do def change_operator(%Operator{} = operator, attrs \\ %{}) do Operator.changeset(operator, attrs) end + + @doc """ + Checks if an operator is registered. + + ## Examples + + iex> is_registered?(%Operator{status: "REGISTERED"}) + true + + iex> is_registered?(%Operator{status: "DEREGISTERED"}) + false + + """ + def is_registered?(operator) do + operator.status == "REGISTERED" + end end diff --git a/telemetry_api/lib/telemetry_api/operators/operator.ex b/telemetry_api/lib/telemetry_api/operators/operator.ex index 6c2892d742..75089f52ae 100644 --- a/telemetry_api/lib/telemetry_api/operators/operator.ex +++ b/telemetry_api/lib/telemetry_api/operators/operator.ex @@ -8,6 +8,7 @@ defmodule TelemetryApi.Operators.Operator do field :stake, :string field :name, :string field :version, :string + field :status, :string timestamps(type: :utc_datetime) end @@ -15,7 +16,7 @@ defmodule TelemetryApi.Operators.Operator do @doc false def changeset(operator, attrs) do operator - |> cast(attrs, [:address, :id, :stake, :name, :version]) + |> cast(attrs, [:address, :id, :stake, :name, :version, :status]) |> validate_required([:address, :id, :name, :stake]) end end diff --git a/telemetry_api/lib/telemetry_api/periodic/operator_fetcher.ex b/telemetry_api/lib/telemetry_api/periodic/operator_fetcher.ex index 285e60ef1e..c9adcd0673 100644 --- a/telemetry_api/lib/telemetry_api/periodic/operator_fetcher.ex +++ b/telemetry_api/lib/telemetry_api/periodic/operator_fetcher.ex @@ -1,6 +1,12 @@ defmodule TelemetryApi.Periodic.OperatorFetcher do use GenServer alias TelemetryApi.Operators + alias TelemetryApi.ContractManagers.RegistryCoordinatorManager + require Logger + + @never_registered 0 + @registered 1 + @deregistered 2 wait_time_str = System.get_env("OPERATOR_FETCHER_WAIT_TIME_MS") || raise """ @@ -24,14 +30,37 @@ defmodule TelemetryApi.Periodic.OperatorFetcher do end def send_work() do - :timer.send_interval(@wait_time_ms, :fetch_operators) + :timer.send_interval(@wait_time_ms, :poll_service) + end + + def handle_info(:poll_service, state) do + fetch_operators_info() + fetch_operators_status() + {:noreply, state} + end + + defp fetch_operators_info() do + case Operators.fetch_all_operators() do + {:ok, _} -> :ok + {:error, message} -> IO.inspect("Couldn't fetch operators: #{IO.inspect(message)}") + end end - def handle_info(:fetch_operators, _state) do - case Operators.fetch_all_operators() do - {:ok, _} -> :ok - {:error, message} -> IO.inspect "Couldn't fetch operators: #{IO.inspect message}" - end - {:noreply, %{}} + defp fetch_operators_status() do + Operators.list_operators() + |> Enum.map(fn op -> + case RegistryCoordinatorManager.fetch_operator_status(op.address) do + {:ok, status} -> + Operators.update_operator(op, %{status: string_status(status)}) + + error -> + Logger.error("Error when updating status: #{error}") + end + end) + :ok end + + defp string_status(@never_registered), do: "NEVER_REGISTERED" + defp string_status(@registered), do: "REGISTERED" + defp string_status(@deregistered), do: "DEREGISTERED" end diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index 3d008c8916..8182892c64 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -63,6 +63,7 @@ defmodule TelemetryApi.Traces do """ def register_operator_response(merkle_root, operator_id) do with {:ok, operator} <- Operators.get_operator(%{id: operator_id}), + :ok <- validate_operator_registration(operator), {:ok, trace} <- set_current_trace(merkle_root) do operator_stake = Decimal.new(operator.stake) new_stake = Decimal.add(trace.current_stake, operator_stake) @@ -111,7 +112,7 @@ defmodule TelemetryApi.Traces do def quorum_reached(merkle_root) do with {:ok, _trace} <- set_current_trace(merkle_root) do Tracer.add_event("Quorum Reached", []) - IO.inspect("Reached quorum registered. merkle_root: #{IO.inspect(merkle_root)}") + IO.inspect("Reached quorum registered. merkle_root: #{merkle_root}") :ok end end @@ -155,7 +156,7 @@ defmodule TelemetryApi.Traces do def finish_task_trace(merkle_root) do with {:ok, trace} <- set_current_trace(merkle_root) do missing_operators = - Operators.list_operators() |> Enum.filter(fn o -> o.id not in trace.responses end) + Operators.list_operators() |> Enum.filter(fn o -> o.id not in trace.responses and Operators.is_registered?(o) end) add_missing_operators(missing_operators) @@ -165,7 +166,7 @@ defmodule TelemetryApi.Traces do # Clean up the context from the Agent TraceStore.delete_trace(merkle_root) - IO.inspect("Finished task trace with merkle_root: #{IO.inspect(merkle_root)}.") + IO.inspect("Finished task trace with merkle_root: #{merkle_root}.") :ok end end @@ -186,4 +187,12 @@ defmodule TelemetryApi.Traces do {:ok, trace} end end + + defp validate_operator_registration(operator) do + if Operators.is_registered?(operator) do + :ok + else + {:error, :bad_request, "Operator not registered"} + end + end end diff --git a/telemetry_api/lib/telemetry_api_web/controllers/operator_json.ex b/telemetry_api/lib/telemetry_api_web/controllers/operator_json.ex index f267d1788e..c3e46c251c 100644 --- a/telemetry_api/lib/telemetry_api_web/controllers/operator_json.ex +++ b/telemetry_api/lib/telemetry_api_web/controllers/operator_json.ex @@ -21,7 +21,8 @@ defmodule TelemetryApiWeb.OperatorJSON do id: operator.id, stake: operator.stake, name: operator.name, - version: operator.version + version: operator.version, + status: operator.status } end end diff --git a/telemetry_api/priv/repo/migrations/20241003205454_add_operator_status.exs b/telemetry_api/priv/repo/migrations/20241003205454_add_operator_status.exs new file mode 100644 index 0000000000..f60388097a --- /dev/null +++ b/telemetry_api/priv/repo/migrations/20241003205454_add_operator_status.exs @@ -0,0 +1,9 @@ +defmodule TelemetryApi.Repo.Migrations.AddOperatorStatus do + use Ecto.Migration + + def change do + alter table(:operators) do + add :status, :string + end + end +end