From 01aaa5697043197fa4ac1a96845c82a2331be921 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 3 Mar 2026 17:03:35 +0500 Subject: [PATCH 1/5] IGNITE-28046 Use MessageSerializer for MetadataUpdateProposedMessage --- .../discovery/DiscoveryMessageFactory.java | 4 + .../cache/binary/BinaryMetadataTransport.java | 6 +- .../binary/MetadataUpdateProposedMessage.java | 84 +++++++++++++------ .../resources/META-INF/classnames.properties | 1 - ...niteDiscoveryMessageSerializationTest.java | 24 ++++++ 5 files changed, 89 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index e30853967bb82..e58783efd48c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -49,6 +49,8 @@ import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer; +import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; +import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageMarshallableSerializer; import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult; import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResultSerializer; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResult; @@ -330,5 +332,7 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register((short)529, SnapshotCheckHandlersNodeResponse::new, new SnapshotCheckHandlersNodeResponseSerializer()); factory.register((short)530, SnapshotPartitionsVerifyHandlerResponse::new, new SnapshotPartitionsVerifyHandlerResponseMarshallableSerializer(marsh, clsLdr)); + factory.register((short)531, MetadataUpdateProposedMessage::new, + new MetadataUpdateProposedMessageMarshallableSerializer(marsh, clsLdr)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index 0cfb2d96768b9..7c6c8a9dbc005 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -550,7 +550,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene //coordinator receives update request if (metaVerInfo != null) { if (metaVerInfo.removing()) { - msg.markRejected(new BinaryObjectException("The type is removing now [typeId=" + typeId + ']')); + msg.markRejected("The type is removing now [typeId=" + typeId + ']'); pendingVer = REMOVED_VERSION; acceptedVer = REMOVED_VERSION; @@ -589,7 +589,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene catch (BinaryObjectException err) { log.warning("Exception with merging metadata for typeId: " + typeId, err); - msg.markRejected(err); + msg.markRejected(err.getMessage()); } } } @@ -602,7 +602,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene MetadataUpdateResultFuture fut = unlabeledFutures.poll(); if (msg.rejected()) - fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError())); + fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionErrorMessage())); else { if (clientNode) { boolean success = casBinaryMetadata(typeId, new BinaryMetadataVersionInfo(msg.metadata(), pendingVer, acceptedVer)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java index b73be5f2c1300..9af429a0696fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java @@ -17,21 +17,25 @@ package org.apache.ignite.internal.processors.cache.binary; import java.util.UUID; -import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.binary.BinaryMetadataHandler; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** * MetadataUpdateProposedMessage and {@link MetadataUpdateAcceptedMessage} messages make a basis for * discovery-based protocol for exchanging {@link BinaryMetadata metadata} describing objects in binary format stored in Ignite caches. - * + *

* All interactions with binary metadata are performed through {@link BinaryMetadataHandler} * interface implemented in {@link CacheObjectBinaryProcessorImpl} processor. - * + *

* Protocol works as follows: *

    *
  1. @@ -67,33 +71,49 @@ * it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with accepted version * equals to pending version of this metadata to the moment when is was initially read by the thread. */ -public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage { +public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, MarshallableMessage { /** */ private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** Node UUID which initiated metadata update. */ - private final UUID origNodeId; + @Order(1) + UUID origNodeId; /** */ private BinaryMetadata metadata; + /** Serialized {@link #metadata}. */ + @Order(2) + byte[] metadataBytes; + /** Metadata type id. */ - private final int typeId; + @Order(3) + int typeId; /** Metadata version which is pending for update. */ - private int pendingVer; + @Order(4) + int pendingVer; /** Metadata version which is already accepted by entire cluster. */ - private int acceptedVer; + @Order(5) + int acceptedVer; /** Message acceptance status. */ - private ProposalStatus status = ProposalStatus.SUCCESSFUL; + @Order(6) + boolean rejected; /** */ - private BinaryObjectException err; + @Order(7) + String errMsg; + + /** Constructor. */ + public MetadataUpdateProposedMessage() { + // No-op. + } /** * @param metadata {@link BinaryMetadata} requested to be updated. @@ -103,6 +123,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) { assert origNodeId != null; assert metadata != null; + id = IgniteUuid.randomUuid(); this.origNodeId = origNodeId; this.metadata = metadata; @@ -120,7 +141,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) { * {@inheritDoc} */ @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return (status == ProposalStatus.SUCCESSFUL) ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null; + return !rejected ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null; } /** @@ -131,25 +152,25 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) { } /** - * @param err Error caused this update to be rejected. + * @param errMsg Error message caused this update to be rejected. */ - void markRejected(BinaryObjectException err) { - status = ProposalStatus.REJECTED; - this.err = err; + void markRejected(String errMsg) { + rejected = true; + this.errMsg = errMsg; } /** * */ boolean rejected() { - return status == ProposalStatus.REJECTED; + return rejected; } /** * */ - BinaryObjectException rejectionError() { - return err; + String rejectionErrorMessage() { + return errMsg; } /** @@ -201,6 +222,21 @@ public void metadata(BinaryMetadata metadata) { this.metadata = metadata; } + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (metadata != null) + metadataBytes = U.marshal(marsh, metadata); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + if (metadataBytes != null) { + metadata = U.unmarshal(marsh, metadataBytes, ldr); + + metadataBytes = null; + } + } + /** * */ @@ -208,13 +244,9 @@ public int typeId() { return typeId; } - /** Message acceptance status. */ - private enum ProposalStatus { - /** */ - SUCCESSFUL, - - /** */ - REJECTED + /** {@inheritDoc} */ + @Override public short directType() { + return 531; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index f97c0ff469883..ffd46104d189f 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1032,7 +1032,6 @@ org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage -org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage$ProposalStatus org.apache.ignite.internal.processors.cache.binary.MetadataUpdateResult$ResultType org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$BlockSetCallable org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java index b37398fa1393b..08b57f8ea46c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import static org.apache.ignite.internal.util.IgniteUtils.toBytes; import static org.apache.ignite.marshaller.Marshallers.jdk; /** Serialization test for discovery messages. */ @@ -29,4 +30,27 @@ public class IgniteDiscoveryMessageSerializationTest extends AbstractMessageSeri @Override protected MessageFactoryProvider messageFactory() { return new DiscoveryMessageFactory(jdk(), U.gridClassLoader()); } + + /** {@inheritDoc} */ + @Override protected AbstractTestMessageReader createMessageReader(int capacity) { + return new TestIoMessageReader(capacity); + } + + /** */ + private static class TestIoMessageReader extends AbstractTestMessageReader { + /** */ + private static final byte[] BYTE_ARR = toBytes(null); + + /** */ + public TestIoMessageReader(int capacity) { + super(capacity); + } + + /** {@inheritDoc} */ + @Override public byte[] readByteArray() { + super.readByteArray(); + + return BYTE_ARR; + } + } } From c47f8a86402b0ad50045b673d216da57378ac49d Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 31 Mar 2026 15:37:11 +0500 Subject: [PATCH 2/5] fix after merge master --- .../internal/managers/discovery/DiscoveryMessageFactory.java | 3 +-- .../cache/binary/MetadataUpdateProposedMessage.java | 5 ----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index eea4649d16c0a..6240c57c3cfe4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -340,7 +340,6 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) { new ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer(marsh, clsLdr)); factory.register(534, DynamicCacheChangeBatch::new, new DynamicCacheChangeBatchMarshallableSerializer(marsh, clsLdr)); - factory.register(535, MetadataUpdateProposedMessage::new, - new MetadataUpdateProposedMessageMarshallableSerializer(marsh, clsLdr)); + factory.register(535, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageMarshallableSerializer(marsh, clsLdr)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java index 9af429a0696fa..0eb6461a1c645 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java @@ -244,11 +244,6 @@ public int typeId() { return typeId; } - /** {@inheritDoc} */ - @Override public short directType() { - return 531; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(MetadataUpdateProposedMessage.class, this); From 53c1cbbc67ca8b90fa5530b58e6c23b7c7efd786 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 1 Apr 2026 10:54:17 +0500 Subject: [PATCH 3/5] fix performance degradation for BinaryMetadataUpdatesFlowTest#testConcurrentMetadataUpdates --- .../discovery/DiscoveryMessageFactory.java | 6 +-- .../binary/MetadataUpdateProposedMessage.java | 37 +++++++++++-------- .../TcpDiscoveryCustomEventMessage.java | 20 +++++++++- ...niteDiscoveryMessageSerializationTest.java | 24 ------------ 4 files changed, 41 insertions(+), 46 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index a34daa0a5ada5..ec69c3eff1b87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -56,7 +56,7 @@ import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; -import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageMarshallableSerializer; +import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageSerializer; import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult; import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResultSerializer; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResult; @@ -363,8 +363,6 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register(537, ServiceDeploymentRequest::new, new ServiceDeploymentRequestMarshallableSerializer(marsh, clsLdr)); factory.register(538, ServiceUndeploymentRequest::new, new ServiceUndeploymentRequestSerializer()); - factory.register(534, DynamicCacheChangeBatch::new, - new DynamicCacheChangeBatchMarshallableSerializer(marsh, clsLdr)); - factory.register(535, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageMarshallableSerializer(marsh, clsLdr)); + factory.register(539, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java index 0eb6461a1c645..a36e3a3679465 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** @@ -70,8 +70,11 @@ * (with pending version strictly greater than accepted version) * it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with accepted version * equals to pending version of this metadata to the moment when is was initially read by the thread. + *

    + * We don't implement MarshallableMessage for this message because it leads to performance degradation when updating BinaryMetadata + * (see test: BinaryMetadataUpdatesFlowTest#testConcurrentMetadataUpdates). */ -public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, MarshallableMessage { +public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; @@ -222,26 +225,28 @@ public void metadata(BinaryMetadata metadata) { this.metadata = metadata; } - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - if (metadata != null) - metadataBytes = U.marshal(marsh, metadata); + /** + * + */ + public int typeId() { + return typeId; } - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { - if (metadataBytes != null) { - metadata = U.unmarshal(marsh, metadataBytes, ldr); - - metadataBytes = null; - } + /** + * @param marsh Marshaller. + */ + public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (metadata != null) + metadataBytes = U.marshal(marsh, metadata); } /** - * + * @param marsh Marshaller. + * @param ldr Class loader. */ - public int typeId() { - return typeId; + public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + if (metadataBytes != null) + metadata = U.unmarshal(marsh, metadataBytes, ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index 381a80995c3ef..63acb0b2118fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException; +import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper; +import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; @@ -104,8 +106,15 @@ public DiscoverySpiCustomMessage message() { @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { super.prepareMarshal(marsh); - if (msg instanceof Message) + if (msg instanceof Message) { + if (msg instanceof MetadataUpdateProposedMessage) + ((MetadataUpdateProposedMessage)msg).prepareMarshal(marsh); + else if (msg instanceof SecurityAwareCustomMessageWrapper && + ((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) + ((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()).prepareMarshal(marsh); + serMsg = (Message)msg; + } else { if (msg != null) msgBytes = U.marshal(marsh, msg); @@ -125,8 +134,15 @@ public DiscoverySpiCustomMessage message() { if (msg != null) return; - if (serMsg != null) + if (serMsg != null) { msg = (DiscoverySpiCustomMessage)serMsg; + + if (msg instanceof MetadataUpdateProposedMessage) + ((MetadataUpdateProposedMessage)msg).finishUnmarshal(marsh, ldr); + else if (msg instanceof SecurityAwareCustomMessageWrapper && + ((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) + ((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()).finishUnmarshal(marsh, ldr); + } else { try { if (msgBytes != null) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java index 08b57f8ea46c3..b37398fa1393b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java @@ -21,7 +21,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; -import static org.apache.ignite.internal.util.IgniteUtils.toBytes; import static org.apache.ignite.marshaller.Marshallers.jdk; /** Serialization test for discovery messages. */ @@ -30,27 +29,4 @@ public class IgniteDiscoveryMessageSerializationTest extends AbstractMessageSeri @Override protected MessageFactoryProvider messageFactory() { return new DiscoveryMessageFactory(jdk(), U.gridClassLoader()); } - - /** {@inheritDoc} */ - @Override protected AbstractTestMessageReader createMessageReader(int capacity) { - return new TestIoMessageReader(capacity); - } - - /** */ - private static class TestIoMessageReader extends AbstractTestMessageReader { - /** */ - private static final byte[] BYTE_ARR = toBytes(null); - - /** */ - public TestIoMessageReader(int capacity) { - super(capacity); - } - - /** {@inheritDoc} */ - @Override public byte[] readByteArray() { - super.readByteArray(); - - return BYTE_ARR; - } - } } From 74229b88b63bfc829591638c329dc7a38170b5f5 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 1 Apr 2026 17:53:42 +0500 Subject: [PATCH 4/5] add same login to Zookeeper SPI --- .../zk/internal/DiscoveryMessageParser.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java index ea83271c9f54d..63d79984305a9 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java @@ -30,6 +30,8 @@ import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper; +import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.jdk.JdkMarshaller; @@ -77,6 +79,12 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) { if (msg instanceof Message) { out.write(MESSAGE_SERIALIZATION); + if (msg instanceof MetadataUpdateProposedMessage) + ((MetadataUpdateProposedMessage)msg).prepareMarshal(marsh); + else if (msg instanceof SecurityAwareCustomMessageWrapper && + ((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) + ((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()).prepareMarshal(marsh); + serializeMessage((Message)msg, out); } else { @@ -106,7 +114,16 @@ public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) { if (MESSAGE_SERIALIZATION != mode) throw new IOException("Received unexpected byte while reading discovery message: " + mode); - return (DiscoverySpiCustomMessage)deserializeMessage(in); + DiscoverySpiCustomMessage msg = (DiscoverySpiCustomMessage)deserializeMessage(in); + + if (msg instanceof MetadataUpdateProposedMessage) + ((MetadataUpdateProposedMessage)msg).finishUnmarshal(marsh, U.gridClassLoader()); + else if (msg instanceof SecurityAwareCustomMessageWrapper && + ((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) + ((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()) + .finishUnmarshal(marsh, U.gridClassLoader()); + + return msg; } catch (Exception e) { throw new IgniteSpiException("Failed to deserialize message.", e); From 35827755609cf5c27920a1591d848cff7176e56c Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Thu, 9 Apr 2026 12:30:07 +0500 Subject: [PATCH 5/5] revert MarshallableMessage --- .../discovery/DiscoveryMessageFactory.java | 4 ++-- .../binary/MetadataUpdateProposedMessage.java | 20 ++++++------------- .../TcpDiscoveryCustomEventMessage.java | 20 ++----------------- .../zk/internal/DiscoveryMessageParser.java | 19 +----------------- 4 files changed, 11 insertions(+), 52 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index df8016a8c1aea..e6d3074892849 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -58,7 +58,7 @@ import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; -import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageSerializer; +import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageMarshallableSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeIdSerializer; import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult; @@ -381,6 +381,6 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register(540, StartRoutineDiscoveryMessage::new, new StartRoutineDiscoveryMessageSerializer()); factory.register(541, StartRoutineAckDiscoveryMessage::new, new StartRoutineAckDiscoveryMessageSerializer()); factory.register(542, StartRoutineDiscoveryMessageV2::new, new StartRoutineDiscoveryMessageV2Serializer()); - factory.register(543, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageSerializer()); + factory.register(543, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageMarshallableSerializer(marsh, clsLdr)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java index a36e3a3679465..2618b4dc3c184 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** @@ -70,11 +70,8 @@ * (with pending version strictly greater than accepted version) * it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with accepted version * equals to pending version of this metadata to the moment when is was initially read by the thread. - *

    - * We don't implement MarshallableMessage for this message because it leads to performance degradation when updating BinaryMetadata - * (see test: BinaryMetadataUpdatesFlowTest#testConcurrentMetadataUpdates). */ -public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, Message { +public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, MarshallableMessage { /** */ private static final long serialVersionUID = 0L; @@ -232,19 +229,14 @@ public int typeId() { return typeId; } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (metadata != null) metadataBytes = U.marshal(marsh, metadata); } - /** - * @param marsh Marshaller. - * @param ldr Class loader. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { if (metadataBytes != null) metadata = U.unmarshal(marsh, metadataBytes, ldr); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index e78d964c220b2..4d37c8c145365 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -22,8 +22,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; -import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper; -import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; @@ -105,15 +103,8 @@ public DiscoverySpiCustomMessage message() { @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { super.prepareMarshal(marsh); - if (msg instanceof Message) { - if (msg instanceof MetadataUpdateProposedMessage) - ((MetadataUpdateProposedMessage)msg).prepareMarshal(marsh); - else if (msg instanceof SecurityAwareCustomMessageWrapper && - ((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) - ((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()).prepareMarshal(marsh); - + if (msg instanceof Message) serMsg = (Message)msg; - } else { if (msg != null) msgBytes = U.marshal(marsh, msg); @@ -133,15 +124,8 @@ else if (msg instanceof SecurityAwareCustomMessageWrapper && if (msg != null) return; - if (serMsg != null) { + if (serMsg != null) msg = (DiscoverySpiCustomMessage)serMsg; - - if (msg instanceof MetadataUpdateProposedMessage) - ((MetadataUpdateProposedMessage)msg).finishUnmarshal(marsh, ldr); - else if (msg instanceof SecurityAwareCustomMessageWrapper && - ((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) - ((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()).finishUnmarshal(marsh, ldr); - } else { if (msgBytes != null) msg = U.unmarshal(marsh, msgBytes, ldr); diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java index 63d79984305a9..ea83271c9f54d 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java @@ -30,8 +30,6 @@ import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; -import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper; -import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.jdk.JdkMarshaller; @@ -79,12 +77,6 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) { if (msg instanceof Message) { out.write(MESSAGE_SERIALIZATION); - if (msg instanceof MetadataUpdateProposedMessage) - ((MetadataUpdateProposedMessage)msg).prepareMarshal(marsh); - else if (msg instanceof SecurityAwareCustomMessageWrapper && - ((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) - ((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()).prepareMarshal(marsh); - serializeMessage((Message)msg, out); } else { @@ -114,16 +106,7 @@ public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) { if (MESSAGE_SERIALIZATION != mode) throw new IOException("Received unexpected byte while reading discovery message: " + mode); - DiscoverySpiCustomMessage msg = (DiscoverySpiCustomMessage)deserializeMessage(in); - - if (msg instanceof MetadataUpdateProposedMessage) - ((MetadataUpdateProposedMessage)msg).finishUnmarshal(marsh, U.gridClassLoader()); - else if (msg instanceof SecurityAwareCustomMessageWrapper && - ((SecurityAwareCustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) - ((MetadataUpdateProposedMessage)((SecurityAwareCustomMessageWrapper)msg).delegate()) - .finishUnmarshal(marsh, U.gridClassLoader()); - - return msg; + return (DiscoverySpiCustomMessage)deserializeMessage(in); } catch (Exception e) { throw new IgniteSpiException("Failed to deserialize message.", e);