Skip to content
This repository was archived by the owner on May 8, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
package com.dke.data.agrirouter.api.service.messaging.http;

import agrirouter.feed.response.FeedResponse;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.messaging.HttpAsyncMessageSendingResult;
import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder;
import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters;

public interface MessageHeaderQueryService
extends MessagingService<MessageQueryParameters>,
MessageDecoder<FeedResponse.HeaderQueryResponse> {}
MessageDecoder<FeedResponse.HeaderQueryResponse> {

/**
* Query all message headers as default function. The query will be based on a time period since
* message ID filtering or sender filtering can be achieved using the default message sending.
*
* @param onboardingResponse The onboard response for the endpoint.
* @return The message ID.
*/
String queryAll(OnboardingResponse onboardingResponse);

/**
* Query all message headers as async default function. The query will be based on a time period
* since message ID filtering or sender filtering can be achieved using the default message
* sending.
*
* @param onboardingResponse The onboard response for the endpoint.
* @return The message ID.
*/
HttpAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
package com.dke.data.agrirouter.api.service.messaging.http;

import agrirouter.feed.response.FeedResponse;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.messaging.HttpAsyncMessageSendingResult;
import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder;
import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters;

public interface MessageQueryService
extends MessagingService<MessageQueryParameters>,
MessageDecoder<FeedResponse.MessageQueryResponse> {}
MessageDecoder<FeedResponse.MessageQueryResponse> {

/**
* Query all messages as default function. The query will be based on a time period since message
* ID filtering or sender filtering can be achieved using the default message sending.
*
* @param onboardingResponse The onboard response for the endpoint.
* @return The message ID.
*/
String queryAll(OnboardingResponse onboardingResponse);

/**
* Query all messages as async default function. The query will be based on a time period since
* message ID filtering or sender filtering can be achieved using the default message sending.
*
* @param onboardingResponse The onboard response for the endpoint.
* @return The message ID.
*/
HttpAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
package com.dke.data.agrirouter.api.service.messaging.mqtt;

import agrirouter.feed.response.FeedResponse;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult;
import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder;
import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters;

public interface MessageHeaderQueryService
extends MessagingService<MessageQueryParameters>,
MessageDecoder<FeedResponse.HeaderQueryResponse> {}
MessageDecoder<FeedResponse.HeaderQueryResponse> {

/**
* Query all message headers as default function. The query will be based on a time period since
* message ID filtering or sender filtering can be achieved using the default message sending.
*
* @param onboardingResponse The onboard response for the endpoint.
* @return The message ID.
*/
String queryAll(OnboardingResponse onboardingResponse);

/**
* Query all message headers as async default function. The query will be based on a time period
* since message ID filtering or sender filtering can be achieved using the default message
* sending.
*
* @param onboardingResponse The onboard response for the endpoint.
* @return The message ID.
*/
MqttAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
package com.dke.data.agrirouter.api.service.messaging.mqtt;

import agrirouter.feed.response.FeedResponse;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult;
import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder;
import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters;

public interface MessageQueryService
extends MessagingService<MessageQueryParameters>,
MessageDecoder<FeedResponse.MessageQueryResponse> {}
MessageDecoder<FeedResponse.MessageQueryResponse> {

/**
* Query all messages as default function. The query will be based on a time period since message
* ID filtering or sender filtering can be achieved using the default message sending.
*
* @param onboardingResponse The onboard response for the endpoint.
* @return The message ID.
*/
String queryAll(OnboardingResponse onboardingResponse);

/**
* Query all messages as async default function. The query will be based on a time period since
* message ID filtering or sender filtering can be achieved using the default message sending.
*
* @param onboardingResponse The onboard response for the endpoint.
* @return The message ID.
*/
MqttAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public List<MessageParameterTuple> chunkAndEncode(
messageHeaderParameters.validate();
payloadParameters.validate();

if (messageHeaderParameters.getTechnicalMessageType().needsBase64EncodingAndHasToBeChunkedIfNecessary()) {
if (messageHeaderParameters
.getTechnicalMessageType()
.needsBase64EncodingAndHasToBeChunkedIfNecessary()) {
if (payloadParameters.shouldBeChunked()) {
getNativeLogger()
.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.concurrent.CompletableFuture;

public class MessageQueryHelperService extends NonEnvironmentalService
implements MessageSender, MessageEncoder, ResponseValidator {
implements MessageSender, MessageEncoder, ResponseValidator, QueryAllMessagesParameterCreator {

private final EncodeMessageService encodeMessageService;
private final TechnicalMessageType technicalMessageType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.dke.data.agrirouter.impl.messaging.helper;

import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters;
import com.dke.data.agrirouter.impl.common.UtcTimeService;
import java.util.Collections;
import org.jetbrains.annotations.NotNull;

/** Interface to abstract the message query creation. */
public interface QueryAllMessagesParameterCreator {

/**
* Create message parameters to query all messages.
*
* @param onboardingResponse -
* @return -
*/
@NotNull
default MessageQueryParameters createMessageParametersToQueryAll(
OnboardingResponse onboardingResponse) {
MessageQueryParameters messageQueryParameters = new MessageQueryParameters();
messageQueryParameters.setOnboardingResponse(onboardingResponse);
messageQueryParameters.setMessageIds(Collections.emptyList());
messageQueryParameters.setSenderIds(Collections.emptyList());
messageQueryParameters.setSentFromInSeconds(
UtcTimeService.inThePast(UtcTimeService.FOUR_WEEKS_AGO).toEpochSecond());
messageQueryParameters.setSentToInSeconds(UtcTimeService.now().toEpochSecond());
return messageQueryParameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters;
import com.dke.data.agrirouter.impl.messaging.MessageEncoder;
import com.dke.data.agrirouter.impl.messaging.MqttService;
import com.dke.data.agrirouter.impl.messaging.helper.QueryAllMessagesParameterCreator;
import com.dke.data.agrirouter.impl.messaging.rest.MessageSender;
import java.util.Collections;
import java.util.Objects;
Expand All @@ -16,7 +17,7 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MessageQueryHelperService extends MqttService
implements MessageSender, MessageEncoder {
implements MessageSender, MessageEncoder, QueryAllMessagesParameterCreator {

private final EncodeMessageService encodeMessageService;
private final TechnicalMessageType technicalMessageType;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.dke.data.agrirouter.impl.messaging.mqtt;

import agrirouter.feed.response.FeedResponse;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.enums.SystemMessageType;
import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult;
import com.dke.data.agrirouter.api.service.messaging.mqtt.MessageHeaderQueryService;
Expand Down Expand Up @@ -42,4 +43,15 @@ public FeedResponse.HeaderQueryResponse unsafeDecode(ByteString message)
throws InvalidProtocolBufferException {
return FeedResponse.HeaderQueryResponse.parseFrom(message);
}

@Override
public String queryAll(OnboardingResponse onboardingResponse) {
return send(messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse));
}

@Override
public MqttAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse) {
return sendAsync(
messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.dke.data.agrirouter.impl.messaging.mqtt;

import agrirouter.feed.response.FeedResponse;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.enums.SystemMessageType;
import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult;
import com.dke.data.agrirouter.api.service.messaging.encoding.MessageDecoder;
Expand Down Expand Up @@ -45,4 +46,18 @@ public FeedResponse.MessageQueryResponse unsafeDecode(ByteString message)
throws InvalidProtocolBufferException {
return FeedResponse.MessageQueryResponse.parseFrom(message);
}

@Override
public String queryAll(OnboardingResponse onboardingResponse) {
MessageQueryParameters messageQueryParameters =
messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse);
return send(messageQueryParameters);
}

@Override
public MqttAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse) {
MessageQueryParameters messageQueryParameters =
messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse);
return sendAsync(messageQueryParameters);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.dke.data.agrirouter.impl.messaging.rest;

import agrirouter.feed.response.FeedResponse;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.enums.SystemMessageType;
import com.dke.data.agrirouter.api.env.Environment;
import com.dke.data.agrirouter.api.messaging.HttpAsyncMessageSendingResult;
Expand Down Expand Up @@ -39,4 +40,18 @@ public FeedResponse.HeaderQueryResponse unsafeDecode(ByteString message)
throws InvalidProtocolBufferException {
return FeedResponse.HeaderQueryResponse.parseFrom(message);
}

@Override
public String queryAll(OnboardingResponse onboardingResponse) {
MessageQueryParameters messageQueryParameters =
messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse);
return send(messageQueryParameters);
}

@Override
public HttpAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse) {
MessageQueryParameters messageQueryParameters =
messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse);
return sendAsync(messageQueryParameters);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.dke.data.agrirouter.impl.messaging.rest;

import agrirouter.feed.response.FeedResponse;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.enums.SystemMessageType;
import com.dke.data.agrirouter.api.env.Environment;
import com.dke.data.agrirouter.api.messaging.HttpAsyncMessageSendingResult;
Expand Down Expand Up @@ -42,4 +43,18 @@ public FeedResponse.MessageQueryResponse unsafeDecode(ByteString message)
throws InvalidProtocolBufferException {
return FeedResponse.MessageQueryResponse.parseFrom(message);
}

@Override
public String queryAll(OnboardingResponse onboardingResponse) {
MessageQueryParameters messageQueryParameters =
messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse);
return send(messageQueryParameters);
}

@Override
public HttpAsyncMessageSendingResult queryAllAsync(OnboardingResponse onboardingResponse) {
MessageQueryParameters messageQueryParameters =
messageQueryHelperService.createMessageParametersToQueryAll(onboardingResponse);
return sendAsync(messageQueryParameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,29 @@
import java.nio.file.Paths;
import java.util.Base64;

/**
* Generic content reader for the testcases.
*/
/** Generic content reader for the testcases. */
public class ContentReader {

private static final String FOLDER = "./message-content/";
private static final String FOLDER = "./message-content/";

public static String readBase64EncodedMessageContent(Identifier identifier) throws Throwable {
Path path = Paths.get(FOLDER.concat(identifier.getFileName()));
final byte[] rawData = Files.readAllBytes(path);
return new String(Base64.getEncoder().encode(rawData));
}

public enum Identifier {
BIG_TASK_DATA("big_taskdata.zip"),
SMALL_TASK_DATA("small_taskdata.zip");
public static String readBase64EncodedMessageContent(Identifier identifier) throws Throwable {
Path path = Paths.get(FOLDER.concat(identifier.getFileName()));
final byte[] rawData = Files.readAllBytes(path);
return new String(Base64.getEncoder().encode(rawData));
}

private final String fileName;
public enum Identifier {
BIG_TASK_DATA("big_taskdata.zip"),
SMALL_TASK_DATA("small_taskdata.zip");

Identifier(String fileName) {
this.fileName = fileName;
}
private final String fileName;

public String getFileName() {
return fileName;
}
Identifier(String fileName) {
this.fileName = fileName;
}

public String getFileName() {
return fileName;
}
}
}
Loading