diff --git a/agrirouter-sdk-java-tests/message-content/big_bmp.bmp b/agrirouter-sdk-java-tests/message-content/big_bmp.bmp new file mode 100644 index 00000000..b9c290b1 Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/big_bmp.bmp differ diff --git a/agrirouter-sdk-java-tests/message-content/big_jpg.jpg b/agrirouter-sdk-java-tests/message-content/big_jpg.jpg new file mode 100644 index 00000000..1ebed96b Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/big_jpg.jpg differ diff --git a/agrirouter-sdk-java-tests/message-content/big_pdf.pdf b/agrirouter-sdk-java-tests/message-content/big_pdf.pdf new file mode 100644 index 00000000..b21d3908 Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/big_pdf.pdf differ diff --git a/agrirouter-sdk-java-tests/message-content/big_png.png b/agrirouter-sdk-java-tests/message-content/big_png.png new file mode 100644 index 00000000..343e7404 Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/big_png.png differ diff --git a/agrirouter-sdk-java-tests/message-content/big_shape.zip b/agrirouter-sdk-java-tests/message-content/big_shape.zip new file mode 100644 index 00000000..d4245b4a Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/big_shape.zip differ diff --git a/agrirouter-sdk-java-tests/message-content/big_taskdata.zip b/agrirouter-sdk-java-tests/message-content/big_taskdata.zip new file mode 100644 index 00000000..b5104dfc Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/big_taskdata.zip differ diff --git a/agrirouter-sdk-java-tests/message-content/content.zip b/agrirouter-sdk-java-tests/message-content/content.zip new file mode 100644 index 00000000..9a5509ea Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/content.zip differ diff --git a/agrirouter-sdk-java-tests/message-content/small_bmp.bmp b/agrirouter-sdk-java-tests/message-content/small_bmp.bmp new file mode 100644 index 00000000..86a15fe3 Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/small_bmp.bmp differ diff --git a/agrirouter-sdk-java-tests/message-content/small_jpg.jpg b/agrirouter-sdk-java-tests/message-content/small_jpg.jpg new file mode 100644 index 00000000..d5cd5ac6 Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/small_jpg.jpg differ diff --git a/agrirouter-sdk-java-tests/message-content/small_pdf.pdf b/agrirouter-sdk-java-tests/message-content/small_pdf.pdf new file mode 100644 index 00000000..9ea156c0 Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/small_pdf.pdf differ diff --git a/agrirouter-sdk-java-tests/message-content/small_png.png b/agrirouter-sdk-java-tests/message-content/small_png.png new file mode 100644 index 00000000..d1c9baf3 Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/small_png.png differ diff --git a/agrirouter-sdk-java-tests/message-content/small_shape.zip b/agrirouter-sdk-java-tests/message-content/small_shape.zip new file mode 100644 index 00000000..0b49bfbd Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/small_shape.zip differ diff --git a/agrirouter-sdk-java-tests/message-content/small_taskdata.zip b/agrirouter-sdk-java-tests/message-content/small_taskdata.zip new file mode 100644 index 00000000..f3d9da09 Binary files /dev/null and b/agrirouter-sdk-java-tests/message-content/small_taskdata.zip differ diff --git a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java new file mode 100644 index 00000000..1a45554b --- /dev/null +++ b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/helper/ContentReader.java @@ -0,0 +1,36 @@ +package com.dke.data.agrirouter.test.helper; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Base64; + +/** + * Generic content reader for the testcases. + */ +public class ContentReader { + + private static final String FOLDER = "./message-content/"; + + public static String readBase64EncodedMessageContent(Identifier identifier) throws Throwable { + Path path = Paths.get(FOLDER.concat(identifier.getFileName())); + final byte[] rawData = Files.readAllBytes(path); + return new String(Base64.getEncoder().encode(rawData)); + } + + public enum Identifier { + BIG_TASK_DATA("big_taskdata.zip"), + SMALL_TASK_DATA("small_taskdata.zip"); + + private final String fileName; + + Identifier(String fileName) { + this.fileName = fileName; + } + + public String getFileName() { + return fileName; + } + } + +} diff --git a/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java new file mode 100644 index 00000000..517b2cea --- /dev/null +++ b/agrirouter-sdk-java-tests/src/test/java/com/dke/data/agrirouter/test/usecase/SendAndReceiveChunkedMessagesTest.java @@ -0,0 +1,279 @@ +package com.dke.data.agrirouter.test.usecase; + +import agrirouter.feed.response.FeedResponse; +import agrirouter.request.Request; +import agrirouter.response.Response; +import com.dke.data.agrirouter.api.cancellation.DefaultCancellationToken; +import com.dke.data.agrirouter.api.dto.encoding.DecodeMessageResponse; +import com.dke.data.agrirouter.api.dto.messaging.FetchMessageResponse; +import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse; +import com.dke.data.agrirouter.api.enums.ContentMessageType; +import com.dke.data.agrirouter.api.enums.SystemMessageType; +import com.dke.data.agrirouter.api.env.QA; +import com.dke.data.agrirouter.api.service.messaging.encoding.DecodeMessageService; +import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService; +import com.dke.data.agrirouter.api.service.messaging.http.FetchMessageService; +import com.dke.data.agrirouter.api.service.parameters.*; +import com.dke.data.agrirouter.impl.common.MessageIdService; +import com.dke.data.agrirouter.impl.common.UtcTimeService; +import com.dke.data.agrirouter.impl.messaging.SequenceNumberService; +import com.dke.data.agrirouter.impl.messaging.encoding.DecodeMessageServiceImpl; +import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl; +import com.dke.data.agrirouter.impl.messaging.rest.*; +import com.dke.data.agrirouter.test.AbstractIntegrationTest; +import com.dke.data.agrirouter.test.Assertions; +import com.dke.data.agrirouter.test.OnboardingResponseRepository; +import com.dke.data.agrirouter.test.helper.ContentReader; +import com.google.protobuf.ByteString; +import org.apache.http.HttpStatus; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Test case to show the behavior for chunked message sending. + */ +class SendAndReceiveChunkedMessagesTest extends AbstractIntegrationTest { + + private static final int MAX_CHUNK_SIZE = 1024000; + + @ParameterizedTest + @MethodSource("getTestArguments") + void givenRealMessageContentWhenSendingMessagesTheContentShouldMatchAfterReceivingAndMergingIt( + ByteString messageContent, int expectedNrOfChunks) + throws Throwable { + actionsForSender(messageContent, expectedNrOfChunks); + actionsForTheRecipient(messageContent, expectedNrOfChunks); + } + + /** + * These are the actions for the recipient. The recipient is already set up and declared the capabilities. The actions for the recipient are documented inline. + * + * @param messageContent - + * @param expectedNrOfChunks - + */ + private void actionsForTheRecipient(ByteString messageContent, int expectedNrOfChunks) throws Throwable { + // [1] Fetch all the messages within the feed. The number of headers should match the number of chunks sent. + final MessageQueryServiceImpl messageQueryService = new MessageQueryServiceImpl(new QA() { + }); + final MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); + final OnboardingResponse recipient = OnboardingResponseRepository.read(OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT); + messageQueryParameters.setOnboardingResponse(recipient); + messageQueryParameters.setSentToInSeconds(UtcTimeService.inTheFuture(5).toEpochSecond()); + messageQueryParameters.setSentFromInSeconds(UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); + messageQueryService.send(messageQueryParameters); + + // [2] Wait for the agrirouter to process the message. + waitForTheAgrirouterToProcessSingleMessage(); + + // [3] Fetch the chunks from the outbox. Since we have the same restrictions while receiving, this has to be the same number of messages as it is chunks. + FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); + Optional> fetchMessageResponses = + fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + Assertions.assertTrue(fetchMessageResponses.isPresent()); + Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); + + // [4] Check if the response from the AR only contains valid results. + final DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); + fetchMessageResponses.get() + .stream() + .map(fetchMessageResponse -> decodeMessageService.decode(fetchMessageResponse.getCommand().getMessage())) + .forEach(decodeMessageResponse -> Assertions.assertEquals(Response.ResponseEnvelope.ResponseBodyType.ACK_FOR_FEED_MESSAGE, decodeMessageResponse.getResponseEnvelope().getType())); + + // [5] Map the results from the query to 'real' messages within the feed and perform some assertions. + final List feedMessages = fetchMessageResponses.get() + .stream() + .map(fetchMessageResponse -> decodeMessageService.decode(fetchMessageResponse.getCommand().getMessage())) + .map(decodeMessageResponse -> messageQueryService.decode(decodeMessageResponse.getResponsePayloadWrapper().getDetails().getValue())) + .map(messageQueryResponse -> messageQueryResponse.getMessages(0)) + .collect(Collectors.toList()); + Assertions.assertEquals(expectedNrOfChunks, feedMessages.size()); + feedMessages.forEach(feedMessage -> Assertions.assertNotNull(feedMessage.getHeader().getChunkContext())); + Assertions.assertEquals(feedMessages.get(0).getHeader().getChunkContext().getTotal(), expectedNrOfChunks); + final Set chunkContextIds = feedMessages.stream().map(feedMessage -> feedMessage.getHeader().getChunkContext().getContextId()).collect(Collectors.toSet()); + Assertions.assertEquals(1, chunkContextIds.size(), "There should be only one chunk context ID."); + + // [6] Combine the chunks in order of their numbers and check if the result is fine. + feedMessages.sort(Comparator.comparingLong(o -> o.getHeader().getChunkContext().getCurrent())); + final String base64EncodedMessageContent = feedMessages + .stream() + // Fetch the message content for each chunk. + .map(feedMessage -> feedMessage.getContent().getValue().toStringUtf8()) + // Decode each of the chunks to get concat the original value. + .map(base64EncodedContent -> new String(Base64.getDecoder().decode(base64EncodedContent))) + // Concat the chunks. + .collect(Collectors.joining("")); + Assertions.assertEquals(messageContent.toStringUtf8(), base64EncodedMessageContent); + + // [7] Confirm the chunks to remove them from the feed. + final List messageIdsToConfirm = feedMessages + .stream() + .map(feedMessage -> feedMessage.getHeader().getMessageId()) + .collect(Collectors.toList()); + final MessageConfirmationServiceImpl messageConfirmationService = new MessageConfirmationServiceImpl(new QA() { + }); + final MessageConfirmationParameters messageConfirmationParameters = new MessageConfirmationParameters(); + messageConfirmationParameters.setOnboardingResponse(recipient); + messageConfirmationParameters.setMessageIds(messageIdsToConfirm); + messageConfirmationService.send(messageConfirmationParameters); + + // [8] Fetch the response from the agrirouter after confirming the messages. + waitForTheAgrirouterToProcessSingleMessage(); + fetchMessageResponses = + fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + Assertions.assertTrue(fetchMessageResponses.isPresent()); + Assertions.assertEquals(1, fetchMessageResponses.get().size(), "This should be a single response."); + } + + /** + * These are the actions for the sender. The sender is already set up and declared the capabilities. The actions for the sender are documented inline. + * + * @param messageContent - + * @param expectedNrOfChunks - + * @throws IOException - + * @throws InterruptedException - + */ + private void actionsForSender(ByteString messageContent, int expectedNrOfChunks) throws IOException, InterruptedException { + final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl(); + final SendMessageServiceImpl sendMessageService = new SendMessageServiceImpl(); + final OnboardingResponse onboardingResponse = + OnboardingResponseRepository.read(OnboardingResponseRepository.Identifier.FARMING_SOFTWARE); + + // [1] Define the raw message, in this case this is the Base64 encoded message content, no chunking needed. + MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters(); + messageHeaderParameters.setTechnicalMessageType(ContentMessageType.ISO_11783_TASKDATA_ZIP); + messageHeaderParameters.setApplicationMessageId(MessageIdService.generateMessageId()); + messageHeaderParameters.setApplicationMessageSeqNo( + SequenceNumberService.generateSequenceNumberForEndpoint(onboardingResponse)); + messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT); + messageHeaderParameters.setRecipients( + Collections.singletonList( + OnboardingResponseRepository.read( + OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT) + .getSensorAlternateId())); + + PayloadParameters payloadParameters = new PayloadParameters(); + payloadParameters.setValue(messageContent); + payloadParameters.setTypeUrl(SystemMessageType.EMPTY.getKey()); + + // [2] Chunk the message content using the SDK specific methods ('chunkAndEncode'). + List tuples = + encodeMessageService.chunkAndEncode( + messageHeaderParameters, payloadParameters, onboardingResponse); + + tuples.forEach( + messageParameterTuple -> + Assertions.assertTrue( + Objects.requireNonNull(messageParameterTuple.getPayloadParameters().getValue()) + .toStringUtf8() + .length() + <= MAX_CHUNK_SIZE)); + + List encodedMessages = encodeMessageService.encode(tuples); + + // [3] Send the chunks to the agrirouter. + SendMessageParameters sendMessageParameters = new SendMessageParameters(); + sendMessageParameters.setEncodedMessages(encodedMessages); + sendMessageParameters.setOnboardingResponse(onboardingResponse); + sendMessageService.send(sendMessageParameters); + + // [4] Wait for the AR to process the chunks. + waitForTheAgrirouterToProcessMultipleMessages(); + + // [5] Check if the chunks were processed successfully. + FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); + Optional> fetchMessageResponses = + fetchMessageService.fetch( + onboardingResponse, + new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + + Assertions.assertTrue(fetchMessageResponses.isPresent()); + Assertions.assertEquals(expectedNrOfChunks, fetchMessageResponses.get().size()); + + DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); + AtomicReference decodeMessageResponse = new AtomicReference<>(); + fetchMessageResponses.get().stream() + .map(FetchMessageResponse::getCommand) + .forEach( + message -> { + Assertions.assertNotNull(message); + decodeMessageResponse.set(decodeMessageService.decode(message.getMessage())); + + Assertions.assertMatchesAny( + Arrays.asList(HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_NO_CONTENT), + decodeMessageResponse.get().getResponseEnvelope().getResponseCode()); + }); + } + + /** + * Delivers fake message content for multiple test cases. + * + * @return - + */ + private static Stream getTestArguments() throws Throwable { + return Stream.of( + Arguments.of( + ByteString.copyFromUtf8(ContentReader.readBase64EncodedMessageContent(ContentReader.Identifier.BIG_TASK_DATA)), + 4)); + } + + /** + * Cleanup before and after each test case. These actions are necessary because it could be the case, that there are dangling messages from former tests. + */ + @BeforeEach + @AfterEach + public void prepareTestEnvironment() throws Throwable { + FetchMessageService fetchMessageService = new FetchMessageServiceImpl(); + final OnboardingResponse recipient = OnboardingResponseRepository.read(OnboardingResponseRepository.Identifier.COMMUNICATION_UNIT); + final MessageHeaderQueryServiceImpl messageHeaderQueryService = new MessageHeaderQueryServiceImpl(new QA() { + }); + + // [1] Clean the outbox of the endpoint. + fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + + // [2] Fetch all message headers for the last 4 weeks (maximum retention time within the agrirouter). + final MessageQueryParameters messageQueryParameters = new MessageQueryParameters(); + messageQueryParameters.setOnboardingResponse(recipient); + messageQueryParameters.setSentToInSeconds(UtcTimeService.inTheFuture(5).toEpochSecond()); + messageQueryParameters.setSentFromInSeconds(UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond()); + messageHeaderQueryService.send(messageQueryParameters); + waitForTheAgrirouterToProcessSingleMessage(); + Optional> fetchMessageResponses = + fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + Assertions.assertTrue(fetchMessageResponses.isPresent()); + Assertions.assertEquals(1, fetchMessageResponses.get().size(), "This should be a single response."); + final DecodeMessageService decodeMessageService = new DecodeMessageServiceImpl(); + final DecodeMessageResponse decodeMessageResponse = decodeMessageService.decode(fetchMessageResponses.get().get(0).getCommand().getMessage()); + Assertions.assertEquals(Response.ResponseEnvelope.ResponseBodyType.ACK_FOR_FEED_HEADER_LIST, decodeMessageResponse.getResponseEnvelope().getType()); + final FeedResponse.HeaderQueryResponse headerQueryResponse = messageHeaderQueryService.decode(decodeMessageResponse.getResponsePayloadWrapper().getDetails().getValue()); + + // [3] Delete the dangling messages from the feed of the endpoint if necessary. + if (headerQueryResponse.getQueryMetrics().getTotalMessagesInQuery() > 0) { + final DeleteMessageServiceImpl deleteMessageService = new DeleteMessageServiceImpl(); + final DeleteMessageParameters deleteMessageParameters = new DeleteMessageParameters(); + deleteMessageParameters.setOnboardingResponse(recipient); + final List messageIds = headerQueryResponse.getFeedList() + .stream() + .map(FeedResponse.HeaderQueryResponse.Feed::getHeadersList) + .flatMap(Collection::stream) + .map(FeedResponse.HeaderQueryResponse.Header::getMessageId) + .collect(Collectors.toList()); + deleteMessageParameters.setMessageIds(messageIds); + deleteMessageService.send(deleteMessageParameters); + waitForTheAgrirouterToProcessSingleMessage(); + fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + } + + // [4] Clean the outbox of the endpoint. + fetchMessageService.fetch(recipient, new DefaultCancellationToken(MAX_TRIES_BEFORE_FAILURE, DEFAULT_INTERVAL)); + } + +}