diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/env/Environment.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/env/Environment.java index 997a2ff1..d6f5c9d5 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/env/Environment.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/env/Environment.java @@ -1,10 +1,14 @@ package com.dke.data.agrirouter.api.env; import com.dke.data.agrirouter.api.enums.SecuredOnboardingResponseType; +import org.apache.commons.lang3.BooleanUtils; /** Common Environment, holds some default methods pointing to the QA. */ public interface Environment { + /** System property to enable logging for HTTP requests. */ + String ENABLE_HTTP_REQUEST_LOGGING = "agrirouter.sdk.enable.http.request.logging"; + /** Template for MQTT connections. */ String MQTT_URL_TEMPLATE = "ssl://%s:%s"; @@ -12,6 +16,16 @@ public interface Environment { String SECURED_ONBOARDING_AUTHORIZATION_LINK_TEMPLATE = "/application/%s/authorize?response_type=%s&state=%s&redirect_uri=%s"; + /** + * Checks whether the HTTP request logging is enabled or not. + * + * @return - + */ + static boolean httpRequestLoggingEnabled() { + final String httpRequestLoggingEnabled = System.getProperty(ENABLE_HTTP_REQUEST_LOGGING); + return BooleanUtils.toBoolean(httpRequestLoggingEnabled); + } + /** * Returning the AR base URL without trailing '/'. * diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/RequestLogging.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/RequestLogging.java deleted file mode 100644 index fb9ec9d2..00000000 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/RequestLogging.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.dke.data.agrirouter.api.service; - -import javax.ws.rs.core.Response; - -public interface RequestLogging extends HasLogger { - - default void logRequest(String className, Response response) { - String logMessage = "\n" + "# [" + className + "] " + "\n" + response + "\n"; - this.getNativeLogger().info(logMessage); - } -} diff --git a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/encoding/EncodeMessageService.java b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/encoding/EncodeMessageService.java index b3c44f34..90dcb350 100644 --- a/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/encoding/EncodeMessageService.java +++ b/agrirouter-sdk-java-api/src/main/java/com/dke/data/agrirouter/api/service/messaging/encoding/EncodeMessageService.java @@ -35,7 +35,7 @@ String encode( * @param payloadParameters - * @return - */ - List chunkAndEncode( + List chunkAndBase64EncodeEachChunk( MessageHeaderParameters messageHeaderParameters, PayloadParameters payloadParameters, OnboardingResponse onboardingResponse); diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/RequestFactory.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/RequestFactory.java index 53d5a95d..ffba1af0 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/RequestFactory.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/RequestFactory.java @@ -1,6 +1,7 @@ package com.dke.data.agrirouter.impl; import com.dke.data.agrirouter.api.enums.CertificationType; +import com.dke.data.agrirouter.api.env.Environment; import com.dke.data.agrirouter.api.exception.CertificationTypeNotSupportedException; import com.dke.data.agrirouter.api.exception.CouldNotCreateDynamicKeyStoreException; import com.dke.data.agrirouter.impl.common.ssl.KeyStoreCreationService; @@ -14,10 +15,14 @@ import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.logging.LoggingFeature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Factory to encapsulate the requests against the agrirouter */ public final class RequestFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(RequestFactory.class); + /** Hidden constructor. */ private RequestFactory() { // NOP @@ -36,7 +41,13 @@ public static Invocation.Builder securedRequest( ClientConfig clientConfig = new ClientConfig(); KeyStore keyStore = createKeyStore(certificate, password, certificationType); Client client = createClient(clientConfig, keyStore, password, certificationType); - client.property(LoggingFeature.LOGGING_FEATURE_LOGGER_LEVEL_CLIENT, "INFO"); + if (Environment.httpRequestLoggingEnabled()) { + client.property(LoggingFeature.LOGGING_FEATURE_LOGGER_LEVEL_CLIENT, "INFO"); + } else { + LOGGER.debug( + "Request logging is currently disabled. If you want to enable it, please set '{}'.", + Environment.ENABLE_HTTP_REQUEST_LOGGING); + } WebTarget target = client.target(url); Invocation.Builder request = target.request(MediaType.APPLICATION_JSON_TYPE); request.accept(MediaType.APPLICATION_JSON_TYPE); @@ -145,7 +156,7 @@ public static Invocation.Builder bearerTokenRequest( return request; } - public class AgrirouterHttpHeader { + public static class AgrirouterHttpHeader { public static final String APPLICATION_ID = "X-Agrirouter-ApplicationId"; public static final String SIGNATURE = "X-Agrirouter-Signature"; } diff --git a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImpl.java b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImpl.java index 14d50b9e..b8cb3370 100644 --- a/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImpl.java +++ b/agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImpl.java @@ -1,5 +1,7 @@ package com.dke.data.agrirouter.impl.messaging.encoding; +import static com.dke.data.agrirouter.api.service.parameters.PayloadParametersKt.MAX_LENGTH_FOR_RAW_MESSAGE_CONTENT; + import agrirouter.commons.Chunk; import agrirouter.request.Request; import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; @@ -13,16 +15,12 @@ import com.dke.data.agrirouter.impl.NonEnvironmentalService; import com.dke.data.agrirouter.impl.common.MessageIdService; import com.dke.data.agrirouter.impl.messaging.SequenceNumberService; -import com.google.common.base.Splitter; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Base64; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -92,7 +90,7 @@ public List encode(List messageParameterTuples) { * @param payloadParameters Content of the message. It shall not be Base64 encoded before. * @return - */ - public List chunkAndEncode( + public List chunkAndBase64EncodeEachChunk( MessageHeaderParameters messageHeaderParameters, PayloadParameters payloadParameters, OnboardingResponse onboardingResponse) { @@ -114,10 +112,8 @@ public List chunkAndEncode( .debug( "The message should be chunked, current size of the payload ({}) is above the limitation.", payloadParameters.getValue().toStringUtf8().length()); - String wholeMessage = payloadParameters.getValue().toStringUtf8(); - final List messageChunks = - Splitter.fixedLength(payloadParameters.maxLengthForMessages()) - .splitToList(wholeMessage); + byte[] wholeMessage = payloadParameters.getValue().toByteArray(); + final List messageChunks = splitIntoChunks(wholeMessage); List tuples = new ArrayList<>(); AtomicInteger chunkNr = new AtomicInteger(1); final String chunkContextId = ChunkContextIdService.generateChunkContextId(); @@ -135,14 +131,12 @@ public List chunkAndEncode( chunkInfo.setContextId(chunkContextId); chunkInfo.setCurrent(chunkNr.get()); chunkInfo.setTotal(messageChunks.size()); - chunkInfo.setTotalSize(wholeMessage.length()); + chunkInfo.setTotalSize(wholeMessage.length); header.setChunkInfo(chunkInfo.build()); final PayloadParameters payload = new PayloadParameters(); payload.copyFrom(payloadParameters); - payload.setValue( - ByteString.copyFromUtf8( - Base64.getEncoder().encodeToString(chunk.getBytes(StandardCharsets.UTF_8)))); + payload.setValue(ByteString.copyFromUtf8(Base64.getEncoder().encodeToString(chunk))); tuples.add(new MessageParameterTuple(header, payload)); @@ -179,6 +173,23 @@ public List chunkAndEncode( } } + private List splitIntoChunks(byte[] wholeMessage) { + List chunks = new ArrayList<>(); + byte[] remainingBytes = wholeMessage; + do { + final byte[] chunk = + Arrays.copyOfRange(remainingBytes, 0, MAX_LENGTH_FOR_RAW_MESSAGE_CONTENT); + chunks.add(chunk); + remainingBytes = + Arrays.copyOfRange( + remainingBytes, MAX_LENGTH_FOR_RAW_MESSAGE_CONTENT + 1, remainingBytes.length - 1); + } while (remainingBytes.length > MAX_LENGTH_FOR_RAW_MESSAGE_CONTENT); + if (remainingBytes.length > 0) { + chunks.add(remainingBytes); + } + return chunks; + } + private Request.RequestEnvelope header(MessageHeaderParameters parameters) { logMethodBegin(parameters); diff --git a/agrirouter-sdk-java-impl/src/test/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImplTest.java b/agrirouter-sdk-java-impl/src/test/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImplTest.java index 3840a6fc..8adb2459 100644 --- a/agrirouter-sdk-java-impl/src/test/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImplTest.java +++ b/agrirouter-sdk-java-impl/src/test/java/com/dke/data/agrirouter/impl/messaging/encoding/EncodeMessageServiceImplTest.java @@ -31,7 +31,7 @@ void givenEmptyMessageWhenChunkingThenTheImplementationShouldReturnTheRightNumbe PayloadParameters payloadParameters = getPayloadParameters(toSendMessage); final List chunks = - encodeMessageService.chunkAndEncode( + encodeMessageService.chunkAndBase64EncodeEachChunk( messageHeaderParameters, payloadParameters, fakeOnboardingResponse()); Assertions.assertEquals(1, chunks.size()); } @@ -47,7 +47,7 @@ void givenEmptyMessageWhenChunkingThenTheImplementationShouldReturnTheRightNumbe PayloadParameters payloadParameters = getPayloadParameters(toSendMessage); final List chunks = - encodeMessageService.chunkAndEncode( + encodeMessageService.chunkAndBase64EncodeEachChunk( messageHeaderParameters, payloadParameters, fakeOnboardingResponse()); Assertions.assertEquals(1, chunks.size()); } @@ -66,7 +66,7 @@ void givenEmptyMessageWhenChunkingThenTheImplementationShouldReturnTheRightNumbe PayloadParameters payloadParameters = getPayloadParameters(toSendMessage); final List chunks = - encodeMessageService.chunkAndEncode( + encodeMessageService.chunkAndBase64EncodeEachChunk( messageHeaderParameters, payloadParameters, fakeOnboardingResponse()); Assertions.assertEquals(1, chunks.size()); } @@ -83,7 +83,7 @@ void givenEmptyMessageWhenChunkingThenTheImplementationShouldReturnTheRightNumbe PayloadParameters payloadParameters = getPayloadParameters(toSendMessage); final List chunks = - encodeMessageService.chunkAndEncode( + encodeMessageService.chunkAndBase64EncodeEachChunk( messageHeaderParameters, payloadParameters, fakeOnboardingResponse()); Assertions.assertEquals(2, chunks.size()); diff --git a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java index fa77daae..7feb0459 100644 --- a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java +++ b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java @@ -16,6 +16,11 @@ public static String readBase64EncodedMessageContent(Identifier identifier) thro return new String(Base64.getEncoder().encode(rawData)); } + public static byte[] readRawData(Identifier identifier) throws Throwable { + Path path = Paths.get(FOLDER.concat(identifier.getFileName())); + return Files.readAllBytes(path); + } + public enum Identifier { BIG_TASK_DATA("big_taskdata.zip"), SMALL_TASK_DATA("small_taskdata.zip"); diff --git a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/messaging/rest/SendChunkedMessageTest.java b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/messaging/rest/SendChunkedMessageTest.java index 16234127..78d10969 100644 --- a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/messaging/rest/SendChunkedMessageTest.java +++ b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/messaging/rest/SendChunkedMessageTest.java @@ -20,28 +20,22 @@ import com.dke.data.agrirouter.test.AbstractIntegrationTest; import com.dke.data.agrirouter.test.Assertions; import com.dke.data.agrirouter.test.OnboardingResponseRepository; +import com.dke.data.agrirouter.test.helper.ContentReader; import com.google.protobuf.ByteString; -import java.io.IOException; import java.util.*; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.http.HttpStatus; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.Test; /** Test case to show the behavior for chunked message sending. */ class SendChunkedMessageTest extends AbstractIntegrationTest { private static final int MAX_CHUNK_SIZE = 1024000; - @ParameterizedTest - @MethodSource("fakeRawMessageContentThatHasToBeChunked") + @Test void - givenLargeContentMessageWhenSendingTheMessageToTheAgrirouterTheSdkShouldHelpToSendTheFileInMultipleChunks( - ByteString messageContent, int expectedNrOfChunks) - throws IOException, InterruptedException { + givenLargeContentMessageWhenSendingTheMessageToTheAgrirouterTheSdkShouldHelpToSendTheFileInMultipleChunks() + throws Throwable { final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); final SendMessageServiceImpl sendMessageService = new SendMessageServiceImpl(); @@ -55,17 +49,15 @@ class SendChunkedMessageTest extends AbstractIntegrationTest { SequenceNumberService.generateSequenceNumberForEndpoint(onboardingResponse)); messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT); messageHeaderParameters.setRecipients( - Collections.singletonList( - OnboardingResponseRepository.read( - OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT) - .getSensorAlternateId())); + Collections.singletonList("37cd61d1-76eb-4145-a735-c938d05a32d8")); PayloadParameters payloadParameters = new PayloadParameters(); - payloadParameters.setValue(messageContent); + payloadParameters.setValue( + ByteString.copyFrom(ContentReader.readRawData(ContentReader.Identifier.BIG_TASK_DATA))); payloadParameters.setTypeUrl(SystemMessageType.EMPTY.getKey()); List tuples = - encodeMessageService.chunkAndEncode( + encodeMessageService.chunkAndBase64EncodeEachChunk( messageHeaderParameters, payloadParameters, onboardingResponse); tuples.forEach( @@ -92,7 +84,7 @@ class SendChunkedMessageTest extends AbstractIntegrationTest { new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); + Assertions.assertEquals(3, fetchMessageResponses.get().size()); DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); AtomicReference decodeMessageResponse = new AtomicReference<>(); @@ -108,28 +100,4 @@ class SendChunkedMessageTest extends AbstractIntegrationTest { decodeMessageResponse.get().getResponseEnvelope().getResponseCode()); }); } - - /** - * Delivers fake message content for multiple test cases. - * - * @return - - */ - private static Stream fakeRawMessageContentThatHasToBeChunked() { - return Stream.of( - Arguments.of( - ByteString.copyFromUtf8( - RandomStringUtils.randomAlphabetic( - PayloadParametersKt.MAX_LENGTH_FOR_RAW_MESSAGE_CONTENT * 3)), - 3), - Arguments.of( - ByteString.copyFromUtf8( - RandomStringUtils.randomAlphabetic( - PayloadParametersKt.MAX_LENGTH_FOR_RAW_MESSAGE_CONTENT * 2)), - 2), - Arguments.of( - ByteString.copyFromUtf8( - RandomStringUtils.randomAlphabetic( - PayloadParametersKt.MAX_LENGTH_FOR_RAW_MESSAGE_CONTENT)), - 1)); - } } diff --git a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java deleted file mode 100644 index 4443a31d..00000000 --- a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java +++ /dev/null @@ -1,323 +0,0 @@ -package com.dke.data.agrirouter.test.usecase; - -import agrirouter.feed.response.FeedResponse; -import agrirouter.request.Request; -import agrirouter.response.Response; -import com.dke.data.agrirouter.api.cancellation.DefaultCancellationToken; -import com.dke.data.agrirouter.api.dto.encoding.DecodeMessageResponse; -import com.dke.data.agrirouter.api.dto.messaging.FetchMessageResponse; -import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; -import com.dke.data.agrirouter.api.enums.ContentMessageType; -import com.dke.data.agrirouter.api.enums.SystemMessageType; -import com.dke.data.agrirouter.api.env.QA; -import com.dke.data.agrirouter.api.service.messaging.encoding.DecodeMessageService; -import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; -import com.dke.data.agrirouter.api.service.messaging.http.FetchMessageService; -import com.dke.data.agrirouter.api.service.parameters.*; -import com.dke.data.agrirouter.impl.common.MessageIdService; -import com.dke.data.agrirouter.impl.common.UtcTimeService; -import com.dke.data.agrirouter.impl.messaging.SequenceNumberService; -import com.dke.data.agrirouter.impl.messaging.encoding.DecodeMessageServiceImpl; -import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; -import com.dke.data.agrirouter.impl.messaging.rest.*; -import com.dke.data.agrirouter.test.AbstractIntegrationTest; -import com.dke.data.agrirouter.test.Assertions; -import com.dke.data.agrirouter.test.OnboardingResponseRepository; -import com.dke.data.agrirouter.test.helper.ContentReader; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.http.HttpStatus; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -/** Test case to show the behavior for chunked message sending. */ -class SendAndReceiveChunkedMessagesTest extends AbstractIntegrationTest { - - private static final int MAX_CHUNK_SIZE = 1024000; - - @ParameterizedTest - @MethodSource("getTestArguments") - void givenRealMessageContentWhenSendingMessagesTheContentShouldMatchAfterReceivingAndMergingIt( - ByteString messageContent, int expectedNrOfChunks) throws Throwable { - actionsForSender(messageContent, expectedNrOfChunks); - actionsForTheRecipient(messageContent, expectedNrOfChunks); - } - - /** - * These are the actions for the recipient. The recipient is already set up and declared the - * capabilities. The actions for the recipient are documented inline. - * - * @param messageContent - - * @param expectedNrOfChunks - - */ - private void actionsForTheRecipient(ByteString messageContent, int expectedNrOfChunks) - throws Throwable { - // [1] Fetch all the messages within the feed. The number of headers should match the number of - // chunks sent. - final MessageQueryServiceImpl messageQueryService = new MessageQueryServiceImpl(new QA() {}); - final MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); - final OnboardingResponse recipient = - OnboardingResponseRepository.read( - OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT); - messageQueryParameters.setOnboardingResponse(recipient); - messageQueryParameters.setSentToInSeconds(UtcTimeService.inTheFuture(5).toEpochSecond()); - messageQueryParameters.setSentFromInSeconds( - UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); - messageQueryService.send(messageQueryParameters); - - // [2] Wait for the agrirouter to process the message. - waitForTheAgrirouterToProcessSingleMessage(); - - // [3] Fetch the chunks from the outbox. Since we have the same restrictions while receiving, - // this has to be the same number of messages as it is chunks. - FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); - Optional> fetchMessageResponses = - fetchMessageService.fetch( - recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); - - // [4] Check if the response from the AR only contains valid results. - final DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); - fetchMessageResponses.get().stream() - .map( - fetchMessageResponse -> - decodeMessageService.decode(fetchMessageResponse.getCommand().getMessage())) - .forEach( - decodeMessageResponse -> - Assertions.assertEquals( - Response.ResponseEnvelope.ResponseBodyType.ACK_FOR_FEED_MESSAGE, - decodeMessageResponse.getResponseEnvelope().getType())); - - // [5] Map the results from the query to 'real' messages within the feed and perform some - // assertions. - final List feedMessages = - fetchMessageResponses.get().stream() - .map( - fetchMessageResponse -> - decodeMessageService.decode(fetchMessageResponse.getCommand().getMessage())) - .map( - decodeMessageResponse -> - messageQueryService.decode( - decodeMessageResponse.getResponsePayloadWrapper().getDetails().getValue())) - .map(messageQueryResponse -> messageQueryResponse.getMessages(0)) - .collect(Collectors.toList()); - Assertions.assertEquals(expectedNrOfChunks, feedMessages.size()); - feedMessages.forEach( - feedMessage -> Assertions.assertNotNull(feedMessage.getHeader().getChunkContext())); - Assertions.assertEquals( - feedMessages.get(0).getHeader().getChunkContext().getTotal(), expectedNrOfChunks); - final Set chunkContextIds = - feedMessages.stream() - .map(feedMessage -> feedMessage.getHeader().getChunkContext().getContextId()) - .collect(Collectors.toSet()); - Assertions.assertEquals( - 1, chunkContextIds.size(), "There should be only one chunk context ID."); - - // [6] Combine the chunks in order of their numbers and check if the result is fine. - feedMessages.sort(Comparator.comparingLong(o -> o.getHeader().getChunkContext().getCurrent())); - final String base64EncodedMessageContent = - feedMessages.stream() - // Fetch the message content for each chunk. - .map(feedMessage -> feedMessage.getContent().getValue().toStringUtf8()) - // Decode each of the chunks to get concat the original value. - .map( - base64EncodedContent -> - new String(Base64.getDecoder().decode(base64EncodedContent))) - // Concat the chunks. - .collect(Collectors.joining("")); - Assertions.assertEquals(messageContent.toStringUtf8(), base64EncodedMessageContent); - - // [7] Confirm the chunks to remove them from the feed. - final List messageIdsToConfirm = - feedMessages.stream() - .map(feedMessage -> feedMessage.getHeader().getMessageId()) - .collect(Collectors.toList()); - final MessageConfirmationServiceImpl messageConfirmationService = - new MessageConfirmationServiceImpl(new QA() {}); - final MessageConfirmationParameters messageConfirmationParameters = - new MessageConfirmationParameters(); - messageConfirmationParameters.setOnboardingResponse(recipient); - messageConfirmationParameters.setMessageIds(messageIdsToConfirm); - messageConfirmationService.send(messageConfirmationParameters); - - // [8] Fetch the response from the agrirouter after confirming the messages. - waitForTheAgrirouterToProcessSingleMessage(); - fetchMessageResponses = - fetchMessageService.fetch( - recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals( - 1, fetchMessageResponses.get().size(), "This should be a single response."); - } - - /** - * These are the actions for the sender. The sender is already set up and declared the - * capabilities. The actions for the sender are documented inline. - * - * @param messageContent - - * @param expectedNrOfChunks - - * @throws IOException - - * @throws InterruptedException - - */ - private void actionsForSender(ByteString messageContent, int expectedNrOfChunks) - throws IOException, InterruptedException { - final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); - final SendMessageServiceImpl sendMessageService = new SendMessageServiceImpl(); - final OnboardingResponse onboardingResponse = - OnboardingResponseRepository.read(OnboardingResponseRepository.Identifier.FARMING_SOFTWARE); - - // [1] Define the raw message, in this case this is the Base64 encoded message content, no - // chunking needed. - MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters(); - messageHeaderParameters.setTechnicalMessageType(ContentMessageType.ISO_11783_TASKDATA_ZIP); - messageHeaderParameters.setApplicationMessageId(MessageIdService.generateMessageId()); - messageHeaderParameters.setApplicationMessageSeqNo( - SequenceNumberService.generateSequenceNumberForEndpoint(onboardingResponse)); - messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT); - messageHeaderParameters.setRecipients( - Collections.singletonList( - OnboardingResponseRepository.read( - OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT) - .getSensorAlternateId())); - - PayloadParameters payloadParameters = new PayloadParameters(); - payloadParameters.setValue(messageContent); - payloadParameters.setTypeUrl(SystemMessageType.EMPTY.getKey()); - - // [2] Chunk the message content using the SDK specific methods ('chunkAndEncode'). - List tuples = - encodeMessageService.chunkAndEncode( - messageHeaderParameters, payloadParameters, onboardingResponse); - - tuples.forEach( - messageParameterTuple -> - Assertions.assertTrue( - Objects.requireNonNull(messageParameterTuple.getPayloadParameters().getValue()) - .toStringUtf8() - .length() - <= MAX_CHUNK_SIZE)); - - List encodedMessages = encodeMessageService.encode(tuples); - - // [3] Send the chunks to the agrirouter. - SendMessageParameters sendMessageParameters = new SendMessageParameters(); - sendMessageParameters.setEncodedMessages(encodedMessages); - sendMessageParameters.setOnboardingResponse(onboardingResponse); - sendMessageService.send(sendMessageParameters); - - // [4] Wait for the AR to process the chunks. - waitForTheAgrirouterToProcessMultipleMessages(); - - // [5] Check if the chunks were processed successfully. - FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); - Optional> fetchMessageResponses = - fetchMessageService.fetch( - onboardingResponse, - new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - - Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); - - DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); - AtomicReference decodeMessageResponse = new AtomicReference<>(); - fetchMessageResponses.get().stream() - .map(FetchMessageResponse::getCommand) - .forEach( - message -> { - Assertions.assertNotNull(message); - decodeMessageResponse.set(decodeMessageService.decode(message.getMessage())); - - Assertions.assertMatchesAny( - Arrays.asList(HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_NO_CONTENT), - decodeMessageResponse.get().getResponseEnvelope().getResponseCode()); - }); - } - - /** - * Delivers fake message content for multiple test cases. - * - * @return - - */ - private static Stream getTestArguments() throws Throwable { - return Stream.of( - Arguments.of( - ByteString.copyFromUtf8( - ContentReader.readBase64EncodedMessageContent( - ContentReader.Identifier.BIG_TASK_DATA)), - 4)); - } - - /** - * Cleanup before and after each test case. These actions are necessary because it could be the - * case, that there are dangling messages from former tests. - */ - @BeforeEach - @AfterEach - public void prepareTestEnvironment() throws Throwable { - FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); - final OnboardingResponse recipient = - OnboardingResponseRepository.read( - OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT); - final MessageHeaderQueryServiceImpl messageHeaderQueryService = - new MessageHeaderQueryServiceImpl(new QA() {}); - - // [1] Clean the outbox of the endpoint. - fetchMessageService.fetch( - recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - - // [2] Fetch all message headers for the last 4 weeks (maximum retention time within the - // agrirouter). - final MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); - messageQueryParameters.setOnboardingResponse(recipient); - messageQueryParameters.setSentToInSeconds(UtcTimeService.inTheFuture(5).toEpochSecond()); - messageQueryParameters.setSentFromInSeconds( - UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); - messageHeaderQueryService.send(messageQueryParameters); - waitForTheAgrirouterToProcessSingleMessage(); - Optional> fetchMessageResponses = - fetchMessageService.fetch( - recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - Assertions.assertTrue(fetchMessageResponses.isPresent()); - Assertions.assertEquals( - 1, fetchMessageResponses.get().size(), "This should be a single response."); - final DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); - final DecodeMessageResponse decodeMessageResponse = - decodeMessageService.decode(fetchMessageResponses.get().get(0).getCommand().getMessage()); - Assertions.assertEquals( - Response.ResponseEnvelope.ResponseBodyType.ACK_FOR_FEED_HEADER_LIST, - decodeMessageResponse.getResponseEnvelope().getType()); - final FeedResponse.HeaderQueryResponse headerQueryResponse = - messageHeaderQueryService.decode( - decodeMessageResponse.getResponsePayloadWrapper().getDetails().getValue()); - - // [3] Delete the dangling messages from the feed of the endpoint if necessary. - if (headerQueryResponse.getQueryMetrics().getTotalMessagesInQuery() > 0) { - final DeleteMessageServiceImpl deleteMessageService = new DeleteMessageServiceImpl(); - final DeleteMessageParameters deleteMessageParameters = new DeleteMessageParameters(); - deleteMessageParameters.setOnboardingResponse(recipient); - final List messageIds = - headerQueryResponse.getFeedList().stream() - .map(FeedResponse.HeaderQueryResponse.Feed::getHeadersList) - .flatMap(Collection::stream) - .map(FeedResponse.HeaderQueryResponse.Header::getMessageId) - .collect(Collectors.toList()); - deleteMessageParameters.setMessageIds(messageIds); - deleteMessageService.send(deleteMessageParameters); - waitForTheAgrirouterToProcessSingleMessage(); - fetchMessageService.fetch( - recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - } - - // [4] Clean the outbox of the endpoint. - fetchMessageService.fetch( - recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); - } -}