diff --git a/docs/conf.py b/docs/conf.py index b4e57200fb..df62d5c15a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -160,6 +160,10 @@ "py:class", "opamp_pb2.EffectiveConfig", ), + ( + "py:class", + "opamp_pb2.AgentRemoteConfig", + ), ] cfg = ConfigParser() diff --git a/opamp/opentelemetry-opamp-client/CHANGELOG.md b/opamp/opentelemetry-opamp-client/CHANGELOG.md index 6e31236706..f981e2dcf3 100644 --- a/opamp/opentelemetry-opamp-client/CHANGELOG.md +++ b/opamp/opentelemetry-opamp-client/CHANGELOG.md @@ -9,3 +9,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Initial implementation ([#3635](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3635)) +- Update client to have additional callback methods + ([#4322](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4322)) diff --git a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py index 571a9837a3..ed85131f04 100644 --- a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py +++ b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/__init__.py @@ -35,7 +35,7 @@ Since OpAMP APIs, config options or environment variables are not standardizes the distros are required to provide code doing so. -OTel Python distros would need to provide their own message handler callback that implements the actual +OTel Python distros would need to provide their own Callbacks subclass that implements the actual change of whatever configuration their backends sends. Please note that the API is not finalized yet and so the name is called ``_opamp`` with the underscore. @@ -48,15 +48,18 @@ import os from opentelemetry._opamp.agent import OpAMPAgent + from opentelemetry._opamp.callbacks import Callbacks from opentelemetry._opamp.client import OpAMPClient - from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2 from opentelemetry.sdk._configuration import _OTelSDKConfigurator from opentelemetry.sdk.resources import OTELResourceDetector - def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.ServerToAgent): - for config_filename, config in message.remote_config.config.config_map.items(): - print("do something") + class MyCallbacks(Callbacks): + def on_message(self, agent, client, message): + if message.remote_config is None: + return + for config_filename, config in message.remote_config.config.config_map.items(): + print("do something") class MyOpenTelemetryConfigurator(_OTelSDKConfigurator): @@ -79,7 +82,7 @@ def _configure(self, **kwargs): ) opamp_agent = OpAMPAgent( interval=30, - message_handler=opamp_handler, + callbacks=MyCallbacks(), client=opamp_client, ) opamp_agent.start() @@ -90,6 +93,7 @@ def _configure(self, **kwargs): """ from opentelemetry._opamp.agent import OpAMPAgent +from opentelemetry._opamp.callbacks import Callbacks, MessageData from opentelemetry._opamp.client import OpAMPClient -__all__ = ["OpAMPAgent", "OpAMPClient"] +__all__ = ["Callbacks", "MessageData", "OpAMPAgent", "OpAMPClient"] diff --git a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py index fe547c81c9..2616726fcc 100644 --- a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py +++ b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/agent.py @@ -21,12 +21,24 @@ import threading from typing import Any, Callable +from opentelemetry._opamp.callbacks import Callbacks, MessageData from opentelemetry._opamp.client import OpAMPClient from opentelemetry._opamp.proto import opamp_pb2 logger = logging.getLogger(__name__) +def _safe_invoke(function: Callable[..., Any], *args: Any) -> None: + function_name = "" + try: + function_name = function.__name__ + function(*args) + except Exception as exc: # pylint: disable=broad-exception-caught + logger.error( + "Error when invoking function '%s'", function_name, exc_info=exc + ) + + class _Job: """ Represents a single request job, with retry/backoff metadata. @@ -73,9 +85,7 @@ def __init__( self, *, interval: float = 30, - message_handler: Callable[ - ["OpAMPAgent", OpAMPClient, opamp_pb2.ServerToAgent], None - ], + callbacks: Callbacks, max_retries: int = 10, heartbeat_max_retries: int = 1, initial_backoff: float = 1.0, @@ -83,14 +93,14 @@ def __init__( ): """ :param interval: seconds between heartbeat calls - :param message_handler: user provided function that takes the received ServerToAgent message + :param callbacks: Callbacks instance for receiving client events :param max_retries: how many times to retry a failed job for ad-hoc messages :param heartbeat_max_retries: how many times to retry an heartbeat failed job :param initial_backoff: base seconds for exponential backoff :param client: an OpAMPClient instance """ self._interval = interval - self._handler = message_handler + self._callbacks = callbacks self._max_retries = max_retries self._heartbeat_max_retries = heartbeat_max_retries self._initial_backoff = initial_backoff @@ -186,15 +196,24 @@ def _run_worker(self) -> None: while job.should_retry() and not self._stop.is_set(): try: message = self._client.send(job.payload) + _safe_invoke( + self._callbacks.on_connect, self, self._client + ) logger.debug("Job succeeded: %r", job.payload) break except Exception as exc: job.attempt += 1 + _safe_invoke( + self._callbacks.on_connect_failed, + self, + self._client, + exc, + ) logger.warning( "Job %r failed attempt %d/%d: %s", job.payload, job.attempt, - job.max_retries, + job.max_retries + 1, exc, ) @@ -202,7 +221,6 @@ def _run_worker(self) -> None: logger.error( "Job %r dropped after max retries", job.payload ) - logger.exception(exc) break # exponential backoff with +/- 20% jitter, interruptible by stop event @@ -216,14 +234,7 @@ def _run_worker(self) -> None: break if message is not None: - # we can't do much if the handler fails other than logging - try: - self._handler(self, self._client, message) - logger.debug("Called Job message handler for: %r", message) - except Exception as exc: - logger.warning( - "Job %r handler failed with: %s", job.payload, exc - ) + self._process_message(message) try: if job.callback is not None: @@ -233,6 +244,29 @@ def _run_worker(self) -> None: finally: self._queue.task_done() + def _process_message(self, message: opamp_pb2.ServerToAgent) -> None: + if message.HasField("error_response"): + _safe_invoke( + self._callbacks.on_error, + self, + self._client, + message.error_response, + ) + return + + if message.flags & opamp_pb2.ServerToAgentFlags_ReportFullState: + logger.debug("Server requested full state report") + payload = self._client.build_full_state_message() + self.send(payload) + + msg_data = MessageData.from_server_message(message) + _safe_invoke( + self._callbacks.on_message, + self, + self._client, + msg_data, + ) + def stop(self, timeout: float | None = None) -> None: """ Signal server we are disconnecting and then threads to exit diff --git a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/callbacks.py b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/callbacks.py new file mode 100644 index 0000000000..78bffca66b --- /dev/null +++ b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/callbacks.py @@ -0,0 +1,94 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from abc import ABC +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from opentelemetry._opamp.proto import opamp_pb2 + +if TYPE_CHECKING: + from opentelemetry._opamp.agent import OpAMPAgent + from opentelemetry._opamp.client import OpAMPClient + + +@dataclass +class MessageData: + """Structured view of a ServerToAgent message for callback consumption. + + Only fields the agent is expected to act on are exposed. Flags and + error_response are handled internally by the client before this + object reaches the callback. + """ + + remote_config: opamp_pb2.AgentRemoteConfig | None = None + + @classmethod + def from_server_message( + cls, message: opamp_pb2.ServerToAgent + ) -> MessageData: + return cls( + remote_config=message.remote_config + if message.HasField("remote_config") + else None, + ) + + +class Callbacks(ABC): + """OpAMP client callbacks with no-op defaults. + + All methods have no-op defaults so that subclasses only need to + override the callbacks they care about. New callbacks can be added + in the future without breaking existing subclasses. + """ + + def on_connect(self, agent: OpAMPAgent, client: OpAMPClient) -> None: + """Called when the connection is successfully established to the + Server. For HTTP clients this is called for any request if the + response status is OK. + """ + + def on_connect_failed( + self, + agent: OpAMPAgent, + client: OpAMPClient, + error: Exception, + ) -> None: + """Called when the connection to the Server cannot be established. + May also be called if the connection is lost and reconnection + attempt fails. + """ + + def on_error( + self, + agent: OpAMPAgent, + client: OpAMPClient, + error_response: opamp_pb2.ServerErrorResponse, + ) -> None: + """Called when the Server reports an error in response to a + previously sent request. Useful for logging purposes. The Agent + should not attempt to process the error by reconnecting or + retrying previous operations. The client handles the UNAVAILABLE + case internally by performing retries as necessary. + """ + + def on_message( + self, + agent: OpAMPAgent, + client: OpAMPClient, + message: MessageData, + ) -> None: + """Called when the Agent receives a message that needs processing.""" diff --git a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/transport/requests.py b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/transport/requests.py index 974da6906b..6c29a22d23 100644 --- a/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/transport/requests.py +++ b/opamp/opentelemetry-opamp-client/src/opentelemetry/_opamp/transport/requests.py @@ -59,8 +59,7 @@ def send( ) response.raise_for_status() except Exception as exc: - logger.error(str(exc)) - raise OpAMPException + raise OpAMPException(str(exc)) from exc message = messages.decode_message(response.content) diff --git a/opamp/opentelemetry-opamp-client/tests/opamp/cassettes/test_connection_remote_config_status_heartbeat_disconnection.yaml b/opamp/opentelemetry-opamp-client/tests/opamp/cassettes/test_connection_remote_config_status_heartbeat_disconnection.yaml index 74db5d2d22..df254d3714 100644 --- a/opamp/opentelemetry-opamp-client/tests/opamp/cassettes/test_connection_remote_config_status_heartbeat_disconnection.yaml +++ b/opamp/opentelemetry-opamp-client/tests/opamp/cassettes/test_connection_remote_config_status_heartbeat_disconnection.yaml @@ -56,10 +56,10 @@ interactions: body: string: !!binary | ChABnL5b5k5046/FazbAJ7r4GioKBgoECgASABIg47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZ - G3hSuFUwAVIA + G3hSuFVSAA== headers: Content-Length: - - '66' + - '64' Content-Type: - application/x-protobuf Date: @@ -89,10 +89,10 @@ interactions: body: string: !!binary | ChABnL5b5k5046/FazbAJ7r4GioKBgoECgASABIg47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZ - G3hSuFUwAVIA + G3hSuFVSAA== headers: Content-Length: - - '66' + - '64' Content-Type: - application/x-protobuf Date: @@ -122,10 +122,10 @@ interactions: body: string: !!binary | ChABnL5b5k5046/FazbAJ7r4GioKBgoECgASABIg47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZ - G3hSuFUwAVIA + G3hSuFVSAA== headers: Content-Length: - - '66' + - '64' Content-Type: - application/x-protobuf Date: diff --git a/opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py b/opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py index 9582151176..9c7e7f34f0 100644 --- a/opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py +++ b/opamp/opentelemetry-opamp-client/tests/opamp/test_agent.py @@ -16,20 +16,26 @@ from time import sleep from unittest import mock -from opentelemetry._opamp.agent import OpAMPAgent +from opentelemetry._opamp.agent import OpAMPAgent, _safe_invoke from opentelemetry._opamp.agent import _Job as Job +from opentelemetry._opamp.callbacks import Callbacks, MessageData +from opentelemetry._opamp.proto import opamp_pb2 + + +class _NoOpCallbacks(Callbacks): + pass def test_can_instantiate_agent(): agent = OpAMPAgent( - interval=30, client=mock.Mock(), message_handler=mock.Mock() + interval=30, client=mock.Mock(), callbacks=_NoOpCallbacks() ) assert isinstance(agent, OpAMPAgent) def test_can_start_agent(): agent = OpAMPAgent( - interval=30, client=mock.Mock(), message_handler=mock.Mock() + interval=30, client=mock.Mock(), callbacks=_NoOpCallbacks() ) agent.start() agent.stop() @@ -37,12 +43,13 @@ def test_can_start_agent(): def test_agent_start_will_send_connection_and_disconnetion_messages(): client_mock = mock.Mock() - mock_message = {"mock": "message"} + mock_message = mock.Mock() + mock_message.HasField.return_value = False + mock_message.flags = 0 client_mock.send.return_value = mock_message - message_handler = mock.Mock() - agent = OpAMPAgent( - interval=30, client=client_mock, message_handler=message_handler - ) + + cb = mock.create_autospec(Callbacks, instance=True) + agent = OpAMPAgent(interval=30, client=client_mock, callbacks=cb) agent.start() # wait for the queue to be consumed sleep(0.1) @@ -52,13 +59,16 @@ def test_agent_start_will_send_connection_and_disconnetion_messages(): assert client_mock.send.call_count == 2 # connection callback has been called assert agent._schedule is True - # connection message response has been received - message_handler.assert_called_once_with(agent, client_mock, mock_message) + # on_connect and on_message called for the connection response + cb.on_connect.assert_called_once_with(agent, client_mock) + cb.on_message.assert_called_once_with( + agent, client_mock, MessageData(remote_config=None) + ) def test_agent_can_call_agent_stop_multiple_times(): agent = OpAMPAgent( - interval=30, client=mock.Mock(), message_handler=mock.Mock() + interval=30, client=mock.Mock(), callbacks=_NoOpCallbacks() ) agent.start() agent.stop() @@ -67,14 +77,14 @@ def test_agent_can_call_agent_stop_multiple_times(): def test_agent_can_call_agent_stop_before_start(): agent = OpAMPAgent( - interval=30, client=mock.Mock(), message_handler=mock.Mock() + interval=30, client=mock.Mock(), callbacks=_NoOpCallbacks() ) agent.stop() def test_agent_send_warns_without_worker_thread(caplog): agent = OpAMPAgent( - interval=30, client=mock.Mock(), message_handler=mock.Mock() + interval=30, client=mock.Mock(), callbacks=_NoOpCallbacks() ) agent.send(payload="payload") @@ -89,19 +99,26 @@ def test_agent_send_warns_without_worker_thread(caplog): def test_agent_retries_before_max_attempts(caplog): caplog.set_level(logging.DEBUG, logger="opentelemetry._opamp.agent") - message_handler_mock = mock.Mock() + + cb = mock.create_autospec(Callbacks, instance=True) client_mock = mock.Mock() - connection_message = disconnection_message = server_message = mock.Mock() + connection_message = mock.Mock() + connection_message.HasField.return_value = False + connection_message.flags = 0 + server_message = mock.Mock() + server_message.HasField.return_value = False + server_message.flags = 0 + disconnection_message = mock.Mock() client_mock.send.side_effect = [ connection_message, - Exception, + Exception("fail"), server_message, disconnection_message, ] agent = OpAMPAgent( interval=30, client=client_mock, - message_handler=message_handler_mock, + callbacks=cb, initial_backoff=0, ) agent.start() @@ -111,24 +128,32 @@ def test_agent_retries_before_max_attempts(caplog): agent.stop() assert client_mock.send.call_count == 4 - assert message_handler_mock.call_count == 2 + assert cb.on_message.call_count == 2 + assert cb.on_connect.call_count == 2 + assert cb.on_connect_failed.call_count == 1 def test_agent_stops_after_max_attempts(caplog): caplog.set_level(logging.DEBUG, logger="opentelemetry._opamp.agent") - message_handler_mock = mock.Mock() + + cb = mock.create_autospec(Callbacks, instance=True) client_mock = mock.Mock() - connection_message = disconnection_message = mock.Mock() + connection_message = mock.Mock() + connection_message.HasField.return_value = False + connection_message.flags = 0 + disconnection_message = mock.Mock() + exc1 = Exception("fail1") + exc2 = Exception("fail2") client_mock.send.side_effect = [ connection_message, - Exception, - Exception, + exc1, + exc2, disconnection_message, ] agent = OpAMPAgent( interval=30, client=client_mock, - message_handler=message_handler_mock, + callbacks=cb, max_retries=1, initial_backoff=0, ) @@ -139,26 +164,199 @@ def test_agent_stops_after_max_attempts(caplog): agent.stop() assert client_mock.send.call_count == 4 - assert message_handler_mock.call_count == 1 + assert cb.on_message.call_count == 1 + assert cb.on_connect_failed.call_count == 2 + cb.on_connect_failed.assert_any_call(agent, client_mock, exc1) + cb.on_connect_failed.assert_any_call(agent, client_mock, exc2) def test_agent_send_enqueues_job(): - message_handler_mock = mock.Mock() - agent = OpAMPAgent( - interval=30, client=mock.Mock(), message_handler=message_handler_mock - ) + cb = mock.create_autospec(Callbacks, instance=True) + client_mock = mock.Mock() + msg = mock.Mock() + msg.HasField.return_value = False + msg.flags = 0 + client_mock.send.return_value = msg + + agent = OpAMPAgent(interval=30, client=client_mock, callbacks=cb) agent.start() # wait for the queue to be consumed sleep(0.1) - # message handler called for connection message - assert message_handler_mock.call_count == 1 + # on_message called for connection message + assert cb.on_message.call_count == 1 agent.send(payload="payload") # wait for the queue to be consumed sleep(0.1) agent.stop() - # message handler called once for connection and once for our message - assert message_handler_mock.call_count == 2 + # on_message called once for connection and once for our message + assert cb.on_message.call_count == 2 + + +def test_on_error_called_without_on_message_for_error_response(): + cb = mock.create_autospec(Callbacks, instance=True) + client_mock = mock.Mock() + + error_response = opamp_pb2.ServerErrorResponse( + error_message="server error", + ) + server_msg = opamp_pb2.ServerToAgent( + error_response=error_response, + ) + # connection message (no error) + conn_msg = opamp_pb2.ServerToAgent() + + client_mock.send.side_effect = [ + conn_msg, # connection + server_msg, # message with error_response + mock.Mock(), # disconnect + ] + agent = OpAMPAgent(interval=30, client=client_mock, callbacks=cb) + agent.start() + agent.send(payload="payload") + sleep(0.1) + agent.stop() + + # on_message called only for connection (not for error_response message) + assert cb.on_message.call_count == 1 + # on_error called for the message with error_response + cb.on_error.assert_called_once_with(agent, client_mock, error_response) + + +def test_on_error_not_called_without_error_response(): + cb = mock.create_autospec(Callbacks, instance=True) + client_mock = mock.Mock() + + server_msg = opamp_pb2.ServerToAgent() + client_mock.send.side_effect = [ + server_msg, # connection + server_msg, # message without error_response + mock.Mock(), # disconnect + ] + agent = OpAMPAgent(interval=30, client=client_mock, callbacks=cb) + agent.start() + agent.send(payload="payload") + sleep(0.1) + agent.stop() + + assert cb.on_message.call_count == 2 + cb.on_error.assert_not_called() + + +def test_dispatch_order_with_error(): + """Verify that error_response skips on_message: on_connect -> on_error.""" + call_order = [] + client_mock = mock.Mock() + + error_response = opamp_pb2.ServerErrorResponse( + error_message="err", + ) + server_msg = opamp_pb2.ServerToAgent( + error_response=error_response, + ) + + class OrderTrackingCallbacks(Callbacks): + def on_connect(self, agent, client): + call_order.append("on_connect") + + def on_message(self, agent, client, message): + call_order.append("on_message") + + def on_error(self, agent, client, error_response): + call_order.append("on_error") + + client_mock.send.side_effect = [ + server_msg, # connection message with error + mock.Mock(), # disconnect + ] + agent = OpAMPAgent( + interval=30, client=client_mock, callbacks=OrderTrackingCallbacks() + ) + agent.start() + sleep(0.1) + agent.stop() + + assert call_order == ["on_connect", "on_error"] + + +def test_dispatch_order_without_error(): + """Verify normal dispatch order: on_connect -> on_message.""" + call_order = [] + client_mock = mock.Mock() + + server_msg = opamp_pb2.ServerToAgent() + + class OrderTrackingCallbacks(Callbacks): + def on_connect(self, agent, client): + call_order.append("on_connect") + + def on_message(self, agent, client, message): + call_order.append("on_message") + + def on_error(self, agent, client, error_response): + call_order.append("on_error") + + client_mock.send.side_effect = [ + server_msg, # connection message, no error + mock.Mock(), # disconnect + ] + agent = OpAMPAgent( + interval=30, client=client_mock, callbacks=OrderTrackingCallbacks() + ) + agent.start() + sleep(0.1) + agent.stop() + + assert call_order == ["on_connect", "on_message"] + + +def test_report_full_state_flag_triggers_full_state_send(): + cb = mock.create_autospec(Callbacks, instance=True) + client_mock = mock.Mock() + + conn_msg = opamp_pb2.ServerToAgent() + flag_msg = opamp_pb2.ServerToAgent( + flags=opamp_pb2.ServerToAgentFlags_ReportFullState, + ) + + no_flag_msg = opamp_pb2.ServerToAgent() + client_mock.send.side_effect = [ + conn_msg, # connection + flag_msg, # response with ReportFullState + no_flag_msg, # full state response + no_flag_msg, # disconnect + ] + client_mock.build_full_state_message.return_value = b"full-state" + + agent = OpAMPAgent(interval=30, client=client_mock, callbacks=cb) + agent.start() + agent.send(payload="payload") + sleep(0.2) + agent.stop() + + client_mock.build_full_state_message.assert_called() + + +def test_safe_invoke_logs_error(caplog): + caplog.set_level(logging.ERROR, logger="opentelemetry._opamp.agent") + + def bad_callback(): + raise ValueError("boom") + + _safe_invoke(bad_callback) + + assert any( + "Error when invoking function 'bad_callback'" in record.message + for record in caplog.records + ) + + +def test_safe_invoke_does_not_propagate(): + def bad_callback(): + raise RuntimeError("should not propagate") + + # Should not raise + _safe_invoke(bad_callback) def test_can_instantiate_job(): diff --git a/opamp/opentelemetry-opamp-client/tests/opamp/test_callbacks.py b/opamp/opentelemetry-opamp-client/tests/opamp/test_callbacks.py new file mode 100644 index 0000000000..66ec8ef085 --- /dev/null +++ b/opamp/opentelemetry-opamp-client/tests/opamp/test_callbacks.py @@ -0,0 +1,36 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +from opentelemetry._opamp.callbacks import Callbacks, MessageData +from opentelemetry._opamp.proto import opamp_pb2 + + +def test_subclass_override_subset(): + class MyCallbacks(Callbacks): + def __init__(self): + self.connected = False + + def on_connect(self, agent, client): + self.connected = True + + cb = MyCallbacks() + cb.on_connect(mock.Mock(), mock.Mock()) + assert cb.connected is True + + # non-overridden methods still work as no-ops + cb.on_connect_failed(mock.Mock(), mock.Mock(), Exception()) + cb.on_message(mock.Mock(), mock.Mock(), MessageData()) + cb.on_error(mock.Mock(), mock.Mock(), opamp_pb2.ServerErrorResponse()) diff --git a/opamp/opentelemetry-opamp-client/tests/opamp/test_e2e.py b/opamp/opentelemetry-opamp-client/tests/opamp/test_e2e.py index 68ceec414b..56998dd255 100644 --- a/opamp/opentelemetry-opamp-client/tests/opamp/test_e2e.py +++ b/opamp/opentelemetry-opamp-client/tests/opamp/test_e2e.py @@ -20,6 +20,7 @@ import pytest from opentelemetry._opamp.agent import OpAMPAgent +from opentelemetry._opamp.callbacks import Callbacks from opentelemetry._opamp.client import OpAMPClient from opentelemetry._opamp.proto import opamp_pb2 @@ -32,26 +33,32 @@ def test_connection_remote_config_status_heartbeat_disconnection(caplog): caplog.set_level(logging.DEBUG, logger="opentelemetry._opamp.agent") - def opamp_handler(agent, client, message): - logger = logging.getLogger("opentelemetry._opamp.agent.opamp_handler") + class E2ECallbacks(Callbacks): + def on_message(self, agent, client, message): + logger = logging.getLogger( + "opentelemetry._opamp.agent.opamp_handler" + ) - logger.debug("In opamp_handler") + logger.debug("In opamp_handler") - # we need to update the config only if we have a config - if not message.remote_config.config_hash: - return + # we need to update the config only if we have a config + if ( + message.remote_config is None + or not message.remote_config.config_hash + ): + return - updated_remote_config = client.update_remote_config_status( - remote_config_hash=message.remote_config.config_hash, - status=opamp_pb2.RemoteConfigStatuses_APPLIED, - error_message="", - ) - if updated_remote_config is not None: - logger.debug("Updated Remote Config") - message = client.build_remote_config_status_response_message( - updated_remote_config + updated_remote_config = client.update_remote_config_status( + remote_config_hash=message.remote_config.config_hash, + status=opamp_pb2.RemoteConfigStatuses_APPLIED, + error_message="", ) - agent.send(payload=message) + if updated_remote_config is not None: + logger.debug("Updated Remote Config") + msg = client.build_remote_config_status_response_message( + updated_remote_config + ) + agent.send(payload=msg) opamp_client = OpAMPClient( endpoint="https://localhost:4320/v1/opamp", @@ -63,7 +70,7 @@ def opamp_handler(agent, client, message): ) opamp_agent = OpAMPAgent( interval=1, - message_handler=opamp_handler, + callbacks=E2ECallbacks(), client=opamp_client, ) opamp_agent.start() @@ -78,12 +85,14 @@ def opamp_handler(agent, client, message): for record in caplog.record_tuples if record[0] == "opentelemetry._opamp.agent.opamp_handler" ] - # one call is for connection, one is remote config status, one is heartbeat + # connection response has ReportFullState flag, triggering a full state send. + # on_message is called for: connection, full state response, config status response, heartbeat. assert handler_records == [ "In opamp_handler", "Updated Remote Config", "In opamp_handler", "In opamp_handler", + "In opamp_handler", ] @@ -95,7 +104,7 @@ def opamp_handler(agent, client, message): def test_with_server_not_responding(caplog): caplog.set_level(logging.DEBUG, logger="opentelemetry._opamp.agent") - opamp_handler = mock.Mock() + cb = mock.create_autospec(Callbacks, instance=True) opamp_client = OpAMPClient( endpoint="https://localhost:4399/v1/opamp", @@ -107,11 +116,11 @@ def test_with_server_not_responding(caplog): ) opamp_agent = OpAMPAgent( interval=1, - message_handler=opamp_handler, + callbacks=cb, client=opamp_client, ) opamp_agent.start() opamp_agent.stop() - assert opamp_handler.call_count == 0 + assert cb.on_message.call_count == 0