From f591ae7b955aeedc4ab1a881c948dcaf369539bf Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 8 Apr 2026 15:01:19 +0500 Subject: [PATCH] IGNITE-27950 Use MarshallableMessage for GridContinuousMessage --- .../communication/GridIoMessageFactory.java | 4 +- .../continuous/GridContinuousMessage.java | 45 +++++++------------ .../continuous/GridContinuousProcessor.java | 27 ----------- 3 files changed, 18 insertions(+), 58 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b284cab5ddac1..40d992ca5c378 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -280,7 +280,7 @@ import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage; import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessageSerializer; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; -import org.apache.ignite.internal.processors.continuous.GridContinuousMessageSerializer; +import org.apache.ignite.internal.processors.continuous.GridContinuousMessageMarshallableSerializer; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntrySerializer; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; @@ -453,7 +453,7 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register(57, GridNearUnlockRequest::new, new GridNearUnlockRequestSerializer()); factory.register(58, GridCacheQueryRequest::new, new GridCacheQueryRequestSerializer()); factory.register(59, GridCacheQueryResponse::new, new GridCacheQueryResponseSerializer()); - factory.register(61, GridContinuousMessage::new, new GridContinuousMessageSerializer()); + factory.register(61, GridContinuousMessage::new, new GridContinuousMessageMarshallableSerializer(marsh, clsLdr)); factory.register(62, DataStreamerRequest::new, new DataStreamerRequestSerializer()); factory.register(63, DataStreamerResponse::new, new DataStreamerResponseSerializer()); factory.register(76, GridTaskResultRequest::new, new GridTaskResultRequestSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java index 68ac58db08abe..331fedbc7eb86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java @@ -19,10 +19,14 @@ import java.util.Collection; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -31,7 +35,7 @@ /** * Continuous processor message. */ -public class GridContinuousMessage implements Message { +public class GridContinuousMessage implements MarshallableMessage { /** Message type. */ @Order(0) GridContinuousMessageType type; @@ -102,13 +106,6 @@ public UUID routineId() { return routineId; } - /** - * @return {@code True} is data is collection of messages. - */ - public boolean messages() { - return msgs != null; - } - /** * @return Message data. */ @@ -116,27 +113,6 @@ public T data() { return msgs != null ? (T)msgs : (T)data; } - /** - * @param data Message data. - */ - public void data(Object data) { - this.data = data; - } - - /** - * @return Serialized message data. - */ - public byte[] dataBytes() { - return dataBytes; - } - - /** - * @param dataBytes Serialized message data. - */ - public void dataBytes(byte[] dataBytes) { - this.dataBytes = dataBytes; - } - /** * @return Future ID for synchronous event notification. */ @@ -144,6 +120,17 @@ public void dataBytes(byte[] dataBytes) { return futId; } + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (data != null) + dataBytes = U.marshal(marsh, data); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (dataBytes != null) + data = U.unmarshal(marsh, dataBytes, clsLdr); + } /** {@inheritDoc} */ @Override public String toString() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 1786bdf400317..f10d1b0fa0f4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -280,17 +280,6 @@ public GridContinuousProcessor(GridKernalContext ctx) { else { GridContinuousMessage msg = (GridContinuousMessage)obj; - if (msg.data() == null && msg.dataBytes() != null) { - try { - msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config()))); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process message (ignoring): " + msg, e); - - return; - } - } - switch (msg.type()) { case MSG_EVT_NOTIFICATION: processNotification(nodeId, msg); @@ -1022,17 +1011,6 @@ private void registerMessageListener(GridContinuousHandler hnd) { // Only notification can be ordered. assert msg.type() == MSG_EVT_NOTIFICATION; - if (msg.data() == null && msg.dataBytes() != null) { - try { - msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config()))); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process message (ignoring): " + msg, e); - - return; - } - } - processNotification(nodeId, msg); } }); @@ -1857,11 +1835,6 @@ private void sendWithRetries(Collection nodes, GridContin assert !F.isEmpty(nodes); assert msg != null; - if (!msg.messages() && - msg.data() != null && - (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id()))) - msg.dataBytes(U.marshal(marsh, msg.data())); - for (ClusterNode node : nodes) { int cnt = 0;