diff --git a/.github/workflows/continuous_integration.yml b/.github/workflows/continuous_integration.yml index 061e5831..8d6fa674 100644 --- a/.github/workflows/continuous_integration.yml +++ b/.github/workflows/continuous_integration.yml @@ -12,9 +12,9 @@ jobs: build_jdk17: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4.2.2 - name: Set up JDK 17 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4.5.0 with: java-version: 17 distribution: zulu @@ -25,9 +25,9 @@ jobs: build_jdk21: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4.2.2 - name: Set up JDK 21 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4.5.0 with: java-version: 21 distribution: zulu @@ -38,9 +38,9 @@ jobs: integration_test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4.2.2 - name: Set up JDK 17 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4.5.0 with: java-version: 17 distribution: zulu diff --git a/.gitignore b/.gitignore index b4a56a0b..0bdafc50 100644 --- a/.gitignore +++ b/.gitignore @@ -27,5 +27,8 @@ buildNumber.properties # GENERATED # src/main/generated +# LOCAL SETTINGS # +ci/local-settings.xml + # LCK # *.lck \ No newline at end of file diff --git a/agrirouter-sdk-java-api/pom.xml b/agrirouter-sdk-java-api/pom.xml index f365729a..ea08cf50 100644 --- a/agrirouter-sdk-java-api/pom.xml +++ b/agrirouter-sdk-java-api/pom.xml @@ -6,7 +6,7 @@ com.agrirouter.api agrirouter-sdk-java - 3.2.2 + 3.3.0 AGRIROUTER SDK JAVA - API agrirouter-sdk-java-api @@ -48,6 +48,14 @@ org.apache.commons commons-lang3 + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + + + com.hivemq + hivemq-mqtt-client + diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java new file mode 100644 index 00000000..c499dee1 --- /dev/null +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/HiveMqttClientWrapper.java @@ -0,0 +1,22 @@ +package com.dke.data.agrirouter.api.mqtt; + +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; + +/** + * A wrapper around HiveMQ's Mqtt3AsyncClient that implements the MqttClientWrapper interface. + * This class is responsible for publishing messages to a specified MQTT topic using the + * provided Mqtt3AsyncClient. + */ +public class HiveMqttClientWrapper implements MqttClientWrapper { + + private final Mqtt3AsyncClient mqttClient; + + public HiveMqttClientWrapper(Mqtt3AsyncClient mqttClient) { + this.mqttClient = mqttClient; + } + + @Override + public void publish(String measures, byte[] payload) { + this.mqttClient.publishWith().topic(measures).payload(payload).send(); + } +} diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java new file mode 100644 index 00000000..3948911b --- /dev/null +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/MqttClientWrapper.java @@ -0,0 +1,17 @@ +package com.dke.data.agrirouter.api.mqtt; + +/** + * Interface representing an MQTT client wrapper for publishing messages. + * Implementations of this interface should define how to publish messages + * to a specific topic with a given payload. + */ +public interface MqttClientWrapper { + + /** + * Publishes a message to a specific topic with a given payload. + * + * @param measures the topic to which the message should be published + * @param payload the byte array representing the content of the message + */ + void publish(String measures, byte[] payload); +} diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java new file mode 100644 index 00000000..0fdc606f --- /dev/null +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/mqtt/PahoMqttClientWrapper.java @@ -0,0 +1,30 @@ +package com.dke.data.agrirouter.api.mqtt; + +import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * PahoMqttClientWrapper is an implementation of the MqttClientWrapper interface, + * providing a wrapper around an instance of IMqttClient from the Paho MQTT library. + * This class handles the publishing of MQTT messages to specified topics with a given payload. + */ +public class PahoMqttClientWrapper implements MqttClientWrapper { + + private final IMqttClient mqttClient; + + public PahoMqttClientWrapper(IMqttClient mqttClient) { + this.mqttClient = mqttClient; + } + + @Override + public void publish(String measures, byte[] payload) { + try { + this.mqttClient.publish(measures, new MqttMessage(payload)); + } catch (MqttException e) { + throw new CouldNotSendMqttMessageException(e); + } + } + +} diff --git a/agrirouter-sdk-java-convenience/pom.xml b/agrirouter-sdk-java-convenience/pom.xml index a2612b2c..c4f68e20 100644 --- a/agrirouter-sdk-java-convenience/pom.xml +++ b/agrirouter-sdk-java-convenience/pom.xml @@ -6,7 +6,7 @@ com.agrirouter.api agrirouter-sdk-java - 3.2.2 + 3.3.0 AGRIROUTER SDK JAVA - CONVENIENCE agrirouter-sdk-java-convenience diff --git a/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/hive/HiveMqttClientService.java b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/hive/HiveMqttClientService.java new file mode 100644 index 00000000..a3c2378d --- /dev/null +++ b/agrirouter-sdk-java-convenience/src/main/java/com/dke/data/agrirouter/convenience/mqtt/hive/HiveMqttClientService.java @@ -0,0 +1,71 @@ +package com.dke.data.agrirouter.convenience.mqtt.hive; + +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; +import com.dke.data.agrirouter.api.dto.onboard.RouterDevice; +import com.dke.data.agrirouter.api.env.Environment; +import com.dke.data.agrirouter.api.exception.CouldNotCreateMqttClientException; +import com.dke.data.agrirouter.impl.EnvironmentalService; +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +import org.apache.commons.lang3.StringUtils; + +import java.util.Objects; + +/** + * Service to create a MQTT client using the given onboarding response. + */ +@SuppressWarnings("unused") +public class HiveMqttClientService extends EnvironmentalService { + + /** + * Constructor for an environmental service. + * + * @param environment - + */ + public HiveMqttClientService(Environment environment) { + super(environment); + } + + /** + * Creates a MQTT client using the given onboarding response. Communication relies on given root + * certificates in an external keystore. The keystore with the root certificates is not created + * locally. + * + * @param onboardingResponse - + * @return - + */ + public Mqtt3AsyncClient create(OnboardingResponse onboardingResponse) { + return this.createMqttClient( + onboardingResponse.getConnectionCriteria().getHost(), + onboardingResponse.getConnectionCriteria().getPort(), + onboardingResponse.getConnectionCriteria().getClientId()); + } + + /** + * Creates a MQTT client using the given router Device. Communication relies on given root + * certificates in an external keystore. The keystore with the root certificates is not created + * locally. + * + * @param routerDevice - + * @return - + */ + public Mqtt3AsyncClient create(RouterDevice routerDevice) { + return this.createMqttClient( + routerDevice.getConnectionCriteria().getHost(), + String.valueOf(routerDevice.getConnectionCriteria().getPort()), + routerDevice.getConnectionCriteria().getClientId()); + } + + private Mqtt3AsyncClient createMqttClient(String host, String port, String clientId) { + if (StringUtils.isAnyBlank(host, port, clientId)) { + throw new CouldNotCreateMqttClientException( + "Currently there are parameters missing. Did you onboard correctly - host, port or client id are missing."); + } + return MqttClient.builder() + .useMqttVersion3() + .identifier(Objects.requireNonNull(clientId)) + .serverHost(host) + .serverPort(Integer.parseInt(port)) + .buildAsync(); + } +} diff --git a/agrirouter-sdk-java-impl/pom.xml b/agrirouter-sdk-java-impl/pom.xml index 9b0b9434..31eccbfa 100644 --- a/agrirouter-sdk-java-impl/pom.xml +++ b/agrirouter-sdk-java-impl/pom.xml @@ -6,7 +6,7 @@ com.agrirouter.api agrirouter-sdk-java - 3.2.2 + 3.3.0 AGRIROUTER SDK JAVA - IMPL agrirouter-sdk-java-impl diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/MqttService.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/MqttService.java index 1a4bb7f7..d508b8f1 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/MqttService.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/MqttService.java @@ -1,19 +1,19 @@ package com.dke.data.agrirouter.impl.messaging; -import org.eclipse.paho.client.mqttv3.IMqttClient; +import com.dke.data.agrirouter.api.mqtt.MqttClientWrapper; /** * Base class which holds the MQTT client with the connection provided by the provider. */ public class MqttService { - private final IMqttClient mqttClient; + private final MqttClientWrapper mqttClientWrapper; - public MqttService(IMqttClient mqttClient) { - this.mqttClient = mqttClient; + public MqttService(MqttClientWrapper mqttClientWrapper) { + this.mqttClientWrapper = mqttClientWrapper; } - protected IMqttClient getMqttClient() { - return mqttClient; + protected MqttClientWrapper getMqttClient() { + return mqttClientWrapper; } } diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java index fef5faaf..775591f9 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java @@ -1,7 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.helper.mqtt; import com.dke.data.agrirouter.api.enums.TechnicalMessageType; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; @@ -9,9 +10,8 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.helper.QueryAllMessagesParameterCreator; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -26,11 +26,18 @@ public MessageQueryHelperService( IMqttClient mqttClient, EncodeMessageService encodeMessageService, TechnicalMessageType technicalMessageType) { - super(mqttClient); - this.logMethodBegin(); + super(new PahoMqttClientWrapper(mqttClient)); + this.encodeMessageService = encodeMessageService; + this.technicalMessageType = technicalMessageType; + } + + public MessageQueryHelperService( + Mqtt3AsyncClient mqttClient, + EncodeMessageService encodeMessageService, + TechnicalMessageType technicalMessageType) { + super(new HiveMqttClientWrapper(mqttClient)); this.encodeMessageService = encodeMessageService; this.technicalMessageType = technicalMessageType; - this.logMethodEnd(); } public String send(MessageQueryParameters parameters) { @@ -38,29 +45,25 @@ public String send(MessageQueryParameters parameters) { this.getNativeLogger().trace("Validate parameters."); parameters.validate(); - try { - this.getNativeLogger().trace("Encode message."); - var encodedMessage = this.encode(this.technicalMessageType, parameters); + this.getNativeLogger().trace("Encode message."); + var encodedMessage = this.encode(this.technicalMessageType, parameters); - this.getNativeLogger().trace("Build message parameters."); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); + this.getNativeLogger().trace("Build message parameters."); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); - this.getNativeLogger().trace("Send and fetch message response."); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + this.getNativeLogger().trace("Send and fetch message response."); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java index a3c8ca19..caedaaa7 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java @@ -1,7 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.CloudOffboardingService; import com.dke.data.agrirouter.api.service.parameters.CloudOffboardingParameters; @@ -10,9 +11,8 @@ import com.dke.data.agrirouter.impl.messaging.MessageEncoder; import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -28,36 +28,35 @@ public class CloudOffboardingServiceImpl extends MqttService private final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); public CloudOffboardingServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); } - /** + public CloudOffboardingServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); + } + + /** * Offboarding a virtual CU. Will deliver no result if the action was successful, if there's any * error an exception will be thrown. * * @param parameters Parameters for offboarding. */ - @Override + @Override public String send(CloudOffboardingParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java index 96d2b897..2c6c9367 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOnboardingServiceImpl.java @@ -1,7 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.CloudOnboardingService; import com.dke.data.agrirouter.api.service.parameters.CloudOnboardingParameters; @@ -10,9 +11,8 @@ import com.dke.data.agrirouter.impl.messaging.MessageEncoder; import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -25,7 +25,11 @@ public class CloudOnboardingServiceImpl extends MqttService private final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); public CloudOnboardingServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + } + + public CloudOnboardingServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); } /** @@ -37,24 +41,20 @@ public CloudOnboardingServiceImpl(IMqttClient mqttClient) { @Override public String send(CloudOnboardingParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java index eee3006c..43e23f60 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/DeleteMessageServiceImpl.java @@ -1,8 +1,9 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.DeleteMessageService; import com.dke.data.agrirouter.api.service.parameters.DeleteMessageParameters; @@ -12,9 +13,8 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.messaging.helper.DeleteAllMessagesParameterCreator; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -30,30 +30,30 @@ public class DeleteMessageServiceImpl extends MqttService private final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); public DeleteMessageServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + } + + public DeleteMessageServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); } @Override public String send(DeleteMessageParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java index 7172236b..e2991f31 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/ListEndpointsServiceImpl.java @@ -3,8 +3,9 @@ import agrirouter.request.payload.account.Endpoints; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.messaging.mqtt.ListEndpointsService; @@ -16,9 +17,8 @@ import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -34,31 +34,32 @@ public class ListEndpointsServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public ListEndpointsServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + + public ListEndpointsServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(ListEndpointsParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java index 0dfda664..941813f4 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageConfirmationServiceImpl.java @@ -1,7 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageConfirmationService; import com.dke.data.agrirouter.api.service.parameters.MessageConfirmationParameters; @@ -10,9 +11,8 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -25,31 +25,32 @@ public class MessageConfirmationServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public MessageConfirmationServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + + public MessageConfirmationServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(MessageConfirmationParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java index 94d20b3e..1546ba34 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java @@ -4,6 +4,8 @@ import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageHeaderQueryService; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; import com.dke.data.agrirouter.impl.messaging.MqttService; @@ -12,6 +14,7 @@ import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.concurrent.CompletableFuture; @@ -23,7 +26,14 @@ public class MessageHeaderQueryServiceImpl extends MqttService private final MessageQueryHelperService messageQueryHelperService; public MessageHeaderQueryServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + messageQueryHelperService = + new MessageQueryHelperService( + mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_HEADER_QUERY); + } + + public MessageHeaderQueryServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); messageQueryHelperService = new MessageQueryHelperService( mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_HEADER_QUERY); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java index dcea98b8..4d2667a1 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java @@ -4,6 +4,8 @@ import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageQueryService; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; @@ -13,6 +15,7 @@ import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; import java.util.concurrent.CompletableFuture; @@ -26,7 +29,14 @@ public class MessageQueryServiceImpl extends MqttService private final MessageQueryHelperService messageQueryHelperService; public MessageQueryServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + this.messageQueryHelperService = + new MessageQueryHelperService( + mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_MESSAGE_QUERY); + } + + public MessageQueryServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); this.messageQueryHelperService = new MessageQueryHelperService( mqttClient, new EncodeMessageServiceImpl(), SystemMessageType.DKE_FEED_MESSAGE_QUERY); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java index b20daf4b..a36f8485 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/PingServiceImpl.java @@ -1,7 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.PingService; import com.dke.data.agrirouter.api.service.parameters.PingParameters; @@ -10,9 +11,8 @@ import com.dke.data.agrirouter.impl.messaging.MessageEncoder; import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -26,30 +26,32 @@ public class PingServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public PingServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + + @SuppressWarnings("unused") + public PingServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(PingParameters parameters) { - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java index 9999085a..b2344296 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SendMessageServiceImpl.java @@ -1,13 +1,13 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.SendMessageService; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; import com.dke.data.agrirouter.impl.messaging.MessageBodyCreator; import com.dke.data.agrirouter.impl.messaging.MqttService; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -21,7 +21,11 @@ public class SendMessageServiceImpl extends MqttService implements SendMessageService, MessageBodyCreator { public SendMessageServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + } + + public SendMessageServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); } /** @@ -31,18 +35,14 @@ public SendMessageServiceImpl(IMqttClient mqttClient) { */ public void send(SendMessageParameters sendMessageParameters) { sendMessageParameters.validate(); - try { - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(sendMessageParameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(sendMessageParameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java index ec381a5e..647ab62f 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetCapabilityServiceImpl.java @@ -1,7 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.SetCapabilityService; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; @@ -11,9 +12,8 @@ import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import com.dke.data.agrirouter.impl.validation.ResponseValidator; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -26,31 +26,32 @@ public class SetCapabilityServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public SetCapabilityServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + + public SetCapabilityServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(SetCapabilitiesParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java index 6e882790..6ae91be7 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/SetSubscriptionServiceImpl.java @@ -1,7 +1,8 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; -import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; +import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper; +import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper; import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; import com.dke.data.agrirouter.api.service.messaging.mqtt.SetSubscriptionService; import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; @@ -11,9 +12,8 @@ import com.dke.data.agrirouter.impl.messaging.MqttService; import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; import com.dke.data.agrirouter.impl.validation.ResponseValidator; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Collections; import java.util.Objects; @@ -25,31 +25,32 @@ public class SetSubscriptionServiceImpl extends MqttService private final EncodeMessageService encodeMessageService; public SetSubscriptionServiceImpl(IMqttClient mqttClient) { - super(mqttClient); + super(new PahoMqttClientWrapper(mqttClient)); + this.encodeMessageService = new EncodeMessageServiceImpl(); + } + + public SetSubscriptionServiceImpl(Mqtt3AsyncClient mqttClient) { + super(new HiveMqttClientWrapper(mqttClient)); this.encodeMessageService = new EncodeMessageServiceImpl(); } @Override public String send(SetSubscriptionParameters parameters) { parameters.validate(); - try { - var encodedMessage = this.encode(parameters); - var sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); - sendMessageParameters.setEncodedMessages( - Collections.singletonList(encodedMessage.getEncodedMessage())); - var messageAsJson = this.createMessageBody(sendMessageParameters); - var payload = messageAsJson.getBytes(); - this.getMqttClient() - .publish( - Objects.requireNonNull(parameters.getOnboardingResponse()) - .getConnectionCriteria() - .getMeasures(), - new MqttMessage(payload)); - return encodedMessage.getApplicationMessageID(); - } catch (MqttException e) { - throw new CouldNotSendMqttMessageException(e); - } + var encodedMessage = this.encode(parameters); + var sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse()); + sendMessageParameters.setEncodedMessages( + Collections.singletonList(encodedMessage.getEncodedMessage())); + var messageAsJson = this.createMessageBody(sendMessageParameters); + var payload = messageAsJson.getBytes(); + this.getMqttClient() + .publish( + Objects.requireNonNull(parameters.getOnboardingResponse()) + .getConnectionCriteria() + .getMeasures(), + payload); + return encodedMessage.getApplicationMessageID(); } @Override diff --git a/agrirouter-sdk-java-tests/pom.xml b/agrirouter-sdk-java-tests/pom.xml index 3f952d3f..5bed6c13 100644 --- a/agrirouter-sdk-java-tests/pom.xml +++ b/agrirouter-sdk-java-tests/pom.xml @@ -5,7 +5,7 @@ agrirouter-sdk-java com.agrirouter.api - 3.2.2 + 3.3.0 4.0.0 AGRIROUTER SDK JAVA - TESTS diff --git a/pom.xml b/pom.xml index 7310dfc6..56af2e28 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.agrirouter.api agrirouter-sdk-java - 3.2.2 + 3.3.0 pom AGRIROUTER SDK JAVA @@ -214,6 +214,11 @@ org.eclipse.paho.client.mqttv3 1.2.5 + + com.hivemq + hivemq-mqtt-client + 1.3.3 +