Skip to content
This repository was archived by the owner on May 8, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ abstract class AbstractParameterBase : DynamicAttributesStorage() {

var teamsetContextId: String? = null

var sequenceNumber: Int = 1
var sequenceNumber: Int = 0

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import agrirouter.request.payload.endpoint.Capabilities;
import agrirouter.request.payload.endpoint.SubscriptionOuterClass;
import com.dke.data.agrirouter.api.dto.encoding.EncodedMessage;
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
import com.dke.data.agrirouter.api.enums.SystemMessageType;
import com.dke.data.agrirouter.api.enums.TechnicalMessageType;
import com.dke.data.agrirouter.api.service.HasLogger;
Expand All @@ -26,8 +27,6 @@ public interface MessageEncoder extends HasLogger {
* @return -
*/
default EncodedMessage encode(DeleteMessageParameters parameters) {
assert parameters.getOnboardingResponse() != null;

MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters();

final String applicationMessageID =
Expand All @@ -40,11 +39,10 @@ default EncodedMessage encode(DeleteMessageParameters parameters) {
parameters.getTeamsetContextId() == null ? "" : parameters.getTeamsetContextId();
messageHeaderParameters.setTeamSetContextId(Objects.requireNonNull(teamsetContextId));

messageHeaderParameters.setApplicationMessageSeqNo(
parameters.getSequenceNumber() != 0
? parameters.getSequenceNumber()
: SequenceNumberService.generateSequenceNumberForEndpoint(
parameters.getOnboardingResponse()));
setSequenceNumber(
messageHeaderParameters,
parameters.getSequenceNumber(),
parameters.getOnboardingResponse());
messageHeaderParameters.setMetadata(MessageOuterClass.Metadata.newBuilder().build());

messageHeaderParameters.setTechnicalMessageType(SystemMessageType.DKE_FEED_DELETE);
Expand Down Expand Up @@ -84,8 +82,6 @@ default EncodedMessage encode(DeleteMessageParameters parameters) {
* @return -
*/
default EncodedMessage encode(ListEndpointsParameters parameters) {
assert parameters.getOnboardingResponse() != null;

MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters();

final String applicationMessageID =
Expand All @@ -99,12 +95,10 @@ default EncodedMessage encode(ListEndpointsParameters parameters) {
parameters.getTeamsetContextId() == null ? "" : parameters.getTeamsetContextId();
messageHeaderParameters.setTeamSetContextId(Objects.requireNonNull(teamsetContextId));

messageHeaderParameters.setApplicationMessageSeqNo(
parameters.getSequenceNumber() != 0
? parameters.getSequenceNumber()
: SequenceNumberService.generateSequenceNumberForEndpoint(
parameters.getOnboardingResponse()));

setSequenceNumber(
messageHeaderParameters,
parameters.getSequenceNumber(),
parameters.getOnboardingResponse());
if (parameters.getUnfilteredList()) {
messageHeaderParameters.setTechnicalMessageType(
SystemMessageType.DKE_LIST_ENDPOINTS_UNFILTERED);
Expand Down Expand Up @@ -135,8 +129,6 @@ default EncodedMessage encode(ListEndpointsParameters parameters) {
* @return -
*/
default EncodedMessage encode(MessageConfirmationParameters parameters) {
assert parameters.getOnboardingResponse() != null;

MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters();

final String applicationMessageID =
Expand All @@ -151,11 +143,10 @@ default EncodedMessage encode(MessageConfirmationParameters parameters) {
parameters.getTeamsetContextId() == null ? "" : parameters.getTeamsetContextId();
messageHeaderParameters.setTeamSetContextId(Objects.requireNonNull(teamsetContextId));

messageHeaderParameters.setApplicationMessageSeqNo(
parameters.getSequenceNumber() != 0
? parameters.getSequenceNumber()
: SequenceNumberService.generateSequenceNumberForEndpoint(
parameters.getOnboardingResponse()));
setSequenceNumber(
messageHeaderParameters,
parameters.getSequenceNumber(),
parameters.getOnboardingResponse());
messageHeaderParameters.setTechnicalMessageType(SystemMessageType.DKE_FEED_CONFIRM);
messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT);

Expand All @@ -178,8 +169,6 @@ default EncodedMessage encode(MessageConfirmationParameters parameters) {
* @return -
*/
default EncodedMessage encode(SetCapabilitiesParameters parameters) {
assert parameters.getOnboardingResponse() != null;

MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters();

final String applicationMessageID =
Expand All @@ -194,11 +183,10 @@ default EncodedMessage encode(SetCapabilitiesParameters parameters) {
parameters.getTeamsetContextId() == null ? "" : parameters.getTeamsetContextId();
messageHeaderParameters.setTeamSetContextId(Objects.requireNonNull(teamsetContextId));

messageHeaderParameters.setApplicationMessageSeqNo(
parameters.getSequenceNumber() != 0
? parameters.getSequenceNumber()
: SequenceNumberService.generateSequenceNumberForEndpoint(
parameters.getOnboardingResponse()));
setSequenceNumber(
messageHeaderParameters,
parameters.getSequenceNumber(),
parameters.getOnboardingResponse());
messageHeaderParameters.setTechnicalMessageType(SystemMessageType.DKE_CAPABILITIES);
messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT);

Expand Down Expand Up @@ -232,15 +220,40 @@ default EncodedMessage encode(SetCapabilitiesParameters parameters) {
return new EncodedMessage(applicationMessageID, encodedMessage);
}

static void setSequenceNumber(
Comment thread
oliverrahner marked this conversation as resolved.
MessageHeaderParameters messageHeaderParameters,
long sequenceNumber,
OnboardingResponse onboardingResponse) {
if (sequenceNumber == 0 && onboardingResponse == null) {
throw new IllegalArgumentException(
"Either sequence number or onboarding response must be set.");
}

messageHeaderParameters.setApplicationMessageSeqNo(
sequenceNumber != 0
? sequenceNumber
: SequenceNumberService.generateSequenceNumberForEndpoint(onboardingResponse));
}

/**
* Encode a message to set a subscription.
*
* @param parameters -
* @return -
*/
@Deprecated
default EncodedMessage encodeMessage(SetSubscriptionParameters parameters) {
assert parameters.getOnboardingResponse() != null;
return encode(parameters);
}

/**
* Encode a message to set a subscription.
*
* @param parameters -
* @return -
*/
default EncodedMessage encode(SetSubscriptionParameters parameters) {
assert parameters.getOnboardingResponse() != null;
MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters();

final String applicationMessageID =
Expand All @@ -255,11 +268,10 @@ default EncodedMessage encodeMessage(SetSubscriptionParameters parameters) {
parameters.getTeamsetContextId() == null ? "" : parameters.getTeamsetContextId();
messageHeaderParameters.setTeamSetContextId(Objects.requireNonNull(teamsetContextId));

messageHeaderParameters.setApplicationMessageSeqNo(
parameters.getSequenceNumber() != 0
? parameters.getSequenceNumber()
: SequenceNumberService.generateSequenceNumberForEndpoint(
parameters.getOnboardingResponse()));
setSequenceNumber(
messageHeaderParameters,
parameters.getSequenceNumber(),
parameters.getOnboardingResponse());
messageHeaderParameters.setTechnicalMessageType(SystemMessageType.DKE_SUBSCRIPTION);
messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT);

Expand Down Expand Up @@ -296,10 +308,7 @@ default EncodedMessage encodeMessage(SetSubscriptionParameters parameters) {
*/
default EncodedMessage encode(
TechnicalMessageType technicalMessageType, MessageQueryParameters parameters) {
assert parameters.getOnboardingResponse() != null;

this.logMethodBegin(parameters);

this.getNativeLogger().trace("Build message header parameters.");
MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters();

Expand All @@ -314,12 +323,11 @@ default EncodedMessage encode(
final String teamsetContextId =
parameters.getTeamsetContextId() == null ? "" : parameters.getTeamsetContextId();
messageHeaderParameters.setTeamSetContextId(Objects.requireNonNull(teamsetContextId));
messageHeaderParameters.setApplicationMessageSeqNo(
parameters.getSequenceNumber() != 0
? parameters.getSequenceNumber()
: SequenceNumberService.generateSequenceNumberForEndpoint(
parameters.getOnboardingResponse()));

setSequenceNumber(
messageHeaderParameters,
parameters.getSequenceNumber(),
parameters.getOnboardingResponse());
messageHeaderParameters.setTechnicalMessageType(technicalMessageType);
messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT);

Expand Down Expand Up @@ -362,8 +370,6 @@ default EncodedMessage encode(
* @return -
*/
default EncodedMessage encode(CloudOnboardingParameters parameters) {
assert parameters.getOnboardingResponse() != null;

final String applicationMessageID =
parameters.getApplicationMessageId() == null
? MessageIdService.generateMessageId()
Expand All @@ -375,11 +381,11 @@ default EncodedMessage encode(CloudOnboardingParameters parameters) {
MessageHeaderParameters messageHeaderParameters = new MessageHeaderParameters();
messageHeaderParameters.setApplicationMessageId(applicationMessageID);
messageHeaderParameters.setTeamSetContextId(teamsetContextId);
messageHeaderParameters.setApplicationMessageSeqNo(
parameters.getSequenceNumber() != 0
? parameters.getSequenceNumber()
: SequenceNumberService.generateSequenceNumberForEndpoint(
parameters.getOnboardingResponse()));

setSequenceNumber(
messageHeaderParameters,
parameters.getSequenceNumber(),
parameters.getOnboardingResponse());
messageHeaderParameters.setMetadata(MessageOuterClass.Metadata.newBuilder().build());
messageHeaderParameters.setTechnicalMessageType(SystemMessageType.DKE_CLOUD_ONBOARD_ENDPOINTS);
messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT);
Expand Down Expand Up @@ -414,8 +420,6 @@ default EncodedMessage encode(CloudOnboardingParameters parameters) {
* @return -
*/
default EncodedMessage encode(CloudOffboardingParameters parameters) {
assert parameters.getOnboardingResponse() != null;

final String applicationMessageID =
parameters.getApplicationMessageId() == null
? MessageIdService.generateMessageId()
Expand All @@ -430,12 +434,11 @@ default EncodedMessage encode(CloudOffboardingParameters parameters) {
messageHeaderParameters.setTechnicalMessageType(SystemMessageType.DKE_CLOUD_OFFBOARD_ENDPOINTS);
messageHeaderParameters.setMode(Request.RequestEnvelope.Mode.DIRECT);
messageHeaderParameters.setMetadata(MessageOuterClass.Metadata.newBuilder().build());
messageHeaderParameters.setApplicationMessageSeqNo(
parameters.getSequenceNumber() != 0
? parameters.getSequenceNumber()
: SequenceNumberService.generateSequenceNumberForEndpoint(
parameters.getOnboardingResponse()));

setSequenceNumber(
messageHeaderParameters,
parameters.getSequenceNumber(),
parameters.getOnboardingResponse());
PayloadParameters payloadParameters = new PayloadParameters();
payloadParameters.setTypeUrl(SystemMessageType.DKE_CLOUD_OFFBOARD_ENDPOINTS.getTypeUrl());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ public List<MessageParameterTuple> chunkAndBase64EncodeEachChunk(
payload.copyFrom(payloadParameters);
payload.setValue(
ByteString.copyFromUtf8(
Base64.getEncoder().encodeToString(payloadParameters.getValue()
Base64.getEncoder()
.encodeToString(
payloadParameters
.getValue()
.toStringUtf8()
.getBytes(StandardCharsets.UTF_8))));
return Collections.singletonList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public SetSubscriptionServiceImpl(IMqttClient mqttClient) {
public String send(SetSubscriptionParameters parameters) {
parameters.validate();
try {
EncodedMessage encodedMessage = this.encodeMessage(parameters);
EncodedMessage encodedMessage = this.encode(parameters);
SendMessageParameters sendMessageParameters = new SendMessageParameters();
sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse());
sendMessageParameters.setEncodedMessages(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public SetSubscriptionServiceImpl(Environment environment) {
@Override
public String send(SetSubscriptionParameters parameters) {
parameters.validate();
EncodedMessage encodedMessage = this.encodeMessage(parameters);
EncodedMessage encodedMessage = this.encode(parameters);
SendMessageParameters sendMessageParameters = new SendMessageParameters();
sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse());
sendMessageParameters.setEncodedMessages(
Expand All @@ -40,7 +40,7 @@ public String send(SetSubscriptionParameters parameters) {
@Override
public HttpAsyncMessageSendingResult sendAsync(SetSubscriptionParameters parameters) {
parameters.validate();
EncodedMessage encodedMessage = this.encodeMessage(parameters);
EncodedMessage encodedMessage = this.encode(parameters);
SendMessageParameters sendMessageParameters = new SendMessageParameters();
sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse());
sendMessageParameters.setEncodedMessages(
Expand Down
Loading