diff --git a/agrirouter/auth/auth.py b/agrirouter/auth/auth.py index f1e2f634..b33527c0 100644 --- a/agrirouter/auth/auth.py +++ b/agrirouter/auth/auth.py @@ -34,4 +34,3 @@ def verify_auth_response(self, response, public_key=None): def _extract_query_params(query_params: str) -> dict: qp_pairs = parse_qs(query_params) return {k: v[0] for k, v in qp_pairs.items()} - diff --git a/agrirouter/auth/dto.py b/agrirouter/auth/dto.py index fec51c0a..55d1bbbb 100644 --- a/agrirouter/auth/dto.py +++ b/agrirouter/auth/dto.py @@ -4,44 +4,6 @@ from agrirouter.messaging.exceptions import WrongFieldError -class AuthorizationResultUrl: - def __init__(self, - *, - state: str = None, - signature: str = None, - token: str = None, - error: str = None - ): - self.state = state - self.signature = signature - self.token = token - self.error = error - - def get_state(self) -> str: - return self.state - - def set_state(self, state: str) -> None: - self.state = state - - def get_signature(self) -> str: - return self.signature - - def set_signature(self, signature: str) -> None: - self.signature = signature - - def get_token(self) -> str: - return self.token - - def set_token(self, token: str) -> None: - self.token = token - - def get_error(self) -> str: - return self.error - - def set_error(self, error: str) -> None: - self.error = error - - class AuthorizationToken: ACCOUNT = 'account' REGISTRATION_CODE = 'regcode' @@ -88,6 +50,52 @@ def set_expires(self, expires: str) -> None: self.expires = expires +class AuthorizationResultUrl: + def __init__(self, + *, + state: str = None, + signature: str = None, + token: str = None, + decoded_token: AuthorizationToken = None, + error: str = None + ): + self.state = state + self.signature = signature + self.token = token + self.decoded_token = decoded_token + self.error = error + + def get_state(self) -> str: + return self.state + + def set_state(self, state: str) -> None: + self.state = state + + def get_signature(self) -> str: + return self.signature + + def set_signature(self, signature: str) -> None: + self.signature = signature + + def get_token(self) -> str: + return self.token + + def set_token(self, token: str) -> None: + self.token = token + + def get_error(self) -> str: + return self.error + + def set_error(self, error: str) -> None: + self.error = error + + def get_decoded_token(self) -> AuthorizationToken: + return self.decoded_token + + def set_decoded_token(self, decoded_token: AuthorizationToken) -> None: + self.decoded_token = decoded_token + + class AuthorizationResult: def __init__(self, *, @@ -107,4 +115,4 @@ def get_state(self) -> str: return self.state def set_state(self, state: str) -> None: - self.state = state \ No newline at end of file + self.state = state diff --git a/agrirouter/auth/response.py b/agrirouter/auth/response.py index b5a73d48..a4b87f7f 100644 --- a/agrirouter/auth/response.py +++ b/agrirouter/auth/response.py @@ -5,7 +5,7 @@ from cryptography.exceptions import InvalidSignature -from agrirouter.auth.dto import AuthorizationToken +from agrirouter.auth.dto import AuthorizationToken, AuthorizationResultUrl from agrirouter.onboarding.signature import verify_signature @@ -69,16 +69,16 @@ def decode_token(token: Union[str, bytes]) -> AuthorizationToken: auth_token.json_deserialize(json.loads(decoded_token)) return auth_token - def get_auth_result(self) -> dict: - if not self.is_successful: - return {self.ERROR_KEY: self.error} + def get_auth_result(self) -> AuthorizationResultUrl: decoded_token = self.decode_token(self.token) - return { - self.SIGNATURE_KEY: self.signature, - self.STATE_KEY: self.state, - self.TOKEN_KEY: self.token, - self.CRED_KEY: decoded_token - } + + return AuthorizationResultUrl( + signature=self.signature, + state=self.state, + token=self.token, + decoded_token=decoded_token, + error=self.error + ) def get_signature(self): return self.signature diff --git a/agrirouter/environments/environments.py b/agrirouter/environments/environments.py index 5c688c7a..6b05b99e 100644 --- a/agrirouter/environments/environments.py +++ b/agrirouter/environments/environments.py @@ -40,10 +40,10 @@ def get_agrirouter_login_url(self) -> str: def get_secured_onboarding_authorization_url(self, application_id, response_type, state, redirect_uri=None) -> str: auth_url = self.get_base_url() + self._SECURED_ONBOARDING_AUTHORIZATION_LINK_TEMPLATE.format( - application_id=application_id, - response_type=response_type, - state=state - ) + application_id=application_id, + response_type=response_type, + state=state + ) return auth_url + f"&redirect_uri={redirect_uri}" if redirect_uri is not None else auth_url def get_mqtt_server_url(self, host, port) -> str: diff --git a/agrirouter/messaging/builders.py b/agrirouter/messaging/builders.py index 94e56ce8..d7815042 100644 --- a/agrirouter/messaging/builders.py +++ b/agrirouter/messaging/builders.py @@ -23,7 +23,7 @@ def with_task_data(self): self._subscription_items.append(subscription_item) return self - def with_device_description(self, ddis: List[int]=None, position: bool=None): + def with_device_description(self, ddis: List[int] = None, position: bool = None): subscription_item = Subscription.MessageTypeSubscriptionItem( technical_message_type=CapabilityType.ISO_11783_DEVICE_DESCRIPTION_PROTOBUF.value, ddis=ddis, diff --git a/agrirouter/messaging/certification.py b/agrirouter/messaging/certification.py index 50de6778..c87afc83 100644 --- a/agrirouter/messaging/certification.py +++ b/agrirouter/messaging/certification.py @@ -1,7 +1,4 @@ -import json import os -import pathlib -from pathlib import Path import tempfile diff --git a/agrirouter/messaging/clients/constants.py b/agrirouter/messaging/clients/constants.py new file mode 100644 index 00000000..78de8628 --- /dev/null +++ b/agrirouter/messaging/clients/constants.py @@ -0,0 +1,2 @@ +ASYNC = "ASYNC" +SYNC = "SYNC" diff --git a/agrirouter/messaging/clients/http.py b/agrirouter/messaging/clients/http.py index 429be1e7..cf1e400f 100644 --- a/agrirouter/messaging/clients/http.py +++ b/agrirouter/messaging/clients/http.py @@ -5,7 +5,6 @@ from urllib.parse import urlparse from agrirouter.messaging.certification import create_certificate_file_from_pen -from agrirouter.onboarding.dto import ConnectionCriteria from agrirouter.onboarding.response import SoftwareOnboardingResponse diff --git a/agrirouter/messaging/clients/mqtt.py b/agrirouter/messaging/clients/mqtt.py index 72ff399d..423897d0 100644 --- a/agrirouter/messaging/clients/mqtt.py +++ b/agrirouter/messaging/clients/mqtt.py @@ -1,16 +1,22 @@ +import time +import ssl from typing import Any, List, Tuple -from paho.mqtt import client as mqtt_client +import paho.mqtt.client as mqtt_client from paho.mqtt.client import MQTTv31, MQTTMessageInfo +from agrirouter.messaging.certification import create_certificate_file_from_pen +from agrirouter.messaging.clients.constants import SYNC, ASYNC + class MqttClient: def __init__(self, - client_id, + onboard_response, + client_id: str, on_message_callback: callable = None, userdata: Any = None, - clean_session: bool = True + clean_session: bool = False ): # TODO: Implement on_message_callback parameter validation: # must take params as described at https://pypi.org/project/paho-mqtt/#callbacks @@ -22,26 +28,54 @@ def __init__(self, protocol=MQTTv31, transport="tcp" ) + self.mqtt_client.on_message = on_message_callback if on_message_callback else self._get_on_message_callback() - self.mqtt_client.on_connect = self._get_on_connect_callback() + self.mqtt_client.on_connect = self._get_on_connect_callback(onboard_response) self.mqtt_client.on_disconnect = self._get_on_disconnect_callback() self.mqtt_client.on_subscribe = self._get_on_subscribe_callback() self.mqtt_client.on_unsubscribe = self._get_on_unsubscribe_callback() + certificate_file_path = create_certificate_file_from_pen(onboard_response) + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.load_cert_chain( + certfile=certificate_file_path, + keyfile=certificate_file_path, + password=onboard_response.get_authentication().get_secret(), + ) + self.mqtt_client.tls_set_context(context) + + self._mode = None + def connect(self, host: str, port: str) -> None: + self.mqtt_client.connect( + host=host, + port=int(port) + ) + self.mqtt_client.loop() + + self._mode = SYNC + + def connect_async(self, host: str, port: str): self.mqtt_client.connect_async( host=host, - port=port + port=int(port) ) self.mqtt_client.loop_start() + self._mode = ASYNC + + while self.mqtt_client._state == 0: + time.sleep(1) + def disconnect(self): self.mqtt_client.loop_stop() self.mqtt_client.disconnect() - def publish(self, topic, payload, qos=0) -> MQTTMessageInfo: - """ + def receive_outbox_messages(self): + self.mqtt_client.loop() + def publish(self, topic, payload, qos=2) -> MQTTMessageInfo: + """ :param topic: str representing unique name of the topic that the message should be published on :param payload: The actual message to send :param qos: int representing the quality of service level to use. May be [0, 1, 2] @@ -52,6 +86,10 @@ def publish(self, topic, payload, qos=0) -> MQTTMessageInfo: payload=payload, qos=qos ) + if self._mode == SYNC: + self.mqtt_client.loop() + time.sleep(3) + self.mqtt_client.loop() return message_info def subscribe(self, topics: List[Tuple[str, int]]) -> tuple: @@ -67,7 +105,7 @@ def subscribe(self, topics: List[Tuple[str, int]]) -> tuple: :return: tuple """ - result, mid = self.mqtt_client.subscribe(topics) + result, mid = self.mqtt_client.subscribe(topics, qos=2) return result, mid def unsubscribe(self, topics: List[str]) -> tuple: @@ -85,22 +123,12 @@ def unsubscribe(self, topics: List[str]) -> tuple: return result, mid @staticmethod - def _get_on_connect_callback() -> callable: - - def on_connect(client, userdata, flags, rc, properties=None): - print("Connection started") - with open("connection.txt", "w") as file: - file.write("Connection started") - if rc == 0: - file.write("Connected!!") - else: - file.write("Do not Connected!!") - if rc == 0: - print("Connected to MQTT Broker!") - else: - print(f"Failed to connect, return code: {rc}") + def _get_on_connect_callback(onboard_response) -> callable: - return client, userdata, flags, rc, properties + def on_connect(client: mqtt_client.Client, userdata, flags, rc, properties=None): + if rc == 0: + client.subscribe(topic=onboard_response.connection_criteria.commands) + time.sleep(3) return on_connect @@ -108,8 +136,6 @@ def on_connect(client, userdata, flags, rc, properties=None): def _get_on_message_callback() -> callable: def on_message(client, userdata, msg): - # print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") - return client, userdata, msg return on_message @@ -117,29 +143,23 @@ def on_message(client, userdata, msg): @staticmethod def _get_on_subscribe_callback() -> callable: - def on_subscribe(client, userdata, mid, granted_qos, properties=None): - # print(f"Subscribed {userdata} to `{properties}`") - - return client, userdata, mid, granted_qos, properties + def on_subscribe(*args, **kwargs): + return args, kwargs return on_subscribe @staticmethod def _get_on_disconnect_callback() -> callable: - def on_disconnect(client, userdata, rc): - # print(f"Disconnected from from `{properties}`") - - return client, userdata, rc + def on_disconnect(*args, **kwargs): + return args, kwargs return on_disconnect @staticmethod def _get_on_unsubscribe_callback() -> callable: - def on_unsubscribe(client, userdata, mid): - # print(f"Unsubscribed `{userdata}` from `{properties}`") - - return client, userdata, mid + def on_unsubscribe(*args, **kwargs): + return args, kwargs return on_unsubscribe diff --git a/agrirouter/messaging/decode.py b/agrirouter/messaging/decode.py index 02387153..0d1a4e12 100644 --- a/agrirouter/messaging/decode.py +++ b/agrirouter/messaging/decode.py @@ -1,5 +1,4 @@ import base64 -from ctypes import Union from google.protobuf.any_pb2 import Any from google.protobuf.internal.decoder import _DecodeVarint diff --git a/agrirouter/messaging/messages.py b/agrirouter/messaging/messages.py index da475631..7db4e461 100644 --- a/agrirouter/messaging/messages.py +++ b/agrirouter/messaging/messages.py @@ -1,6 +1,5 @@ import json -from datetime import datetime, timezone -from typing import Union, List, Dict +from typing import Union, Dict from agrirouter.messaging.exceptions import WrongFieldError from agrirouter.utils.utc_time_util import now_as_utc_str diff --git a/agrirouter/messaging/parameters/service.py b/agrirouter/messaging/parameters/service.py index 631188ae..758df834 100644 --- a/agrirouter/messaging/parameters/service.py +++ b/agrirouter/messaging/parameters/service.py @@ -1,4 +1,3 @@ -from abc import ABC, abstractmethod from copy import deepcopy from typing import List diff --git a/agrirouter/messaging/request.py b/agrirouter/messaging/request.py index 9cc9507e..14766318 100644 --- a/agrirouter/messaging/request.py +++ b/agrirouter/messaging/request.py @@ -1,7 +1,5 @@ from typing import List -from agrirouter.messaging.messages import Message - class MessageRequest: SENSOR_ALTERNATE_ID = "sensorAlternateId" diff --git a/agrirouter/messaging/services/commons.py b/agrirouter/messaging/services/commons.py index 425aa976..8f4f6db5 100644 --- a/agrirouter/messaging/services/commons.py +++ b/agrirouter/messaging/services/commons.py @@ -1,16 +1,13 @@ import json -import os from abc import ABC, abstractmethod -import requests - -from agrirouter.messaging.certification import create_certificate_file_from_pen from agrirouter.messaging.clients.http import HttpClient from agrirouter.messaging.clients.mqtt import MqttClient from agrirouter.messaging.messages import Message from agrirouter.messaging.request import MessageRequest from agrirouter.messaging.result import MessagingResult from agrirouter.onboarding.exceptions import BadMessagingResult +from agrirouter.onboarding.response import SoftwareOnboardingResponse class AbstractMessagingClient(ABC): @@ -50,33 +47,35 @@ def send(self, parameters) -> MessagingResult: class MqttMessagingService(AbstractMessagingClient): def __init__(self, - client_id, - onboarding_response, + onboarding_response: SoftwareOnboardingResponse, on_message_callback: callable = None, + client_async: bool = True ): self.onboarding_response = onboarding_response self.client = MqttClient( - client_id=client_id, + onboard_response=onboarding_response, + client_id=onboarding_response.get_connection_criteria().get_client_id(), on_message_callback=on_message_callback, ) - self.client.connect( - self.onboarding_response.get_connection_criteria().get_host(), - self.onboarding_response.get_connection_criteria().get_port() - ) + if client_async: + self.client.connect_async( + self.onboarding_response.get_connection_criteria().get_host(), + self.onboarding_response.get_connection_criteria().get_port() + ) + else: + self.client.connect( + self.onboarding_response.get_connection_criteria().get_host(), + self.onboarding_response.get_connection_criteria().get_port() + ) def send(self, parameters, qos: int = 0) -> MessagingResult: message_request = self.create_message_request(parameters) mqtt_payload = message_request.json_serialize() self.client.publish( - self.onboarding_response.get_connection_criteria().get_measures(), json.dumps(mqtt_payload), + topic=self.onboarding_response.get_connection_criteria().get_measures(), + payload=json.dumps(mqtt_payload), qos=qos ) result = MessagingResult([parameters.get_application_message_id()]) return result - - def subscribe(self): - pass - - def unsubscribe(self): - pass diff --git a/agrirouter/messaging/services/http/outbox.py b/agrirouter/messaging/services/http/outbox.py index 13db25cf..7985f914 100644 --- a/agrirouter/messaging/services/http/outbox.py +++ b/agrirouter/messaging/services/http/outbox.py @@ -1,15 +1,7 @@ -import json -import os - -import requests - from agrirouter.messaging.clients.http import HttpClient from agrirouter.messaging.exceptions import OutboxException -from agrirouter.messaging.messages import OutboxMessage from agrirouter.messaging.result import OutboxResponse -from agrirouter.messaging.certification import create_certificate_file_from_pen - class OutboxService: @@ -27,4 +19,3 @@ def fetch(self, onboarding_response) -> OutboxResponse: raise OutboxException(f"Could not fetch messages from outbox. Status code was {response.status}") return outbox_response - diff --git a/agrirouter/messaging/services/messaging.py b/agrirouter/messaging/services/messaging.py index 685934ba..81f60d6f 100644 --- a/agrirouter/messaging/services/messaging.py +++ b/agrirouter/messaging/services/messaging.py @@ -6,7 +6,7 @@ from agrirouter.messaging.encode import encode_message from agrirouter.messaging.enums import TechnicalMessageType from agrirouter.messaging.messages import EncodedMessage -from agrirouter.messaging.parameters.dto import MessageParameters, MessagingParameters +from agrirouter.messaging.parameters.dto import MessagingParameters from agrirouter.messaging.parameters.service import MessageHeaderParameters, MessagePayloadParameters, \ CapabilityParameters, FeedConfirmParameters, FeedDeleteParameters, ListEndpointsParameters, \ SubscriptionParameters, QueryHeaderParameters, QueryMessageParameters diff --git a/agrirouter/onboarding/dto.py b/agrirouter/onboarding/dto.py index c4cbc314..d2b34bed 100644 --- a/agrirouter/onboarding/dto.py +++ b/agrirouter/onboarding/dto.py @@ -33,6 +33,7 @@ def json_serialize(self) -> dict: self.GATEWAY_ID: self.gateway_id, self.MEASURES: self.measures, self.COMMANDS: self.commands, + self.HOST: self.host, self.PORT: self.port, self.CLIENT_ID: self.client_id } @@ -91,6 +92,12 @@ def get_client_id(self) -> str: def set_client_id(self, client_id: str) -> None: self.client_id = client_id + def __str__(self): + return str(self.json_serialize()) + + def __repr__(self): + return str(self.json_serialize()) + class Authentication: TYPE = 'type' @@ -144,6 +151,12 @@ def get_certificate(self) -> str: def set_certificate(self, certificate: str) -> None: self.certificate = certificate + def __str__(self): + return str(self.json_serialize()) + + def __repr__(self): + return str(self.json_serialize()) + class ErrorResponse: def __init__(self, diff --git a/agrirouter/onboarding/headers.py b/agrirouter/onboarding/headers.py index fb598310..7843e2ad 100644 --- a/agrirouter/onboarding/headers.py +++ b/agrirouter/onboarding/headers.py @@ -1,5 +1,3 @@ -import base64 - from agrirouter.constants.media_types import ContentTypes diff --git a/agrirouter/onboarding/onboarding.py b/agrirouter/onboarding/onboarding.py index 74ded694..3119e181 100644 --- a/agrirouter/onboarding/onboarding.py +++ b/agrirouter/onboarding/onboarding.py @@ -1,5 +1,3 @@ -import json - import requests from agrirouter.environments.environmental_services import EnvironmentalService diff --git a/agrirouter/onboarding/parameters.py b/agrirouter/onboarding/parameters.py index bb62ae2e..1895c9fe 100644 --- a/agrirouter/onboarding/parameters.py +++ b/agrirouter/onboarding/parameters.py @@ -1,5 +1,3 @@ -from datetime import datetime - from agrirouter.constants.media_types import ContentTypes from agrirouter.onboarding.enums import CertificateTypes from agrirouter.utils.utc_time_util import now_as_utc_str diff --git a/agrirouter/onboarding/response.py b/agrirouter/onboarding/response.py index 1b9c6591..44a4f5b5 100644 --- a/agrirouter/onboarding/response.py +++ b/agrirouter/onboarding/response.py @@ -79,7 +79,7 @@ def __init__(self, http_response: Response = None): commands=response_body.get("connectionCriteria").get("commands"), host=response_body.get("connectionCriteria").get("host"), port=response_body.get("connectionCriteria").get("port"), - client_id=response_body.get("connectionCriteria").get("client_id") + client_id=response_body.get("connectionCriteria").get("clientId") ) if response_body.get("connectionCriteria", None) else None self.authentication = Authentication( @@ -134,8 +134,8 @@ def json_serialize(self): self.DEVICE_ALTERNATE_ID: self.device_alternate_id, self.CAPABILITY_ALTERNATE_ID: self.capability_alternate_id, self.SENSOR_ALTERNATE_ID: self.sensor_alternate_id, - self.CONNECTION_CRITERIA: self.connection_criteria, - self.AUTHENTICATION: self.authentication + self.CONNECTION_CRITERIA: self.connection_criteria.json_serialize(), + self.AUTHENTICATION: self.authentication.json_serialize() } def json_deserialize(self, data: Union[dict, str]): @@ -143,17 +143,23 @@ def json_deserialize(self, data: Union[dict, str]): for (key, value) in data_dict.items(): if key == self.DEVICE_ALTERNATE_ID: self.device_alternate_id = value - if key == self.CAPABILITY_ALTERNATE_ID: + elif key == self.CAPABILITY_ALTERNATE_ID: self.capability_alternate_id = value - if key == self.SENSOR_ALTERNATE_ID: + elif key == self.SENSOR_ALTERNATE_ID: self.sensor_alternate_id = value - if key == self.CONNECTION_CRITERIA: + elif key == self.CONNECTION_CRITERIA: connection_criteria = ConnectionCriteria() connection_criteria.json_deserialize(value) self.connection_criteria = connection_criteria - if key == self.AUTHENTICATION: + elif key == self.AUTHENTICATION: authentication = Authentication() authentication.json_deserialize(value) self.authentication = authentication else: raise WrongFieldError(f"Unknown field `{key}` for {self.__class__}") + + def __str__(self): + return str(self.json_serialize()) + + def __repr__(self): + return str(self.json_serialize()) diff --git a/agrirouter/revoking/headers.py b/agrirouter/revoking/headers.py index 0c1770f2..50652d5a 100644 --- a/agrirouter/revoking/headers.py +++ b/agrirouter/revoking/headers.py @@ -23,4 +23,4 @@ def _set_params(self, application_id: str, signature: str, content_type: str): if signature: header["X-Agrirouter-Signature"] = signature - self.params = header \ No newline at end of file + self.params = header diff --git a/example_script.py b/example_script.py index 0995c898..50e4ae3f 100644 --- a/example_script.py +++ b/example_script.py @@ -1,3 +1,12 @@ +from pprint import pprint + +from google.protobuf.timestamp_pb2 import Timestamp + +from agrirouter.generated.messaging.request.payload.account.endpoints_pb2 import ListEndpointsQuery +from agrirouter.generated.messaging.request.payload.feed.feed_requests_pb2 import ValidityPeriod +from agrirouter.onboarding.response import SoftwareOnboardingResponse +import time + public_key = """-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzGt41/+kSOTlO1sJvLIN 6RAFaOn6GiCNX/Ju0oVT4VMDHfuQMI5t9+ZgBxFmUhtY5/eykQfYJVGac/cy5xyk @@ -38,93 +47,264 @@ -----END PRIVATE KEY-----""" -application_id = "8c947a45-c57d-42d2-affc-206e21d63a50" # # store here your application id. You can find it in AR UI +onboarding_response_mqtt_data = { + "deviceAlternateId": "2145df0e-3451-46cb-bf23-23191af66fce", + "capabilityAlternateId": "523e4623-68d2-43d4-a0cc-e2ada2f68b5e", + "sensorAlternateId": "1489638c-7bed-4205-ad77-8d11efdc779f", + "connectionCriteria": { + "gatewayId": "2", + "host": "dke-qa.eu10.cp.iot.sap", + "port": 8883, + "clientId": "2145df0e-3451-46cb-bf23-23191af66fce", + "measures": "measures/2145df0e-3451-46cb-bf23-23191af66fce", + "commands": "commands/2145df0e-3451-46cb-bf23-23191af66fce" + }, + "authentication": { + "type": "PEM", + "secret": "JNKdNg8R0lwmFgvrUfOCc7inebr0h?!7Z9wL", + "certificate": "-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIE6zAdBgoqhkiG9w0BDAEDMA8ECMkL85F+LbPbAgMCAAAEggTI1CmRlnDUStBv\nTycvaRVFMCk1OuynhiOYRF6HBFFXBCxWKZa3WqTShLdf9iCel/NgtdZIiQsoD1LL\nMxVyh8pWAfLQ+pDJLvM6suQjHALt8dW5iTeCZ7R1gzFvPJ+xnDGFFytN7HmGSvHM\nQbcCOuEeIu8U6ENa6/+WmUwK9/ZMkLNqDHVKEGpI+lSJs8JWEE+S3Klmsxuq0dvz\nh6o3V7RKFwMfUZOQLHezGBDjLfEBdP+d2G87CY+LSzinL8pFhLwyrXFKfYWYoT0m\n5PkDdjfiVq3SJIUoQWnGrjaVVw4TV3WSxmhQnWbDwOQydr8DAiBxDMYoeK3rePpC\nwh6KATnBrovq1icqjonYDE0T+3Rs2SUbG+3+m9Zj4j46L2Sh9bUB6qxdw74Ck2/z\nAzJ1N+tB+RL7UvOpMOhmndMBl5qpx9dFFy8Z/N7w4YTQLZLN7chD8ApeFhCgvppt\nAGh8/VeWO54OC9ZOSHpxEl7sJz97jaHYNbw/lGbDk7cOZezwpA0NCWZ/Bb1vRDzy\n8EDX9s1hOA3jiy2T1RSyk2Rj/12pWdKtdSO8lMhMKC0B32Zr1F8rBJKDVzqFWuTt\nn+pXOKedyOA/ggyvYJdsltP8O4XB2oBN3WBdFK7Y1FG/tN30LsaqcnFTxab5v1Pp\ngq2dHu6Xy0TCMAw/DH3RmGXlGnDDWu86Zad7TjjrEZvpSIv4TTSCqqTvc4IN0xFX\nbKZCrY6JSkJWWnDMKrsRYOijUDvpAbYwZuTV9PAljYbt5YX778qxV9O0fNBQdaww\nNlfxU93jgr4g3E9nIzRxLu9S98hPbxKUnVYiQmYvP7vJUcUSo5F0LmUU/nvHY1pi\nr4tZDp8Xu1aZy7cOd3sTbf/68IjiZMZlF5/PVlOFOo40yGqW600j/qEqXoY/492h\nONXUCpHKaG/Pkjtg9THuYoaw1773gxYYsYLt+c6NkQCCsydOr2BMZQ4Qy4bZV67D\n2RNDeZzSBY6jEX6dnfY0FJqIsSiw28Ek5NXx0HTEGN8txPkx/1dfu3RfZnzUqT/0\nmS9xcWVYRmlip3vm48fMecqP/DNIHyjVLC39SsFdeXa+De76z/S3+or0t7HGlUim\nNVkIcWqm/sD2ia8hYberaRRTbUQ1iObNToIg8dA/xna6D61sYK8jkf1GVPpKsCTA\nOVW5u9XrE1f5YQEovE9kFgvtzs0u6jSeI9edqVadH1u6hX4QWQSTrcTb3raqAKpK\nl67cQ96eXI1WQPSdPhQPTjqzOPZDbot3qMkGFijHar7FdQjDx/cNhqhvxv0LWsvl\njgep1czUFoo1BS3wTUiO0qyloNGOQdgmlTOHbMFk1wgoNyAohfZtfn6LH/zlJnE3\nQ0YkUKgAG+1N/PmkQFO0k5qAflUV7h+HAzT1ZAZcscjHNbQFDc0Zjq9nE9sfhxE8\nOFpnF9Jp3fQVekyyC/dsCxtJdYfhxqYe+BzZu0SlsLCmc1JoK5lkiXQwv6+cFpKW\nwfHMTTrCoOetJyiF7oJX+t4adzmLmnujiw5izxObWQJ7avHC1oYNHfRejrOtlu34\n0nDPRFiSDyEbDCBXPe9dIafqjJVLQGFOeXC8/VN9cGSZp2JV8rqumWOr9E+Wd5zU\n8MRZpevo0i3rPgdyFRpw\n-----END ENCRYPTED PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----\nMIIEaDCCA1CgAwIBAgIPANHZYxYlOc+wEAEDDWDpMA0GCSqGSIb3DQEBCwUAMFYx\nCzAJBgNVBAYTAkRFMSMwIQYDVQQKExpTQVAgSW9UIFRydXN0IENvbW11bml0eSBJ\nSTEiMCAGA1UEAxMZU0FQIEludGVybmV0IG9mIFRoaW5ncyBDQTAeFw0yMTExMTIw\nNzMyMjNaFw0yMjExMTIwNzMyMjNaMIG1MQswCQYDVQQGEwJERTEcMBoGA1UEChMT\nU0FQIFRydXN0IENvbW11bml0eTEVMBMGA1UECxMMSW9UIFNlcnZpY2VzMXEwbwYD\nVQQDFGhkZXZpY2VBbHRlcm5hdGVJZDoyMTQ1ZGYwZS0zNDUxLTQ2Y2ItYmYyMy0y\nMzE5MWFmNjZmY2V8Z2F0ZXdheUlkOjJ8dGVuYW50SWQ6MTExNjkwMzQ5MHxpbnN0\nYW5jZUlkOmRrZS1xYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAJeF\naxjV7Xk1R2dFjadN6WsUkrmcVu44vZRCJEbR7Chkg1xcXT6cgIlokO/V4lTgaD6i\neCMKMFegjXzEJQy0dyIWncozcmt6HJFxpdjVQtdCtDtCWykGscNDgvv5ukykOOKI\nMzWJ4d2cJRlostpNe4FYZoPp6cArSHTl9DvfYqjZ/ykeTa1w157dgVxPxezHrJMl\n+z2XgO37mq6CJLw8J6W8RBHbCADgB8c6qGHgJnBURyxnoHHi/yqdIKC6cOs8NAnc\nyVmnvLDu8RUWu9pWkqFHhMvSqdkUCTYORZ9mUTm/Kmv6ss2NaYT4uUBZTskwnAa9\nFLdj+DV2NG0OQl3NYr8CAwEAAaOB0jCBzzBIBgNVHR8EQTA/MD2gO6A5hjdodHRw\nczovL3Rjcy5teXNhcC5jb20vY3JsL1RydXN0Q29tbXVuaXR5SUkvU0FQSW9UQ0Eu\nY3JsMAwGA1UdEwEB/wQCMAAwJQYDVR0SBB4wHIYaaHR0cDovL3NlcnZpY2Uuc2Fw\nLmNvbS9UQ1MwDgYDVR0PAQH/BAQDAgbAMB0GA1UdDgQWBBSRf8DUjowgQ+6amVIs\njd7zM7VWqjAfBgNVHSMEGDAWgBSVt7P1WN7VtLNYRuDypsl4Tr0tdTANBgkqhkiG\n9w0BAQsFAAOCAQEARzSc9GLpSU3pRJPIfgadHrZ+2KQsPsQ1/fLlASlt4V1Rlxn7\n/tn0gk3sP0X5/TrkO+N0kx1qrLarxWSDiVfaXoPa6Lit30SBPnPLUPPPZeTJOz5r\nTW9PkPPuC39GlM1biVoil2cLZrTr9DMSUoBvR4IVKQoJveQsLwn7Ea+SDPE0uvZV\nbDN6UPGZ2yIiCXO1MODJ6r3A4EDD2MArGgfhGdbvJNAY36ShFJhzfzi0t8linEAA\nxh0vcaEEIkVeEiwiguyGWB69X88cjZ0Q5cCf0r6iu3oQnB57uM5TW12OwXQN1NpQ\neK3EMFSoM6BYJu/3B8TXhNmpNBvD7KYozw9XaA==\n-----END CERTIFICATE-----\n" + } +} + -######################################################## -# Authorization -print("Authorization...\n") import agrirouter as ar +from agrirouter.onboarding.enums import GateWays +from agrirouter.messaging.enums import CapabilityType +from agrirouter.generated.messaging.request.payload.endpoint.subscription_pb2 import Subscription +from agrirouter.messaging.services.commons import HttpMessagingService, MqttMessagingService +from agrirouter import ListEndpointsParameters, ListEndpointsService, SubscriptionService, SubscriptionParameters, \ + QueryHeaderService, QueryHeaderParameters +from agrirouter.utils.uuid_util import new_uuid -auth_params = ar.AuthUrlParameter(application_id=application_id, response_type="onboard") -auth_client = ar.Authorization("QA", public_key=public_key, private_key=private_key) -auth_url = auth_client.get_auth_request_url(auth_params) # use this url to authorize the user as described at https://docs.my-agrirouter.com/agrirouter-interface-documentation/latest/integration/authorization.html#perform-authorization -print(f"auth_url={auth_url}") -auth_result_url = input("Enter auth_url (the url the user was redirected to after his authorization, see above): ") # the url the user was redirected to after his authorization. -auth_response = auth_client.extract_auth_response(auth_result_url) # auth_response contains the results of the auth process -auth_client.verify_auth_response(auth_response) # you may verify auth_response to ensure answer was from AR +application_id = "8c947a45-c57d-42d2-affc-206e21d63a50" # # store here your application id. You can find it in AR UI -print(f"auth_response is successful: {auth_response.is_successful}") # True if user accepted application, False if he rejected -print(f"auth_response is valid: {auth_response.is_valid}") # Result of verification, if False, response was not validated by public key. Doesn't indicate the auth was successfull. Accessible only after response verifying +def example_auth(): + print("Authorization...\n") + auth_params = ar.AuthUrlParameter(application_id=application_id, response_type="onboard") + auth_client = ar.Authorization("QA", public_key=public_key, private_key=private_key) + auth_url = auth_client.get_auth_request_url( + auth_params) # use this url to authorize the user as described at https://docs.my-agrirouter.com/agrirouter-interface-documentation/latest/integration/authorization.html#perform-authorization + print(f"auth_url={auth_url}") -# Get dict containing data from auth process you will use for futher communication. -# If auth was rejected, contains {"error"} key. -# If auth was accepted, contains {signature, state, token, credentials{account, expires, regcode}} keys -# Even if response verifying was not processed or failed, the results will be returned. But in that case you act on your risk. -auth_data = auth_response.get_auth_result() -print(f"auth_data: {auth_data}") + auth_result_url = input( + "Enter auth_url (the url the user was redirected to after his authorization, see above): ") # the url the user was redirected to after his authorization. + auth_response = auth_client.extract_auth_response( + auth_result_url) # auth_response contains the results of the auth process + auth_client.verify_auth_response(auth_response) # you may verify auth_response to ensure answer was from AR -######################################################## + print( + f"auth_response is successful: {auth_response.is_successful}") # True if user accepted application, False if he rejected -# Onboarding -print("Onboarding...\n") + print( + f"auth_response is valid: {auth_response.is_valid}") # Result of verification, if False, response was not validated by public key. Doesn't indicate the auth was successfull. Accessible only after response verifying + # Get dict containing data from auth process you will use for futher communication. + # If auth was rejected, contains {"error"} key. + # If auth was accepted, contains {signature, state, token, credentials{account, expires, regcode}} keys + # Even if response verifying was not processed or failed, the results will be returned. But in that case you act on your risk. + auth_data = auth_response.get_auth_result() + print(f"auth_data: {auth_data}") -from agrirouter.onboarding.enums import GateWays + return auth_data -id_ = "urn:myapp:snr00003234" # just unique -certification_version_id = "edd5d6b7-45bb-4471-898e-ff9c2a7bf56f" # get from AR UI -time_zone = "+03:00" -onboarding_client = ar.SoftwareOnboarding("QA", public_key=public_key, private_key=private_key) -onboarding_parameters = ar.SoftwareOnboardingParameter(id_=id_, application_id=application_id, certification_version_id=certification_version_id, gateway_id=GateWays.REST.value, time_zone=time_zone, reg_code=auth_data["credentials"]["regcode"]) -onboarding_verifying_response = onboarding_client.verify(onboarding_parameters) -print(f"onboarding_verifying_response.status_code: {onboarding_verifying_response.status_code}") -print(f"onboarding_verifying_response.text: {onboarding_verifying_response.text}") -onboarding_response = onboarding_client.onboard(onboarding_parameters) -print(f"onboarding_response.status_code: {onboarding_response.status_code}") -print(f"onboarding_response.text: {onboarding_response.text}") +def example_onboarding(gateway_id): + auth_data = example_auth() -########################## -# Messaging + print("Onboarding...\n") + id_ = "urn:myapp:snr00003234" # just unique + certification_version_id = "edd5d6b7-45bb-4471-898e-ff9c2a7bf56f" # get from AR UI + time_zone = "+03:00" -from agrirouter.messaging.enums import CapabilityTypeDefinitions -from agrirouter.generated.messaging.request.payload.endpoint.subscription_pb2 import Subscription -from agrirouter.messaging.services.commons import HttpMessagingService, MqttMessagingService -from agrirouter import ListEndpointsParameters, ListEndpointsService, SubscriptionService, SubscriptionParameters -from agrirouter.utils.uuid_util import new_uuid + onboarding_client = ar.SoftwareOnboarding("QA", public_key=public_key, private_key=private_key) + onboarding_parameters = ar.SoftwareOnboardingParameter(id_=id_, application_id=application_id, + certification_version_id=certification_version_id, + gateway_id=gateway_id, time_zone=time_zone, + reg_code=auth_data.get_decoded_token().regcode) + onboarding_verifying_response = onboarding_client.verify(onboarding_parameters) + print(f"onboarding_verifying_response.status_code: {onboarding_verifying_response.status_code}") + print(f"onboarding_verifying_response.text: {onboarding_verifying_response.text}") + onboarding_response = onboarding_client.onboard(onboarding_parameters) + print(f"onboarding_response.status_code: {onboarding_response.status_code}") + print(f"onboarding_response.text: {onboarding_response.text}") + + return onboarding_response + + +def example_list_endpoints_mqtt(onboarding_response_data, foo): + onboarding_response = SoftwareOnboardingResponse() + onboarding_response.json_deserialize(onboarding_response_data) + + messaging_service = MqttMessagingService( + onboarding_response=onboarding_response, + on_message_callback=foo + + ) + list_endpoint_parameters = ListEndpointsParameters( + technical_message_type=CapabilityType.ISO_11783_TASKDATA_ZIP.value, + direction=ListEndpointsQuery.Direction.Value("SEND_RECEIVE"), + filtered=False, + onboarding_response=onboarding_response, + application_message_id=new_uuid(), + application_message_seq_no=1, + ) + list_endpoint_service = ListEndpointsService(messaging_service) + + messaging_result = list_endpoint_service.send(list_endpoint_parameters) + print("Sent message: ", messaging_result) -# List Endpoints + # Is needed for waiting of messaging responses from outbox + while True: + time.sleep(1) -messaging_service = HttpMessagingService() -list_endpoint_parameters = ListEndpointsParameters( - technical_message_type=CapabilityTypeDefinitions.ISO_11783_TASKDATA_ZIP.value, + +def example_list_endpoints_http(onboarding_response_data): + onboarding_response = SoftwareOnboardingResponse() + onboarding_response.json_deserialize(onboarding_response_data) + + messaging_service = HttpMessagingService() + list_endpoint_parameters = ListEndpointsParameters( + technical_message_type=CapabilityType.ISO_11783_TASKDATA_ZIP.value, direction=2, filtered=False, onboarding_response=onboarding_response, application_message_id=new_uuid(), application_message_seq_no=1, ) -list_endpoint_service = ListEndpointsService(messaging_service) -list_endpoint_service.send(list_endpoint_parameters) + list_endpoint_service = ListEndpointsService(messaging_service) + + messaging_result = list_endpoint_service.send(list_endpoint_parameters) + print("Sent message: ", messaging_result) -# Subscription + return messaging_result -messaging_service = HttpMessagingService() -subscription_service = SubscriptionService(messaging_service) -tmt = CapabilityTypeDefinitions.ISO_11783_TASKDATA_ZIP.value -subscription_item = Subscription.MessageTypeSubscriptionItem(technical_message_type=tmt) -subscription_parameters = SubscriptionParameters( +def example_subscription_http(onboarding_response_data): + onboarding_response = SoftwareOnboardingResponse() + onboarding_response.json_deserialize(onboarding_response_data) + + messaging_service = HttpMessagingService() + subscription_service = SubscriptionService(messaging_service) + tmt = CapabilityType.ISO_11783_TASKDATA_ZIP.value + subscription_item = Subscription.MessageTypeSubscriptionItem(technical_message_type=tmt) + subscription_parameters = SubscriptionParameters( subscription_items=[subscription_item], onboarding_response=onboarding_response, application_message_id=new_uuid(), application_message_seq_no=1, -) -subscription_service.send(subscription_parameters) + ) + + messaging_result = subscription_service.send(subscription_parameters) + print("Sent message: ", messaging_result) + + return messaging_result + + +def example_subscription_mqtt(onboarding_response_data, on_msg_callback): + onboarding_response = SoftwareOnboardingResponse() + onboarding_response.json_deserialize(onboarding_response_data) + + messaging_service = MqttMessagingService(onboarding_response, on_message_callback=on_msg_callback) + subscription_service = SubscriptionService(messaging_service) + tmt = CapabilityType.ISO_11783_TASKDATA_ZIP.value + subscription_item = Subscription.MessageTypeSubscriptionItem(technical_message_type=tmt) + subscription_parameters = SubscriptionParameters( + subscription_items=[subscription_item], + onboarding_response=onboarding_response, + application_message_id=new_uuid(), + application_message_seq_no=1, + ) + + messaging_result = subscription_service.send(subscription_parameters) + print("Sent message: ", messaging_result) + + # Is needed for waiting of messaging responses from outbox + while True: + time.sleep(1) + + +def example_query_header_message_http(onboarding_response_data): + onboarding_response = SoftwareOnboardingResponse() + onboarding_response.json_deserialize(onboarding_response_data) + + messaging_service = HttpMessagingService() + query_header_service = QueryHeaderService(messaging_service) + sent_from = Timestamp() + sent_to = Timestamp() + validity_period = ValidityPeriod(sent_from=sent_from, sent_to=sent_to) + query_header_parameters = QueryHeaderParameters( + message_ids=[new_uuid(), new_uuid()], + senders=[new_uuid(), new_uuid()], + validity_period=validity_period, + onboarding_response=onboarding_response, + application_message_id=new_uuid(), + application_message_seq_no=1, + ) + messaging_result = query_header_service.send(query_header_parameters) + print("Sent message: ", messaging_result) + + return messaging_result + + +def example_query_header_message_mqtt(onboarding_response_data, on_msg_callback): + onboarding_response = SoftwareOnboardingResponse() + onboarding_response.json_deserialize(onboarding_response_data) + + messaging_service = MqttMessagingService(onboarding_response, on_message_callback=on_msg_callback) + query_header_service = QueryHeaderService(messaging_service) + sent_from = Timestamp() + sent_to = Timestamp() + validity_period = ValidityPeriod(sent_from=sent_from, sent_to=sent_to) + query_header_parameters = QueryHeaderParameters( + message_ids=[new_uuid(), new_uuid()], + senders=[new_uuid(), new_uuid()], + validity_period=validity_period, + onboarding_response=onboarding_response, + application_message_id=new_uuid(), + application_message_seq_no=1, + ) + messaging_result = query_header_service.send(query_header_parameters) + print("Sent message: ", messaging_result) + + # Is needed for waiting of messaging responses from outbox + while True: + time.sleep(1) + + +def on_message_callback(client, userdata, msg): + + # Define here the way receiving messages will be processed + + from agrirouter.messaging.decode import decode_response + from agrirouter.messaging.decode import decode_details + from agrirouter.messaging.messages import OutboxMessage + + outbox_message = OutboxMessage() + outbox_message.json_deserialize(msg.payload.decode().replace("'", '"')) + + print(outbox_message.command.message) + + decoded_message = decode_response(outbox_message.command.message) + print(decoded_message.response_envelope) + + try: + decoded_details = decode_details(decoded_message.response_payload.details) + print(decoded_details) + except Exception as exc: + print("Error in decoding details: ", exc) + + +if __name__ == "__main__": + onboarding_response_mqtt = example_onboarding(GateWays.MQTT.value) + example_list_endpoints_mqtt(onboarding_response_mqtt.json_serialize(), on_message_callback) + + # or for http + # onboarding_response_mqtt = example_onboarding(GateWays.REST.value) + # example_list_endpoints_http(onboarding_response_mqtt.json_serialize()) diff --git a/examples.txt b/examples.txt index fcb13cd4..3ca0f065 100644 --- a/examples.txt +++ b/examples.txt @@ -89,7 +89,7 @@ True >>> time_zone = "+03:00" >>> onboarding_client = ar.SoftwareOnboarding("QA", public_key=public_key, private_key=private_key) ->>> onboarding_parameters = ar.SoftwareOnboardingParameter(id_=id_, application_id=application_id, certification_version_id=certification_version_id, gateway_id=GateWays.REST.value, time_zone=time_zone, reg_code=auth_data["credentials"]["regcode"]) +>>> onboarding_parameters = ar.SoftwareOnboardingParameter(id_=id_, application_id=application_id, certification_version_id=certification_version_id, gateway_id=GateWays.REST.value, time_zone=time_zone, reg_code=auth_data.get_decoded_token().regcode) >>> onboarding_verifying_response = onboarding_client.verify(onboarding_parameters) >>> onboarding_verifying_response.status_code >>> onboarding_verifying_response.text diff --git a/tests/messaging_test/test_decode.py b/tests/messaging_test/test_decode.py new file mode 100644 index 00000000..77d62143 --- /dev/null +++ b/tests/messaging_test/test_decode.py @@ -0,0 +1,21 @@ +import json + +import pytest + +from agrirouter.messaging.decode import decode_response +from agrirouter.messaging.decode import decode_details + + +MESSAGING_RESULT = b'[{"sensorAlternateId":"185cd97b-ed0b-4e75-a6e2-6be1cdd38a06","capabilityAlternateId":"bbe9f361-b551-48d9-9fca-1b4dc768287c","command":{"message":"XwjIARAKGiQ5NWUzNWE0Zi1jNWM4LTQ1NDEtODE4OS03NmJlMzM0OTc0NDUiJDUzNzYyM2ZjLWY2NmYtNDc5Yi1hMmJhLWVjZjNlNWM3ZjhlMCoMCNTV5YsGEICI8LIDzQIKygIKTnR5cGVzLmFncmlyb3V0ZXIuY29tL2Fncmlyb3V0ZXIucmVzcG9uc2UucGF5bG9hZC5hY2NvdW50Lkxpc3RFbmRwb2ludHNSZXNwb25zZRL3AQp4CiRkNzA0YTQ0My05OWY3LTQ3YjQtYmU1NS1lMmZhMDk2ODllYmUSJFB5dGhvblNES19kZXYgLSAyMDIxLTEwLTI1LCAxMDo1MToxOBoLYXBwbGljYXRpb24iBmFjdGl2ZTIVdXJuOm15YXBwOnNucjAwMDAzMjM0CnsKJDE4NWNkOTdiLWVkMGItNGU3NS1hNmUyLTZiZTFjZGQzOGEwNhIkUHl0aG9uU0RLX2RldiAtIDIwMjEtMTAtMjEsIDIxOjQxOjI0GgthcHBsaWNhdGlvbiIGYWN0aXZlMhh1cm46bXlhcHA6c25yMDAwMDMyMzRzZGY="}}]' + + +def test_decode_response(): + pass + + +def test_decode_details(): + json_response = json.loads(MESSAGING_RESULT) + message = decode_response(json_response[0]["command"]["message"].encode()) + decoded_details = decode_details(message.response_payload.details) + print(decoded_details) + assert False diff --git a/tests/sleeper.py b/tests/sleeper.py index 73eb1d28..9d4179ac 100644 --- a/tests/sleeper.py +++ b/tests/sleeper.py @@ -2,4 +2,4 @@ def let_agrirouter_process_the_message(seconds: int = 3): - time.sleep(3) + time.sleep(seconds)