From 386da1b3bbe65ec863197feb87366b48136c4fba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 13:55:22 +0100 Subject: [PATCH 01/17] Add HiveMQ client dependency to Maven project Introduced the HiveMQ MQTT client (version 1.3.3) to the Maven project dependencies in pom.xml. This addition enables enhanced MQTT communication capabilities alongside the existing Paho client. --- pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pom.xml b/pom.xml index 7310dfc6..30f015a0 100644 --- a/pom.xml +++ b/pom.xml @@ -214,6 +214,11 @@ org.eclipse.paho.client.mqttv3 1.2.5 + + com.hivemq + hivemq-mqtt-client + 1.3.3 + From 2d7106924dc194f0b8ee2e51e2712512c13acbb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 14:05:55 +0100 Subject: [PATCH 02/17] Refactor MQTT client handling Refactor MQTT client handling by introducing MqttClientWrapper and PahoMqttClientWrapper interfaces. This change encapsulates MQTT client operations, providing better abstraction and sanitizes the exception handling within the PahoMqttClientWrapper. Additionally, updated pom.xml to include the necessary MQTT dependency. --- agrirouter-sdk-java-api/pom.xml | 4 ++ .../api/mqtt/MqttClientWrapper.java | 6 +++ .../api/mqtt/PahoMqttClientWrapper.java | 25 +++++++++++++ .../impl/messaging/MqttService.java | 12 +++--- .../mqtt/CloudOffboardingServiceImpl.java | 37 ++++++++----------- 5 files changed, 56 insertions(+), 28 deletions(-) create mode 100644 agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java create mode 100644 agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java diff --git a/agrirouter-sdk-java-api/pom.xml b/agrirouter-sdk-java-api/pom.xml index f365729a..1c362b7e 100644 --- a/agrirouter-sdk-java-api/pom.xml +++ b/agrirouter-sdk-java-api/pom.xml @@ -48,6 +48,10 @@ org.apache.commons commons-lang3 + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java new file mode 100644 index 00000000..7ed95c48 --- /dev/null +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java @@ -0,0 +1,6 @@ +package com.dke.data.agrirouter.api.mqtt; + +public interface MqttClientWrapper { + + void publish(String measures, byte[] payload); +} diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java new file mode 100644 index 00000000..d5dfd7c4 --- /dev/null +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java @@ -0,0 +1,25 @@ +package com.dke.data.agrirouter.api.mqtt; + +import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class PahoMqttClientWrapper implements MqttClientWrapper { + + private final IMqttClient mqttClient; + + public PahoMqttClientWrapper(IMqttClient mqttClient) { + this.mqttClient = mqttClient; + } + + @Override + public void publish(String measures, byte[] payload) { + try { + this.mqttClient.publish(measures, new MqttMessage(payload)); + } catch (MqttException e) { + throw new CouldNotSendMqttMessageException(e); + } + } + +} diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/MqttService.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/MqttService.java index 1a4bb7f7..d508b8f1 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/MqttService.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/MqttService.java @@ -1,19 +1,19 @@ package com.dke.data.agrirouter.impl.messaging; -import org.eclipse.paho.client.mqttv3.IMqttClient; +import com.dke.data.agrirouter.api.mqtt.MqttClientWrapper; /** * Base class which holds the MQTT client with the connection provided by the provider. */ public class MqttService { - private final IMqttClient mqttClient; + private final MqttClientWrapper mqttClientWrapper; - public MqttService(IMqttClient mqttClient) { - this.mqttClient = mqttClient; + public MqttService(MqttClientWrapper mqttClientWrapper) { + this.mqttClientWrapper = mqttClientWrapper; } - protected IMqttClient getMqttClient() { - return mqttClient; + protected MqttClientWrapper getMqttClient() { + return mqttClientWrapper; } } diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java index a3c8ca19..2eded5f4 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java @@ -1,7 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.CloudOffboardingService; import com.dke.data.agrirouter.api.service.parameters.CloudOffboardingParameters; @@ -11,8 +11,6 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -28,7 +26,7 @@ public class CloudOffboardingServiceImpl extends MqttService private final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); public CloudOffboardingServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); } /** @@ -40,24 +38,19 @@ public CloudOffboardingServiceImpl(IMqttClient mqttClient) { @Override public String send(CloudOffboardingParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), payload); + return encodedMessage.getApplicationMessageID(); } @Override From ac46f1959b32f99b929899a8062af307e7ca4e4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 14:06:34 +0100 Subject: [PATCH 03/17] Refactor MQTT client handling Introduced MqttClientWrapper and PahoMqttClientWrapper to encapsulate MQTT client operations, improving abstraction and sanitarizing exception handling. This also simplifies the CloudOnboardingServiceImpl class and removes direct dependencies on the Paho MQTT library. --- .../mqtt/CloudOnboardingServiceImpl.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java index 96d2b897..1fce02ad 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java @@ -1,7 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.CloudOnboardingService; import com.dke.data.agrirouter.api.service.parameters.CloudOnboardingParameters; @@ -11,8 +11,6 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -25,7 +23,7 @@ public class CloudOnboardingServiceImpl extends MqttService private final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); public CloudOnboardingServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); } /** @@ -37,24 +35,20 @@ public CloudOnboardingServiceImpl(IMqttClient mqttClient) { @Override public String send(CloudOnboardingParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override From 1ca89e97a67bb7e74e801645e755e7d3a1a9b8f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 14:07:08 +0100 Subject: [PATCH 04/17] Update DeleteMessageServiceImpl to use PahoMqttClientWrapper Replaced direct usage of IMqttClient with PahoMqttClientWrapper for message publishing in DeleteMessageServiceImpl. Also removed unnecessary exception handling and simplified the payload publishing process. --- .../mqtt/DeleteMessageServiceImpl.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java index eee3006c..be4603d5 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java @@ -1,8 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.DeleteMessageService; import com.dke.data.agrirouter.api.service.parameters.DeleteMessageParameters; @@ -13,8 +13,6 @@ import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.messaging.helper.DeleteAllMessagesParameterCreator; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -30,30 +28,26 @@ public class DeleteMessageServiceImpl extends MqttService private final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); public DeleteMessageServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); } @Override public String send(DeleteMessageParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override From d6cd715aa80b3784ead390b137572ad7deb4639f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 14:07:56 +0100 Subject: [PATCH 05/17] Refactor MQTT client handling in ListEndpointsServiceImpl. Replace direct usage of IMqttClient with PahoMqttClientWrapper to improve MQTT client management. Remove the exception handling for MqttException, as it is now managed within the wrapper. --- .../mqtt/ListEndpointsServiceImpl.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java index 7172236b..3dd615a3 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java @@ -3,8 +3,8 @@ import agrirouter.request.payload.account.Endpoints; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.messaging.mqtt.ListEndpointsService; @@ -17,8 +17,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -34,31 +32,27 @@ public class ListEndpointsServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public ListEndpointsServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(ListEndpointsParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override From b3292df5c02599ceb51300147a5f2c126cbf2fd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 14:08:28 +0100 Subject: [PATCH 06/17] Refactor MQTT message confirmation handling Replaced direct use of `IMqttClient` with `PahoMqttClientWrapper` to streamline the client interface and removed unnecessary exception handling. This refactoring simplifies the codebase and improves maintainability by reducing direct dependencies on the Paho client within `MessageConfirmationServiceImpl.java`. --- .../mqtt/MessageConfirmationServiceImpl.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java index 0dfda664..6deaca0c 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java @@ -1,7 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageConfirmationService; import com.dke.data.agrirouter.api.service.parameters.MessageConfirmationParameters; @@ -11,8 +11,6 @@ import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -25,31 +23,27 @@ public class MessageConfirmationServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public MessageConfirmationServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(MessageConfirmationParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override From a1773651be9894b52b8d2bedbe3a6b871d60dab7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 14:10:23 +0100 Subject: [PATCH 07/17] Refactor MQTT client handling for improved reliability Replace direct usage of `IMqttClient` with `PahoMqttClientWrapper` across multiple service implementations to streamline error handling and reduce boilerplate code. This includes updating message sending methods to remove redundant try-catch blocks and simplifying message publication. --- .../mqtt/MessageHeaderQueryServiceImpl.java | 3 +- .../mqtt/MessageQueryServiceImpl.java | 3 +- .../mqtt/SendMessageServiceImpl.java | 26 +++++-------- .../mqtt/SetCapabilityServiceImpl.java | 38 ++++++++----------- .../mqtt/SetSubscriptionServiceImpl.java | 38 ++++++++----------- 5 files changed, 46 insertions(+), 62 deletions(-) diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java index 94d20b3e..f6f37451 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java @@ -4,6 +4,7 @@ import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageHeaderQueryService; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; import com.dke.data.agrirouter.impl.messaging.MqttService; @@ -23,7 +24,7 @@ public class MessageHeaderQueryServiceImpl extends MqttService private final MessageQueryHelperService messageQueryHelperService; public MessageHeaderQueryServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); messageQueryHelperService = new MessageQueryHelperService( mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_HEADER_QUERY); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java index dcea98b8..33f7f4e2 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java @@ -4,6 +4,7 @@ import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageQueryService; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; @@ -26,7 +27,7 @@ public class MessageQueryServiceImpl extends MqttService private final MessageQueryHelperService messageQueryHelperService; public MessageQueryServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); this.messageQueryHelperService = new MessageQueryHelperService( mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_MESSAGE_QUERY); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java index 9999085a..86b1ef9b 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java @@ -1,13 +1,11 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.SendMessageService; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; import com.dke.data.agrirouter.impl.messaging.MessageBodyCreator; import com.dke.data.agrirouter.impl.messaging.MqttService; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -21,7 +19,7 @@ public class SendMessageServiceImpl extends MqttService implements SendMessageService, MessageBodyCreator { public SendMessageServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); } /** @@ -31,18 +29,14 @@ public SendMessageServiceImpl(IMqttClient mqttClient) { */ public void send(SendMessageParameters sendMessageParameters) { sendMessageParameters.validate(); - try { - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(sendMessageParameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(sendMessageParameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java index ec381a5e..e6c98f5d 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java @@ -1,7 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.SetCapabilityService; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; @@ -12,8 +12,6 @@ import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import com.dke.data.agrirouter.impl.validation.ResponseValidator; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -26,31 +24,27 @@ public class SetCapabilityServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public SetCapabilityServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(SetCapabilitiesParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java index 6e882790..64bb3d93 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java @@ -1,7 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.SetSubscriptionService; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; @@ -12,8 +12,6 @@ import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.validation.ResponseValidator; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -25,31 +23,27 @@ public class SetSubscriptionServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public SetSubscriptionServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(SetSubscriptionParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override From 55f8898a0ebbd97a04f66f936c1fe7590100685c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 14:21:24 +0100 Subject: [PATCH 08/17] Refactor MQTT client integration Renamed MQTT client classes to a new package and added HiveMQ MQTT client wrapper for improved modularity. Eliminated direct Paho dependencies in MessageQueryHelperService in favor of a unified client wrapper interface. Updated `.gitignore` to include local settings configuration. --- .gitignore | 5 ++- agrirouter-sdk-java-api/pom.xml | 4 ++ .../api/mqtt/HiveMqttClientWrapper.java | 17 +++++++ .../{client => paho}/MqttClientService.java | 2 +- .../{client => paho}/MqttOptionService.java | 2 +- .../mqtt/MessageQueryHelperService.java | 44 ++++++++----------- 6 files changed, 46 insertions(+), 28 deletions(-) create mode 100644 agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java rename agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/{client => paho}/MqttClientService.java (97%) rename agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/{client => paho}/MqttOptionService.java (98%) diff --git a/.gitignore b/.gitignore index 659ca628..93556dfe 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,7 @@ buildNumber.properties .settings # GENERATED # -src/main/generated \ No newline at end of file +src/main/generated + +# LOCAL SETTINGS # +ci/local-settings.xml \ No newline at end of file diff --git a/agrirouter-sdk-java-api/pom.xml b/agrirouter-sdk-java-api/pom.xml index 1c362b7e..3ba9cef1 100644 --- a/agrirouter-sdk-java-api/pom.xml +++ b/agrirouter-sdk-java-api/pom.xml @@ -52,6 +52,10 @@ org.eclipse.paho org.eclipse.paho.client.mqttv3 + + com.hivemq + hivemq-mqtt-client + diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java new file mode 100644 index 00000000..eccf7212 --- /dev/null +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java @@ -0,0 +1,17 @@ +package com.dke.data.agrirouter.api.mqtt; + +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; + +public class HiveMqttClientWrapper implements MqttClientWrapper { + + private final Mqtt3AsyncClient mqttClient; + + public HiveMqttClientWrapper(Mqtt3AsyncClient mqttClient) { + this.mqttClient = mqttClient; + } + + @Override + public void publish(String measures, byte[] payload) { + this.mqttClient.publishWith().topic(measures).payload(payload).send(); + } +} diff --git a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttClientService.java b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttClientService.java similarity index 97% rename from agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttClientService.java rename to agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttClientService.java index 7991f6fa..f21aa841 100644 --- a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttClientService.java +++ b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttClientService.java @@ -1,4 +1,4 @@ -package com.dke.data.agrirouter.convenience.mqtt.client; +package com.dke.data.agrirouter.convenience.mqtt.paho; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.dto.onboard.RouterDevice; diff --git a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttOptionService.java b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttOptionService.java similarity index 98% rename from agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttOptionService.java rename to agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttOptionService.java index fa7a13b8..b0e74eec 100644 --- a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttOptionService.java +++ b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttOptionService.java @@ -1,4 +1,4 @@ -package com.dke.data.agrirouter.convenience.mqtt.client; +package com.dke.data.agrirouter.convenience.mqtt.paho; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.dto.onboard.RouterDevice; diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java index fef5faaf..801b44bf 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java @@ -1,7 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.helper.mqtt; import com.dke.data.agrirouter.api.enums.TechnicalMessageType; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; @@ -10,8 +10,6 @@ import com.dke.data.agrirouter.impl.messaging.helper.QueryAllMessagesParameterCreator; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -26,7 +24,7 @@ public MessageQueryHelperService( IMqttClient mqttClient, EncodeMessageService encodeMessageService, TechnicalMessageType technicalMessageType) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); this.logMethodBegin(); this.encodeMessageService = encodeMessageService; this.technicalMessageType = technicalMessageType; @@ -38,29 +36,25 @@ public String send(MessageQueryParameters parameters) { this.getNativeLogger().trace("Validate parameters."); parameters.validate(); - try { - this.getNativeLogger().trace("Encode message."); - var encodedMessage = this.encode(this.technicalMessageType, parameters); + this.getNativeLogger().trace("Encode message."); + var encodedMessage = this.encode(this.technicalMessageType, parameters); - this.getNativeLogger().trace("Build message parameters."); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); + this.getNativeLogger().trace("Build message parameters."); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); - this.getNativeLogger().trace("Send and fetch message response."); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + this.getNativeLogger().trace("Send and fetch message response."); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override From a066793d6a00d8823b9d1cdcc4907692b054ee7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 14:28:34 +0100 Subject: [PATCH 09/17] Support HiveMQ MQTT Client in messaging services Added constructors to various messaging service implementations to support the HiveMQ MQTT client. This increases compatibility and allows for more flexible client configurations. Existing functionality with the Paho MQTT client remains unchanged. --- .../helper/mqtt/MessageQueryHelperService.java | 13 +++++++++++-- .../messaging/mqtt/CloudOffboardingServiceImpl.java | 10 ++++++++-- .../messaging/mqtt/CloudOnboardingServiceImpl.java | 6 ++++++ .../messaging/mqtt/DeleteMessageServiceImpl.java | 6 ++++++ .../messaging/mqtt/ListEndpointsServiceImpl.java | 7 +++++++ .../mqtt/MessageConfirmationServiceImpl.java | 7 +++++++ .../mqtt/MessageHeaderQueryServiceImpl.java | 9 +++++++++ .../messaging/mqtt/MessageQueryServiceImpl.java | 9 +++++++++ .../impl/messaging/mqtt/SendMessageServiceImpl.java | 6 ++++++ .../messaging/mqtt/SetCapabilityServiceImpl.java | 7 +++++++ .../messaging/mqtt/SetSubscriptionServiceImpl.java | 7 +++++++ 11 files changed, 83 insertions(+), 4 deletions(-) diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java index 801b44bf..775591f9 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.helper.mqtt; import com.dke.data.agrirouter.api.enums.TechnicalMessageType; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; @@ -9,6 +10,7 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.helper.QueryAllMessagesParameterCreator; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Collections; @@ -25,10 +27,17 @@ public MessageQueryHelperService( EncodeMessageService encodeMessageService, TechnicalMessageType technicalMessageType) { super(new PahoMqttClientWrapper(mqttClient)); - this.logMethodBegin(); this.encodeMessageService = encodeMessageService; this.technicalMessageType = technicalMessageType; - this.logMethodEnd(); + } + + public MessageQueryHelperService( + Mqtt3AsyncClient mqttClient, + EncodeMessageService encodeMessageService, + TechnicalMessageType technicalMessageType) { + super(new HiveMqttClientWrapper(mqttClient)); + this.encodeMessageService = encodeMessageService; + this.technicalMessageType = technicalMessageType; } public String send(MessageQueryParameters parameters) { diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java index 2eded5f4..caedaaa7 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.CloudOffboardingService; @@ -10,6 +11,7 @@ import com.dke.data.agrirouter.impl.messaging.MessageEncoder; import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Collections; @@ -29,13 +31,17 @@ public CloudOffboardingServiceImpl(IMqttClient mqttClient) { super(new PahoMqttClientWrapper(mqttClient)); } - /** + public CloudOffboardingServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + } + + /** * Offboarding a virtual CU. Will deliver no result if the action was successful, if there's any * error an exception will be thrown. * * @param parameters Parameters for offboarding. */ - @Override + @Override public String send(CloudOffboardingParameters parameters) { parameters.validate(); var encodedMessage = this.encode(parameters); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java index 1fce02ad..2c6c9367 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.CloudOnboardingService; @@ -10,6 +11,7 @@ import com.dke.data.agrirouter.impl.messaging.MessageEncoder; import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Collections; @@ -26,6 +28,10 @@ public CloudOnboardingServiceImpl(IMqttClient mqttClient) { super(new PahoMqttClientWrapper(mqttClient)); } + public CloudOnboardingServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + } + /** * Onboarding a virtual CU for an existing cloud application (incl. several checks). * diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java index be4603d5..43e23f60 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java @@ -2,6 +2,7 @@ import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.DeleteMessageService; @@ -12,6 +13,7 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.messaging.helper.DeleteAllMessagesParameterCreator; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Collections; @@ -31,6 +33,10 @@ public DeleteMessageServiceImpl(IMqttClient mqttClient) { super(new PahoMqttClientWrapper(mqttClient)); } + public DeleteMessageServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + } + @Override public String send(DeleteMessageParameters parameters) { parameters.validate(); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java index 3dd615a3..e2991f31 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java @@ -4,6 +4,7 @@ import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; @@ -16,6 +17,7 @@ import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Collections; @@ -36,6 +38,11 @@ public ListEndpointsServiceImpl(IMqttClient mqttClient) { this.encodeMessageService = new EncodeMessageServiceImpl(); } + public ListEndpointsServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + @Override public String send(ListEndpointsParameters parameters) { parameters.validate(); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java index 6deaca0c..941813f4 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageConfirmationService; @@ -10,6 +11,7 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Collections; @@ -27,6 +29,11 @@ public MessageConfirmationServiceImpl(IMqttClient mqttClient) { this.encodeMessageService = new EncodeMessageServiceImpl(); } + public MessageConfirmationServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + @Override public String send(MessageConfirmationParameters parameters) { parameters.validate(); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java index f6f37451..1546ba34 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java @@ -4,6 +4,7 @@ import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageHeaderQueryService; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; @@ -13,6 +14,7 @@ import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.concurrent.CompletableFuture; @@ -30,6 +32,13 @@ public MessageHeaderQueryServiceImpl(IMqttClient mqttClient) { mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_HEADER_QUERY); } + public MessageHeaderQueryServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + messageQueryHelperService = + new MessageQueryHelperService( + mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_HEADER_QUERY); + } + @Override public String send(MessageQueryParameters parameters) { return this.messageQueryHelperService.send(parameters); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java index 33f7f4e2..4d2667a1 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java @@ -4,6 +4,7 @@ import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageQueryService; @@ -14,6 +15,7 @@ import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.concurrent.CompletableFuture; @@ -33,6 +35,13 @@ public MessageQueryServiceImpl(IMqttClient mqttClient) { mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_MESSAGE_QUERY); } + public MessageQueryServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + this.messageQueryHelperService = + new MessageQueryHelperService( + mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_MESSAGE_QUERY); + } + @Override public String send(MessageQueryParameters parameters) { return this.messageQueryHelperService.send(parameters); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java index 86b1ef9b..b2344296 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java @@ -1,10 +1,12 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.SendMessageService; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; import com.dke.data.agrirouter.impl.messaging.MessageBodyCreator; import com.dke.data.agrirouter.impl.messaging.MqttService; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Objects; @@ -22,6 +24,10 @@ public SendMessageServiceImpl(IMqttClient mqttClient) { super(new PahoMqttClientWrapper(mqttClient)); } + public SendMessageServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + } + /** * Send message synchronous. * diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java index e6c98f5d..647ab62f 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.SetCapabilityService; @@ -11,6 +12,7 @@ import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import com.dke.data.agrirouter.impl.validation.ResponseValidator; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Collections; @@ -28,6 +30,11 @@ public SetCapabilityServiceImpl(IMqttClient mqttClient) { this.encodeMessageService = new EncodeMessageServiceImpl(); } + public SetCapabilityServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + @Override public String send(SetCapabilitiesParameters parameters) { parameters.validate(); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java index 64bb3d93..6ae91be7 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.SetSubscriptionService; @@ -11,6 +12,7 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.validation.ResponseValidator; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.Collections; @@ -27,6 +29,11 @@ public SetSubscriptionServiceImpl(IMqttClient mqttClient) { this.encodeMessageService = new EncodeMessageServiceImpl(); } + public SetSubscriptionServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + @Override public String send(SetSubscriptionParameters parameters) { parameters.validate(); From dfa75cc306df6df9b4f595c0644a9745fa8ca736 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 16:15:59 +0100 Subject: [PATCH 10/17] Rename paho package to client Renamed MqttOptionService and MqttClientService classes to reflect a more appropriate package name of `client` instead of `paho`. This enhances clarity and consistency throughout the project. --- .../convenience/mqtt/{paho => client}/MqttClientService.java | 2 +- .../convenience/mqtt/{paho => client}/MqttOptionService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/{paho => client}/MqttClientService.java (97%) rename agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/{paho => client}/MqttOptionService.java (98%) diff --git a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttClientService.java b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttClientService.java similarity index 97% rename from agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttClientService.java rename to agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttClientService.java index f21aa841..7991f6fa 100644 --- a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttClientService.java +++ b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttClientService.java @@ -1,4 +1,4 @@ -package com.dke.data.agrirouter.convenience.mqtt.paho; +package com.dke.data.agrirouter.convenience.mqtt.client; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.dto.onboard.RouterDevice; diff --git a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttOptionService.java b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttOptionService.java similarity index 98% rename from agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttOptionService.java rename to agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttOptionService.java index b0e74eec..fa7a13b8 100644 --- a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/paho/MqttOptionService.java +++ b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/client/MqttOptionService.java @@ -1,4 +1,4 @@ -package com.dke.data.agrirouter.convenience.mqtt.paho; +package com.dke.data.agrirouter.convenience.mqtt.client; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.dto.onboard.RouterDevice; From f2751044887eb3a5600073919f4e1b9c1fffa7a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 16:19:18 +0100 Subject: [PATCH 11/17] Add HiveMqttClientService for creating MQTT clients Introduced HiveMqttClientService to facilitate the creation of MQTT clients using onboarding responses or router devices. This service ensures robust client creation, utilizing provided host, port, and clientId parameters and enforcing necessary validations. --- .../mqtt/hive/HiveMqttClientService.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/hive/HiveMqttClientService.java diff --git a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/hive/HiveMqttClientService.java b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/hive/HiveMqttClientService.java new file mode 100644 index 00000000..a3c2378d --- /dev/null +++ b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/hive/HiveMqttClientService.java @@ -0,0 +1,71 @@ +package com.dke.data.agrirouter.convenience.mqtt.hive; + +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; +import com.dke.data.agrirouter.api.dto.onboard.RouterDevice; +import com.dke.data.agrirouter.api.env.Environment; +import com.dke.data.agrirouter.api.exception.CouldNotCreateMqttClientException; +import com.dke.data.agrirouter.impl.EnvironmentalService; +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +import org.apache.commons.lang3.StringUtils; + +import java.util.Objects; + +/** + * Service to create a MQTT client using the given onboarding response. + */ +@SuppressWarnings("unused") +public class HiveMqttClientService extends EnvironmentalService { + + /** + * Constructor for an environmental service. + * + * @param environment - + */ + public HiveMqttClientService(Environment environment) { + super(environment); + } + + /** + * Creates a MQTT client using the given onboarding response. Communication relies on given root + * certificates in an external keystore. The keystore with the root certificates is not created + * locally. + * + * @param onboardingResponse - + * @return - + */ + public Mqtt3AsyncClient create(OnboardingResponse onboardingResponse) { + return this.createMqttClient( + onboardingResponse.getConnectionCriteria().getHost(), + onboardingResponse.getConnectionCriteria().getPort(), + onboardingResponse.getConnectionCriteria().getClientId()); + } + + /** + * Creates a MQTT client using the given router Device. Communication relies on given root + * certificates in an external keystore. The keystore with the root certificates is not created + * locally. + * + * @param routerDevice - + * @return - + */ + public Mqtt3AsyncClient create(RouterDevice routerDevice) { + return this.createMqttClient( + routerDevice.getConnectionCriteria().getHost(), + String.valueOf(routerDevice.getConnectionCriteria().getPort()), + routerDevice.getConnectionCriteria().getClientId()); + } + + private Mqtt3AsyncClient createMqttClient(String host, String port, String clientId) { + if (StringUtils.isAnyBlank(host, port, clientId)) { + throw new CouldNotCreateMqttClientException( + "Currently there are parameters missing. Did you onboard correctly - host, port or client id are missing."); + } + return MqttClient.builder() + .useMqttVersion3() + .identifier(Objects.requireNonNull(clientId)) + .serverHost(host) + .serverPort(Integer.parseInt(port)) + .buildAsync(); + } +} From 929856194a360f8755b3be0232ac7f6e1b31fab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 16:20:00 +0100 Subject: [PATCH 12/17] Add Javadoc comments to MqttClientWrapper interface Added detailed Javadoc comments to the MqttClientWrapper interface. These comments describe the interface and its publish method, providing clarity on how to use them. --- .../data/agrirouter/api/mqtt/MqttClientWrapper.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java index 7ed95c48..3948911b 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java @@ -1,6 +1,17 @@ package com.dke.data.agrirouter.api.mqtt; +/** + * Interface representing an MQTT client wrapper for publishing messages. + * Implementations of this interface should define how to publish messages + * to a specific topic with a given payload. + */ public interface MqttClientWrapper { + /** + * Publishes a message to a specific topic with a given payload. + * + * @param measures the topic to which the message should be published + * @param payload the byte array representing the content of the message + */ void publish(String measures, byte[] payload); } From 80bc9e0c298655cf2fc0aeaad5b26427be9d3df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 16:26:01 +0100 Subject: [PATCH 13/17] Add class-level Javadoc comments for MQTT client wrappers Added detailed Javadoc comments to both PahoMqttClientWrapper and HiveMqttClientWrapper classes. These comments provide an overview of the class functionality and its role in publishing MQTT messages to specified topics, which enhances code readability and maintainability. --- .../dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java | 5 +++++ .../dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java index eccf7212..c499dee1 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java @@ -2,6 +2,11 @@ import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +/** + * A wrapper around HiveMQ's Mqtt3AsyncClient that implements the MqttClientWrapper interface. + * This class is responsible for publishing messages to a specified MQTT topic using the + * provided Mqtt3AsyncClient. + */ public class HiveMqttClientWrapper implements MqttClientWrapper { private final Mqtt3AsyncClient mqttClient; diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java index d5dfd7c4..0fdc606f 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java @@ -5,6 +5,11 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +/** + * PahoMqttClientWrapper is an implementation of the MqttClientWrapper interface, + * providing a wrapper around an instance of IMqttClient from the Paho MQTT library. + * This class handles the publishing of MQTT messages to specified topics with a given payload. + */ public class PahoMqttClientWrapper implements MqttClientWrapper { private final IMqttClient mqttClient; From bce4e9b7464586c5547292227bc067c8fda3e458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 16:31:45 +0100 Subject: [PATCH 14/17] Refactor PingServiceImpl to improve client handling Refactor the PingServiceImpl to use wrapper classes for different MQTT clients, ensuring better abstraction and flexibility. This change replaces the exception handling mechanism and removes unnecessary imports, thus streamlining the message publishing process. --- .../impl/messaging/mqtt/PingServiceImpl.java | 45 ++++++++++--------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java index b20daf4b..0307c753 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java @@ -1,7 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.PingService; import com.dke.data.agrirouter.api.service.parameters.PingParameters; @@ -10,9 +11,8 @@ import com.dke.data.agrirouter.impl.messaging.MessageEncoder; import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -26,30 +26,31 @@ public class PingServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public PingServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + + public PingServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(PingParameters parameters) { - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override From 688b4eeb3291f433829f3c6ba5cc5344f63ee85f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 16:41:20 +0100 Subject: [PATCH 15/17] Update SDK version to 3.3.0 Upgrade the agrirouter SDK version from 3.2.2 to 3.3.0 across all modules. This includes changes in the parent POM and specific module POMs for tests, convenience, implementation, and API. The update ensures consistency and leverages the new features and fixes in version 3.3.0. --- agrirouter-sdk-java-api/pom.xml | 2 +- agrirouter-sdk-java-convenience/pom.xml | 2 +- agrirouter-sdk-java-impl/pom.xml | 2 +- agrirouter-sdk-java-tests/pom.xml | 2 +- pom.xml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/agrirouter-sdk-java-api/pom.xml b/agrirouter-sdk-java-api/pom.xml index 3ba9cef1..ea08cf50 100644 --- a/agrirouter-sdk-java-api/pom.xml +++ b/agrirouter-sdk-java-api/pom.xml @@ -6,7 +6,7 @@ com.agrirouter.api agrirouter-sdk-java - 3.2.2 + 3.3.0 AGRIROUTER SDK JAVA - API agrirouter-sdk-java-api diff --git a/agrirouter-sdk-java-convenience/pom.xml b/agrirouter-sdk-java-convenience/pom.xml index a2612b2c..c4f68e20 100644 --- a/agrirouter-sdk-java-convenience/pom.xml +++ b/agrirouter-sdk-java-convenience/pom.xml @@ -6,7 +6,7 @@ com.agrirouter.api agrirouter-sdk-java - 3.2.2 + 3.3.0 AGRIROUTER SDK JAVA - CONVENIENCE agrirouter-sdk-java-convenience diff --git a/agrirouter-sdk-java-impl/pom.xml b/agrirouter-sdk-java-impl/pom.xml index 9b0b9434..31eccbfa 100644 --- a/agrirouter-sdk-java-impl/pom.xml +++ b/agrirouter-sdk-java-impl/pom.xml @@ -6,7 +6,7 @@ com.agrirouter.api agrirouter-sdk-java - 3.2.2 + 3.3.0 AGRIROUTER SDK JAVA - IMPL agrirouter-sdk-java-impl diff --git a/agrirouter-sdk-java-tests/pom.xml b/agrirouter-sdk-java-tests/pom.xml index 3f952d3f..5bed6c13 100644 --- a/agrirouter-sdk-java-tests/pom.xml +++ b/agrirouter-sdk-java-tests/pom.xml @@ -5,7 +5,7 @@ agrirouter-sdk-java com.agrirouter.api - 3.2.2 + 3.3.0 4.0.0 AGRIROUTER SDK JAVA - TESTS diff --git a/pom.xml b/pom.xml index 30f015a0..56af2e28 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.agrirouter.api agrirouter-sdk-java - 3.2.2 + 3.3.0 pom AGRIROUTER SDK JAVA From 8a8a1d15da51e49c7eadd96957e6b049a27b4583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 16:42:26 +0100 Subject: [PATCH 16/17] Add SuppressWarnings annotation to constructor Added a @SuppressWarnings("unused") annotation to the PingServiceImpl constructor that accepts an Mqtt3AsyncClient parameter. This change addresses potential warnings about unused parameters, enhancing code cleanliness. --- .../dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java index 0307c753..a36f8485 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java @@ -30,6 +30,7 @@ public PingServiceImpl(IMqttClient mqttClient) { this.encodeMessageService = new EncodeMessageServiceImpl(); } + @SuppressWarnings("unused") public PingServiceImpl(Mqtt3AsyncClient mqttClient) { super(new HiveMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); From 7ea414100371cf138494b0dcb4e8ca6b653fb8b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20D=C3=B6mer?= Date: Fri, 1 Nov 2024 16:50:26 +0100 Subject: [PATCH 17/17] Update GitHub Actions to use latest versions Upgraded actions/checkout to v4.2.2 and actions/setup-java to v4.5.0 across all workflow steps. This ensures compatibility with the latest features and security enhancements. --- .github/workflows/continuous_integration.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/continuous_integration.yml b/.github/workflows/continuous_integration.yml index 061e5831..8d6fa674 100644 --- a/.github/workflows/continuous_integration.yml +++ b/.github/workflows/continuous_integration.yml @@ -12,9 +12,9 @@ jobs: build_jdk17: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4.2.2 - name: Set up JDK 17 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4.5.0 with: java-version: 17 distribution: zulu @@ -25,9 +25,9 @@ jobs: build_jdk21: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4.2.2 - name: Set up JDK 21 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4.5.0 with: java-version: 21 distribution: zulu @@ -38,9 +38,9 @@ jobs: integration_test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4.2.2 - name: Set up JDK 17 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4.5.0 with: java-version: 17 distribution: zulu