diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/http/MessageHeaderQueryService.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/http/MessageHeaderQueryService.java index 1c1ffbdf..b71467a2 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/http/MessageHeaderQueryService.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/http/MessageHeaderQueryService.java @@ -1,9 +1,31 @@ package com.dke.data.agrirouter.api.service.messaging.http; import agrirouter.feed.response.FeedResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; +import com.dke.data.agrirouter.api.messaging.HttpAsyncMessageSendingResult; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; public interface MessageHeaderQueryService extends MessagingService, - MessageDecoder {} + MessageDecoder { + + /** + * Query all message headers as default function. The query will be based on a time period since + * message ID filtering or sender filtering can be achieved using the default message sending. + * + * @param onboardingResponse The onboard response for the endpoint. + * @return The message ID. + */ + String queryAll(OnboardingResponse onboardingResponse); + + /** + * Query all message headers as async default function. The query will be based on a time period + * since message ID filtering or sender filtering can be achieved using the default message + * sending. + * + * @param onboardingResponse The onboard response for the endpoint. + * @return The message ID. + */ + HttpAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse); +} diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/http/MessageQueryService.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/http/MessageQueryService.java index 276a8fb6..33406d6f 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/http/MessageQueryService.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/http/MessageQueryService.java @@ -1,9 +1,30 @@ package com.dke.data.agrirouter.api.service.messaging.http; import agrirouter.feed.response.FeedResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; +import com.dke.data.agrirouter.api.messaging.HttpAsyncMessageSendingResult; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; public interface MessageQueryService extends MessagingService, - MessageDecoder {} + MessageDecoder { + + /** + * Query all messages as default function. The query will be based on a time period since message + * ID filtering or sender filtering can be achieved using the default message sending. + * + * @param onboardingResponse The onboard response for the endpoint. + * @return The message ID. + */ + String queryAll(OnboardingResponse onboardingResponse); + + /** + * Query all messages as async default function. The query will be based on a time period since + * message ID filtering or sender filtering can be achieved using the default message sending. + * + * @param onboardingResponse The onboard response for the endpoint. + * @return The message ID. + */ + HttpAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse); +} diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/mqtt/MessageHeaderQueryService.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/mqtt/MessageHeaderQueryService.java index da50148e..fb08d0e8 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/mqtt/MessageHeaderQueryService.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/mqtt/MessageHeaderQueryService.java @@ -1,9 +1,31 @@ package com.dke.data.agrirouter.api.service.messaging.mqtt; import agrirouter.feed.response.FeedResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; +import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; public interface MessageHeaderQueryService extends MessagingService, - MessageDecoder {} + MessageDecoder { + + /** + * Query all message headers as default function. The query will be based on a time period since + * message ID filtering or sender filtering can be achieved using the default message sending. + * + * @param onboardingResponse The onboard response for the endpoint. + * @return The message ID. + */ + String queryAll(OnboardingResponse onboardingResponse); + + /** + * Query all message headers as async default function. The query will be based on a time period + * since message ID filtering or sender filtering can be achieved using the default message + * sending. + * + * @param onboardingResponse The onboard response for the endpoint. + * @return The message ID. + */ + MqttAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse); +} diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/mqtt/MessageQueryService.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/mqtt/MessageQueryService.java index e5875d33..56e52332 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/mqtt/MessageQueryService.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/mqtt/MessageQueryService.java @@ -1,9 +1,30 @@ package com.dke.data.agrirouter.api.service.messaging.mqtt; import agrirouter.feed.response.FeedResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; +import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; public interface MessageQueryService extends MessagingService, - MessageDecoder {} + MessageDecoder { + + /** + * Query all messages as default function. The query will be based on a time period since message + * ID filtering or sender filtering can be achieved using the default message sending. + * + * @param onboardingResponse The onboard response for the endpoint. + * @return The message ID. + */ + String queryAll(OnboardingResponse onboardingResponse); + + /** + * Query all messages as async default function. The query will be based on a time period since + * message ID filtering or sender filtering can be achieved using the default message sending. + * + * @param onboardingResponse The onboard response for the endpoint. + * @return The message ID. + */ + MqttAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse); +} diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImpl.java index 27142c7e..14d50b9e 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImpl.java @@ -106,7 +106,9 @@ public List chunkAndEncode( messageHeaderParameters.validate(); payloadParameters.validate(); - if (messageHeaderParameters.getTechnicalMessageType().needsBase64EncodingAndHasToBeChunkedIfNecessary()) { + if (messageHeaderParameters + .getTechnicalMessageType() + .needsBase64EncodingAndHasToBeChunkedIfNecessary()) { if (payloadParameters.shouldBeChunked()) { getNativeLogger() .debug( diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/MessageQueryHelperService.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/MessageQueryHelperService.java index 9195c065..9858fdf6 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/MessageQueryHelperService.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/MessageQueryHelperService.java @@ -15,7 +15,7 @@ import java.util.concurrent.CompletableFuture; public class MessageQueryHelperService extends NonEnvironmentalService - implements MessageSender, MessageEncoder, ResponseValidator { + implements MessageSender, MessageEncoder, ResponseValidator, QueryAllMessagesParameterCreator { private final EncodeMessageService encodeMessageService; private final TechnicalMessageType technicalMessageType; diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/QueryAllMessagesParameterCreator.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/QueryAllMessagesParameterCreator.java new file mode 100644 index 00000000..19aeadd0 --- /dev/null +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/QueryAllMessagesParameterCreator.java @@ -0,0 +1,30 @@ +package com.dke.data.agrirouter.impl.messaging.helper; + +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; +import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters; +import com.dke.data.agrirouter.impl.common.UtcTimeService; +import java.util.Collections; +import org.jetbrains.annotations.NotNull; + +/** Interface to abstract the message query creation. */ +public interface QueryAllMessagesParameterCreator { + + /** + * Create message parameters to query all messages. + * + * @param onboardingResponse - + * @return - + */ + @NotNull + default MessageQueryParameters createMessageParametersToQueryAll( + OnboardingResponse onboardingResponse) { + MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); + messageQueryParameters.setOnboardingResponse(onboardingResponse); + messageQueryParameters.setMessageIds(Collections.emptyList()); + messageQueryParameters.setSenderIds(Collections.emptyList()); + messageQueryParameters.setSentFromInSeconds( + UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); + messageQueryParameters.setSentToInSeconds(UtcTimeService.now().toEpochSecond()); + return messageQueryParameters; + } +} diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java index 846e50da..b9004a60 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java @@ -8,6 +8,7 @@ import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters; import com.dke.data.agrirouter.impl.messaging.MessageEncoder; import com.dke.data.agrirouter.impl.messaging.MqttService; +import com.dke.data.agrirouter.impl.messaging.helper.QueryAllMessagesParameterCreator; import com.dke.data.agrirouter.impl.messaging.rest.MessageSender; import java.util.Collections; import java.util.Objects; @@ -16,7 +17,7 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; public class MessageQueryHelperService extends MqttService - implements MessageSender, MessageEncoder { + implements MessageSender, MessageEncoder, QueryAllMessagesParameterCreator { private final EncodeMessageService encodeMessageService; private final TechnicalMessageType technicalMessageType; diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java index 2c9f3d2e..70c5176d 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageHeaderQueryServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import agrirouter.feed.response.FeedResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageHeaderQueryService; @@ -42,4 +43,15 @@ public FeedResponse.HeaderQueryResponse unsafeDecode(ByteString message) throws InvalidProtocolBufferException { return FeedResponse.HeaderQueryResponse.parseFrom(message); } + + @Override + public String queryAll(OnboardingResponse onboardingResponse) { + return send(messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse)); + } + + @Override + public MqttAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse) { + return sendAsync( + messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse)); + } } diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java index 4cceca5a..e28ca58b 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/MessageQueryServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.mqtt; import agrirouter.feed.response.FeedResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult; import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder; @@ -45,4 +46,18 @@ public FeedResponse.MessageQueryResponse unsafeDecode(ByteString message) throws InvalidProtocolBufferException { return FeedResponse.MessageQueryResponse.parseFrom(message); } + + @Override + public String queryAll(OnboardingResponse onboardingResponse) { + MessageQueryParameters messageQueryParameters = + messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse); + return send(messageQueryParameters); + } + + @Override + public MqttAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse) { + MessageQueryParameters messageQueryParameters = + messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse); + return sendAsync(messageQueryParameters); + } } diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/rest/MessageHeaderQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/rest/MessageHeaderQueryServiceImpl.java index e859af88..91cdfd0f 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/rest/MessageHeaderQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/rest/MessageHeaderQueryServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.rest; import agrirouter.feed.response.FeedResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.env.Environment; import com.dke.data.agrirouter.api.messaging.HttpAsyncMessageSendingResult; @@ -39,4 +40,18 @@ public FeedResponse.HeaderQueryResponse unsafeDecode(ByteString message) throws InvalidProtocolBufferException { return FeedResponse.HeaderQueryResponse.parseFrom(message); } + + @Override + public String queryAll(OnboardingResponse onboardingResponse) { + MessageQueryParameters messageQueryParameters = + messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse); + return send(messageQueryParameters); + } + + @Override + public HttpAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse) { + MessageQueryParameters messageQueryParameters = + messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse); + return sendAsync(messageQueryParameters); + } } diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/rest/MessageQueryServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/rest/MessageQueryServiceImpl.java index 712b93c1..e36c29e2 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/rest/MessageQueryServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/rest/MessageQueryServiceImpl.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.rest; import agrirouter.feed.response.FeedResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; import com.dke.data.agrirouter.api.enums.SystemMessageType; import com.dke.data.agrirouter.api.env.Environment; import com.dke.data.agrirouter.api.messaging.HttpAsyncMessageSendingResult; @@ -42,4 +43,18 @@ public FeedResponse.MessageQueryResponse unsafeDecode(ByteString message) throws InvalidProtocolBufferException { return FeedResponse.MessageQueryResponse.parseFrom(message); } + + @Override + public String queryAll(OnboardingResponse onboardingResponse) { + MessageQueryParameters messageQueryParameters = + messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse); + return send(messageQueryParameters); + } + + @Override + public HttpAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse) { + MessageQueryParameters messageQueryParameters = + messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse); + return sendAsync(messageQueryParameters); + } } diff --git a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java index 1a45554b..fa77daae 100644 --- a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java +++ b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java @@ -5,32 +5,29 @@ import java.nio.file.Paths; import java.util.Base64; -/** - * Generic content reader for the testcases. - */ +/** Generic content reader for the testcases. */ public class ContentReader { - private static final String FOLDER = "./message-content/"; + private static final String FOLDER = "./message-content/"; - public static String readBase64EncodedMessageContent(Identifier identifier) throws Throwable { - Path path = Paths.get(FOLDER.concat(identifier.getFileName())); - final byte[] rawData = Files.readAllBytes(path); - return new String(Base64.getEncoder().encode(rawData)); - } - - public enum Identifier { - BIG_TASK_DATA("big_taskdata.zip"), - SMALL_TASK_DATA("small_taskdata.zip"); + public static String readBase64EncodedMessageContent(Identifier identifier) throws Throwable { + Path path = Paths.get(FOLDER.concat(identifier.getFileName())); + final byte[] rawData = Files.readAllBytes(path); + return new String(Base64.getEncoder().encode(rawData)); + } - private final String fileName; + public enum Identifier { + BIG_TASK_DATA("big_taskdata.zip"), + SMALL_TASK_DATA("small_taskdata.zip"); - Identifier(String fileName) { - this.fileName = fileName; - } + private final String fileName; - public String getFileName() { - return fileName; - } + Identifier(String fileName) { + this.fileName = fileName; } + public String getFileName() { + return fileName; + } + } } diff --git a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java index 517b2cea..4443a31d 100644 --- a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java +++ b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java @@ -25,6 +25,11 @@ import com.dke.data.agrirouter.test.OnboardingResponseRepository; import com.dke.data.agrirouter.test.helper.ContentReader; import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.http.HttpStatus; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -32,248 +37,287 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Test case to show the behavior for chunked message sending. - */ +/** Test case to show the behavior for chunked message sending. */ class SendAndReceiveChunkedMessagesTest extends AbstractIntegrationTest { - private static final int MAX_CHUNK_SIZE = 1024000; - - @ParameterizedTest - @MethodSource("getTestArguments") - void givenRealMessageContentWhenSendingMessagesTheContentShouldMatchAfterReceivingAndMergingIt( - ByteString messageContent, int expectedNrOfChunks) - throws Throwable { - actionsForSender(messageContent, expectedNrOfChunks); - actionsForTheRecipient(messageContent, expectedNrOfChunks); - } - - /** - * These are the actions for the recipient. The recipient is already set up and declared the capabilities. The actions for the recipient are documented inline. - * - * @param messageContent - - * @param expectedNrOfChunks - - */ - private void actionsForTheRecipient(ByteString messageContent, int expectedNrOfChunks) throws Throwable { - // [1] Fetch all the messages within the feed. The number of headers should match the number of chunks sent. - final MessageQueryServiceImpl messageQueryService = new MessageQueryServiceImpl(new QA() { - }); - final MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); - final OnboardingResponse recipient = OnboardingResponseRepository.read(OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT); - messageQueryParameters.setOnboardingResponse(recipient); - messageQueryParameters.setSentToInSeconds(UtcTimeService.inTheFuture(5).toEpochSecond()); - messageQueryParameters.setSentFromInSeconds(UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); - messageQueryService.send(messageQueryParameters); - - // [2] Wait for the agrirouter to process the message. - waitForTheAgrirouterToProcessSingleMessage(); - - // [3] Fetch the chunks from the outbox. Since we have the same restrictions while receiving, this has to be the same number of messages as it is chunks. - FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); - Optional> fetchMessageResponses = - fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); - - // [4] Check if the response from the AR only contains valid results. - final DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); - fetchMessageResponses.get() - .stream() - .map(fetchMessageResponse -> decodeMessageService.decode(fetchMessageResponse.getCommand().getMessage())) - .forEach(decodeMessageResponse -> Assertions.assertEquals(Response.ResponseEnvelope.ResponseBodyType.ACK_FOR_FEED_MESSAGE, decodeMessageResponse.getResponseEnvelope().getType())); - - // [5] Map the results from the query to 'real' messages within the feed and perform some assertions. - final List feedMessages = fetchMessageResponses.get() - .stream() - .map(fetchMessageResponse -> decodeMessageService.decode(fetchMessageResponse.getCommand().getMessage())) - .map(decodeMessageResponse -> messageQueryService.decode(decodeMessageResponse.getResponsePayloadWrapper().getDetails().getValue())) - .map(messageQueryResponse -> messageQueryResponse.getMessages(0)) - .collect(Collectors.toList()); - Assertions.assertEquals(expectedNrOfChunks, feedMessages.size()); - feedMessages.forEach(feedMessage -> Assertions.assertNotNull(feedMessage.getHeader().getChunkContext())); - Assertions.assertEquals(feedMessages.get(0).getHeader().getChunkContext().getTotal(), expectedNrOfChunks); - final Set chunkContextIds = feedMessages.stream().map(feedMessage -> feedMessage.getHeader().getChunkContext().getContextId()).collect(Collectors.toSet()); - Assertions.assertEquals(1, chunkContextIds.size(), "There should be only one chunk context ID."); - - // [6] Combine the chunks in order of their numbers and check if the result is fine. - feedMessages.sort(Comparator.comparingLong(o -> o.getHeader().getChunkContext().getCurrent())); - final String base64EncodedMessageContent = feedMessages - .stream() - // Fetch the message content for each chunk. - .map(feedMessage -> feedMessage.getContent().getValue().toStringUtf8()) - // Decode each of the chunks to get concat the original value. - .map(base64EncodedContent -> new String(Base64.getDecoder().decode(base64EncodedContent))) - // Concat the chunks. - .collect(Collectors.joining("")); - Assertions.assertEquals(messageContent.toStringUtf8(), base64EncodedMessageContent); - - // [7] Confirm the chunks to remove them from the feed. - final List messageIdsToConfirm = feedMessages - .stream() - .map(feedMessage -> feedMessage.getHeader().getMessageId()) - .collect(Collectors.toList()); - final MessageConfirmationServiceImpl messageConfirmationService = new MessageConfirmationServiceImpl(new QA() { - }); - final MessageConfirmationParameters messageConfirmationParameters = new MessageConfirmationParameters(); - messageConfirmationParameters.setOnboardingResponse(recipient); - messageConfirmationParameters.setMessageIds(messageIdsToConfirm); - messageConfirmationService.send(messageConfirmationParameters); - - // [8] Fetch the response from the agrirouter after confirming the messages. - waitForTheAgrirouterToProcessSingleMessage(); - fetchMessageResponses = - fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals(1, fetchMessageResponses.get().size(), "This should be a single response."); - } - - /** - * These are the actions for the sender. The sender is already set up and declared the capabilities. The actions for the sender are documented inline. - * - * @param messageContent - - * @param expectedNrOfChunks - - * @throws IOException - - * @throws InterruptedException - - */ - private void actionsForSender(ByteString messageContent, int expectedNrOfChunks) throws IOException, InterruptedException { - final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); - final SendMessageServiceImpl sendMessageService = new SendMessageServiceImpl(); - final OnboardingResponse onboardingResponse = - OnboardingResponseRepository.read(OnboardingResponseRepository.Identifier.FARMING_SOFTWARE); - - // [1] Define the raw message, in this case this is the Base64 encoded message content, no chunking needed. - MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters(); - messageHeaderParameters.setTechnicalMessageType(ContentMessageType.ISO_11783_TASKDATA_ZIP); - messageHeaderParameters.setApplicationMessageId(MessageIdService.generateMessageId()); - messageHeaderParameters.setApplicationMessageSeqNo( - SequenceNumberService.generateSequenceNumberForEndpoint(onboardingResponse)); - messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT); - messageHeaderParameters.setRecipients( - Collections.singletonList( - OnboardingResponseRepository.read( - OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT) - .getSensorAlternateId())); - - PayloadParameters payloadParameters = new PayloadParameters(); - payloadParameters.setValue(messageContent); - payloadParameters.setTypeUrl(SystemMessageType.EMPTY.getKey()); - - // [2] Chunk the message content using the SDK specific methods ('chunkAndEncode'). - List tuples = - encodeMessageService.chunkAndEncode( - messageHeaderParameters, payloadParameters, onboardingResponse); - - tuples.forEach( - messageParameterTuple -> - Assertions.assertTrue( - Objects.requireNonNull(messageParameterTuple.getPayloadParameters().getValue()) - .toStringUtf8() - .length() - <= MAX_CHUNK_SIZE)); - - List encodedMessages = encodeMessageService.encode(tuples); - - // [3] Send the chunks to the agrirouter. - SendMessageParameters sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setEncodedMessages(encodedMessages); - sendMessageParameters.setOnboardingResponse(onboardingResponse); - sendMessageService.send(sendMessageParameters); - - // [4] Wait for the AR to process the chunks. - waitForTheAgrirouterToProcessMultipleMessages(); - - // [5] Check if the chunks were processed successfully. - FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); - Optional> fetchMessageResponses = - fetchMessageService.fetch( - onboardingResponse, - new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - - Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); - - DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); - AtomicReference decodeMessageResponse = new AtomicReference<>(); + private static final int MAX_CHUNK_SIZE = 1024000; + + @ParameterizedTest + @MethodSource("getTestArguments") + void givenRealMessageContentWhenSendingMessagesTheContentShouldMatchAfterReceivingAndMergingIt( + ByteString messageContent, int expectedNrOfChunks) throws Throwable { + actionsForSender(messageContent, expectedNrOfChunks); + actionsForTheRecipient(messageContent, expectedNrOfChunks); + } + + /** + * These are the actions for the recipient. The recipient is already set up and declared the + * capabilities. The actions for the recipient are documented inline. + * + * @param messageContent - + * @param expectedNrOfChunks - + */ + private void actionsForTheRecipient(ByteString messageContent, int expectedNrOfChunks) + throws Throwable { + // [1] Fetch all the messages within the feed. The number of headers should match the number of + // chunks sent. + final MessageQueryServiceImpl messageQueryService = new MessageQueryServiceImpl(new QA() {}); + final MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); + final OnboardingResponse recipient = + OnboardingResponseRepository.read( + OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT); + messageQueryParameters.setOnboardingResponse(recipient); + messageQueryParameters.setSentToInSeconds(UtcTimeService.inTheFuture(5).toEpochSecond()); + messageQueryParameters.setSentFromInSeconds( + UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); + messageQueryService.send(messageQueryParameters); + + // [2] Wait for the agrirouter to process the message. + waitForTheAgrirouterToProcessSingleMessage(); + + // [3] Fetch the chunks from the outbox. Since we have the same restrictions while receiving, + // this has to be the same number of messages as it is chunks. + FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); + Optional> fetchMessageResponses = + fetchMessageService.fetch( + recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + Assertions.assertTrue(fetchMessageResponses.isPresent()); + Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); + + // [4] Check if the response from the AR only contains valid results. + final DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); + fetchMessageResponses.get().stream() + .map( + fetchMessageResponse -> + decodeMessageService.decode(fetchMessageResponse.getCommand().getMessage())) + .forEach( + decodeMessageResponse -> + Assertions.assertEquals( + Response.ResponseEnvelope.ResponseBodyType.ACK_FOR_FEED_MESSAGE, + decodeMessageResponse.getResponseEnvelope().getType())); + + // [5] Map the results from the query to 'real' messages within the feed and perform some + // assertions. + final List feedMessages = fetchMessageResponses.get().stream() - .map(FetchMessageResponse::getCommand) - .forEach( - message -> { - Assertions.assertNotNull(message); - decodeMessageResponse.set(decodeMessageService.decode(message.getMessage())); - - Assertions.assertMatchesAny( - Arrays.asList(HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_NO_CONTENT), - decodeMessageResponse.get().getResponseEnvelope().getResponseCode()); - }); - } - - /** - * Delivers fake message content for multiple test cases. - * - * @return - - */ - private static Stream getTestArguments() throws Throwable { - return Stream.of( - Arguments.of( - ByteString.copyFromUtf8(ContentReader.readBase64EncodedMessageContent(ContentReader.Identifier.BIG_TASK_DATA)), - 4)); - } - - /** - * Cleanup before and after each test case. These actions are necessary because it could be the case, that there are dangling messages from former tests. - */ - @BeforeEach - @AfterEach - public void prepareTestEnvironment() throws Throwable { - FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); - final OnboardingResponse recipient = OnboardingResponseRepository.read(OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT); - final MessageHeaderQueryServiceImpl messageHeaderQueryService = new MessageHeaderQueryServiceImpl(new QA() { - }); - - // [1] Clean the outbox of the endpoint. - fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - - // [2] Fetch all message headers for the last 4 weeks (maximum retention time within the agrirouter). - final MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); - messageQueryParameters.setOnboardingResponse(recipient); - messageQueryParameters.setSentToInSeconds(UtcTimeService.inTheFuture(5).toEpochSecond()); - messageQueryParameters.setSentFromInSeconds(UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); - messageHeaderQueryService.send(messageQueryParameters); - waitForTheAgrirouterToProcessSingleMessage(); - Optional> fetchMessageResponses = - fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals(1, fetchMessageResponses.get().size(), "This should be a single response."); - final DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); - final DecodeMessageResponse decodeMessageResponse = decodeMessageService.decode(fetchMessageResponses.get().get(0).getCommand().getMessage()); - Assertions.assertEquals(Response.ResponseEnvelope.ResponseBodyType.ACK_FOR_FEED_HEADER_LIST, decodeMessageResponse.getResponseEnvelope().getType()); - final FeedResponse.HeaderQueryResponse headerQueryResponse = messageHeaderQueryService.decode(decodeMessageResponse.getResponsePayloadWrapper().getDetails().getValue()); - - // [3] Delete the dangling messages from the feed of the endpoint if necessary. - if (headerQueryResponse.getQueryMetrics().getTotalMessagesInQuery() > 0) { - final DeleteMessageServiceImpl deleteMessageService = new DeleteMessageServiceImpl(); - final DeleteMessageParameters deleteMessageParameters = new DeleteMessageParameters(); - deleteMessageParameters.setOnboardingResponse(recipient); - final List messageIds = headerQueryResponse.getFeedList() - .stream() - .map(FeedResponse.HeaderQueryResponse.Feed::getHeadersList) - .flatMap(Collection::stream) - .map(FeedResponse.HeaderQueryResponse.Header::getMessageId) - .collect(Collectors.toList()); - deleteMessageParameters.setMessageIds(messageIds); - deleteMessageService.send(deleteMessageParameters); - waitForTheAgrirouterToProcessSingleMessage(); - fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - } - - // [4] Clean the outbox of the endpoint. - fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + .map( + fetchMessageResponse -> + decodeMessageService.decode(fetchMessageResponse.getCommand().getMessage())) + .map( + decodeMessageResponse -> + messageQueryService.decode( + decodeMessageResponse.getResponsePayloadWrapper().getDetails().getValue())) + .map(messageQueryResponse -> messageQueryResponse.getMessages(0)) + .collect(Collectors.toList()); + Assertions.assertEquals(expectedNrOfChunks, feedMessages.size()); + feedMessages.forEach( + feedMessage -> Assertions.assertNotNull(feedMessage.getHeader().getChunkContext())); + Assertions.assertEquals( + feedMessages.get(0).getHeader().getChunkContext().getTotal(), expectedNrOfChunks); + final Set chunkContextIds = + feedMessages.stream() + .map(feedMessage -> feedMessage.getHeader().getChunkContext().getContextId()) + .collect(Collectors.toSet()); + Assertions.assertEquals( + 1, chunkContextIds.size(), "There should be only one chunk context ID."); + + // [6] Combine the chunks in order of their numbers and check if the result is fine. + feedMessages.sort(Comparator.comparingLong(o -> o.getHeader().getChunkContext().getCurrent())); + final String base64EncodedMessageContent = + feedMessages.stream() + // Fetch the message content for each chunk. + .map(feedMessage -> feedMessage.getContent().getValue().toStringUtf8()) + // Decode each of the chunks to get concat the original value. + .map( + base64EncodedContent -> + new String(Base64.getDecoder().decode(base64EncodedContent))) + // Concat the chunks. + .collect(Collectors.joining("")); + Assertions.assertEquals(messageContent.toStringUtf8(), base64EncodedMessageContent); + + // [7] Confirm the chunks to remove them from the feed. + final List messageIdsToConfirm = + feedMessages.stream() + .map(feedMessage -> feedMessage.getHeader().getMessageId()) + .collect(Collectors.toList()); + final MessageConfirmationServiceImpl messageConfirmationService = + new MessageConfirmationServiceImpl(new QA() {}); + final MessageConfirmationParameters messageConfirmationParameters = + new MessageConfirmationParameters(); + messageConfirmationParameters.setOnboardingResponse(recipient); + messageConfirmationParameters.setMessageIds(messageIdsToConfirm); + messageConfirmationService.send(messageConfirmationParameters); + + // [8] Fetch the response from the agrirouter after confirming the messages. + waitForTheAgrirouterToProcessSingleMessage(); + fetchMessageResponses = + fetchMessageService.fetch( + recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + Assertions.assertTrue(fetchMessageResponses.isPresent()); + Assertions.assertEquals( + 1, fetchMessageResponses.get().size(), "This should be a single response."); + } + + /** + * These are the actions for the sender. The sender is already set up and declared the + * capabilities. The actions for the sender are documented inline. + * + * @param messageContent - + * @param expectedNrOfChunks - + * @throws IOException - + * @throws InterruptedException - + */ + private void actionsForSender(ByteString messageContent, int expectedNrOfChunks) + throws IOException, InterruptedException { + final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); + final SendMessageServiceImpl sendMessageService = new SendMessageServiceImpl(); + final OnboardingResponse onboardingResponse = + OnboardingResponseRepository.read(OnboardingResponseRepository.Identifier.FARMING_SOFTWARE); + + // [1] Define the raw message, in this case this is the Base64 encoded message content, no + // chunking needed. + MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters(); + messageHeaderParameters.setTechnicalMessageType(ContentMessageType.ISO_11783_TASKDATA_ZIP); + messageHeaderParameters.setApplicationMessageId(MessageIdService.generateMessageId()); + messageHeaderParameters.setApplicationMessageSeqNo( + SequenceNumberService.generateSequenceNumberForEndpoint(onboardingResponse)); + messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT); + messageHeaderParameters.setRecipients( + Collections.singletonList( + OnboardingResponseRepository.read( + OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT) + .getSensorAlternateId())); + + PayloadParameters payloadParameters = new PayloadParameters(); + payloadParameters.setValue(messageContent); + payloadParameters.setTypeUrl(SystemMessageType.EMPTY.getKey()); + + // [2] Chunk the message content using the SDK specific methods ('chunkAndEncode'). + List tuples = + encodeMessageService.chunkAndEncode( + messageHeaderParameters, payloadParameters, onboardingResponse); + + tuples.forEach( + messageParameterTuple -> + Assertions.assertTrue( + Objects.requireNonNull(messageParameterTuple.getPayloadParameters().getValue()) + .toStringUtf8() + .length() + <= MAX_CHUNK_SIZE)); + + List encodedMessages = encodeMessageService.encode(tuples); + + // [3] Send the chunks to the agrirouter. + SendMessageParameters sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setEncodedMessages(encodedMessages); + sendMessageParameters.setOnboardingResponse(onboardingResponse); + sendMessageService.send(sendMessageParameters); + + // [4] Wait for the AR to process the chunks. + waitForTheAgrirouterToProcessMultipleMessages(); + + // [5] Check if the chunks were processed successfully. + FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); + Optional> fetchMessageResponses = + fetchMessageService.fetch( + onboardingResponse, + new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + + Assertions.assertTrue(fetchMessageResponses.isPresent()); + Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); + + DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); + AtomicReference decodeMessageResponse = new AtomicReference<>(); + fetchMessageResponses.get().stream() + .map(FetchMessageResponse::getCommand) + .forEach( + message -> { + Assertions.assertNotNull(message); + decodeMessageResponse.set(decodeMessageService.decode(message.getMessage())); + + Assertions.assertMatchesAny( + Arrays.asList(HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_NO_CONTENT), + decodeMessageResponse.get().getResponseEnvelope().getResponseCode()); + }); + } + + /** + * Delivers fake message content for multiple test cases. + * + * @return - + */ + private static Stream getTestArguments() throws Throwable { + return Stream.of( + Arguments.of( + ByteString.copyFromUtf8( + ContentReader.readBase64EncodedMessageContent( + ContentReader.Identifier.BIG_TASK_DATA)), + 4)); + } + + /** + * Cleanup before and after each test case. These actions are necessary because it could be the + * case, that there are dangling messages from former tests. + */ + @BeforeEach + @AfterEach + public void prepareTestEnvironment() throws Throwable { + FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); + final OnboardingResponse recipient = + OnboardingResponseRepository.read( + OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT); + final MessageHeaderQueryServiceImpl messageHeaderQueryService = + new MessageHeaderQueryServiceImpl(new QA() {}); + + // [1] Clean the outbox of the endpoint. + fetchMessageService.fetch( + recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + + // [2] Fetch all message headers for the last 4 weeks (maximum retention time within the + // agrirouter). + final MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); + messageQueryParameters.setOnboardingResponse(recipient); + messageQueryParameters.setSentToInSeconds(UtcTimeService.inTheFuture(5).toEpochSecond()); + messageQueryParameters.setSentFromInSeconds( + UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); + messageHeaderQueryService.send(messageQueryParameters); + waitForTheAgrirouterToProcessSingleMessage(); + Optional> fetchMessageResponses = + fetchMessageService.fetch( + recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + Assertions.assertTrue(fetchMessageResponses.isPresent()); + Assertions.assertEquals( + 1, fetchMessageResponses.get().size(), "This should be a single response."); + final DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); + final DecodeMessageResponse decodeMessageResponse = + decodeMessageService.decode(fetchMessageResponses.get().get(0).getCommand().getMessage()); + Assertions.assertEquals( + Response.ResponseEnvelope.ResponseBodyType.ACK_FOR_FEED_HEADER_LIST, + decodeMessageResponse.getResponseEnvelope().getType()); + final FeedResponse.HeaderQueryResponse headerQueryResponse = + messageHeaderQueryService.decode( + decodeMessageResponse.getResponsePayloadWrapper().getDetails().getValue()); + + // [3] Delete the dangling messages from the feed of the endpoint if necessary. + if (headerQueryResponse.getQueryMetrics().getTotalMessagesInQuery() > 0) { + final DeleteMessageServiceImpl deleteMessageService = new DeleteMessageServiceImpl(); + final DeleteMessageParameters deleteMessageParameters = new DeleteMessageParameters(); + deleteMessageParameters.setOnboardingResponse(recipient); + final List messageIds = + headerQueryResponse.getFeedList().stream() + .map(FeedResponse.HeaderQueryResponse.Feed::getHeadersList) + .flatMap(Collection::stream) + .map(FeedResponse.HeaderQueryResponse.Header::getMessageId) + .collect(Collectors.toList()); + deleteMessageParameters.setMessageIds(messageIds); + deleteMessageService.send(deleteMessageParameters); + waitForTheAgrirouterToProcessSingleMessage(); + fetchMessageService.fetch( + recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); } + // [4] Clean the outbox of the endpoint. + fetchMessageService.fetch( + recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + } }