From 2eed7d042c89c5f4d5dfeb98a4b6368dddd14ab1 Mon Sep 17 00:00:00 2001 From: Alex Abashev Date: Mon, 6 Apr 2026 23:54:44 +0300 Subject: [PATCH 1/3] IGNITE-28473 Concurrent message serialization in NIO workers causes performance degradation --- .../internal/util/nio/GridNioServer.java | 164 +++++++++++++++++- .../util/nio/SessionWriteRequest.java | 15 ++ modules/numa-allocator/pom.xml | 2 +- 3 files changed, 177 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index e1268673c14b4..c66f54899e8d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -93,6 +93,7 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import static java.lang.System.arraycopy; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.processors.tracing.SpanTags.SOCKET_WRITE_BYTES; @@ -647,16 +648,20 @@ IgniteInternalFuture send(GridNioSession ses, GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; + byte[] serialized = serializeMessage(ses, msg); + if (createFut) { NioOperationFuture fut = new NioOperationFuture(impl, NioOperation.REQUIRE_WRITE, msg, skipRecoveryPred.apply(msg), ackC); + fut.serializedMessage(serialized); + send0(impl, fut, false); return fut; } else { - SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC); + var req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC, serialized); send0(impl, req, false); @@ -664,6 +669,68 @@ IgniteInternalFuture send(GridNioSession ses, } } + /** + * Serializes message to byte array eagerly in the calling thread. + * + * @param ses Session. + * @param msg Message to serialize. + * @return Serialized message bytes, or {@code null} if eager serialization is not available. + */ + private byte @Nullable [] serializeMessage(GridNioSession ses, Message msg) { + if (writerFactory == null || msgFactory == null) + return null; + + MessageWriter writer; + MessageSerializer msgSer; + + try { + writer = writerFactory.writer(ses); + msgSer = msgFactory.serializer(msg.directType()); + } + catch (Exception e) { + if (log.isDebugEnabled()) + log.debug("Failed to prepare eager serialization, will use lazy path [msg=" + msg + ", err=" + e + ']'); + + return null; + } + + try { + int capacity = 4096; + var buf = ByteBuffer.allocate(capacity); + + while (true) { + writer.setBuffer(buf); + + if (msgSer.writeTo(msg, writer)) { + writer.reset(); + + int len = buf.position(); + var result = new byte[len]; + + arraycopy(buf.array(), 0, result, 0, len); + + return result; + } + + int pos = buf.position(); + + capacity *= 2; + + var newBuf = ByteBuffer.allocate(capacity); + + arraycopy(buf.array(), 0, newBuf.array(), 0, pos); + newBuf.position(pos); + + buf = newBuf; + } + } + catch (Exception e) { + log.warning("Failed to eagerly serialize message, will use lazy path [msg=" + msg + ']', e); + + return null; + } + } + /** * @param ses Session. * @param req Request. @@ -1614,7 +1681,21 @@ private boolean writeToBuffer( int startPos = buf.position(); - if (messageFactory() == null) { + byte[] serialized = req.serializedMessage(); + + if (serialized != null) { + int off = req.serializedOffset(); + int len = Math.min(serialized.length - off, buf.remaining()); + + buf.put(serialized, off, len); + off += len; + + finished = off >= serialized.length; + + if (!finished) + req.serializedOffset(off); + } + else if (messageFactory() == null) { assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554. finished = ((ClientMessage)msg).writeTo(buf); @@ -1815,7 +1896,21 @@ private boolean writeToBuffer(GridSelectorNioSessionImpl ses, ByteBuffer buf, Se int startPos = buf.position(); - if (msgFactory == null) { + byte[] serialized = req.serializedMessage(); + + if (serialized != null) { + int off = req.serializedOffset(); + int len = Math.min(serialized.length - off, buf.remaining()); + + buf.put(serialized, off, len); + off += len; + + finished = off >= serialized.length; + + if (!finished) + req.serializedOffset(off); + } + else if (msgFactory == null) { assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554. finished = ((ClientMessage)msg).writeTo(buf); @@ -3378,6 +3473,12 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang /** Span for tracing. */ private Span span; + /** Pre-serialized message bytes. */ + private final byte[] serializedMsg; + + /** Current offset in pre-serialized bytes for partial writes. */ + private int serializedOff; + /** * @param ses Session. * @param msg Message. @@ -3388,11 +3489,27 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang Object msg, boolean skipRecovery, IgniteInClosure ackC) { + this(ses, msg, skipRecovery, ackC, null); + } + + /** + * @param ses Session. + * @param msg Message. + * @param skipRecovery Skip recovery flag. + * @param ackC Closure invoked when message ACK is received. + * @param serializedMsg Pre-serialized message bytes. + */ + WriteRequestImpl(GridNioSession ses, + Object msg, + boolean skipRecovery, + IgniteInClosure ackC, + byte[] serializedMsg) { this.ses = ses; this.msg = msg; this.skipRecovery = skipRecovery; this.ackC = ackC; this.span = MTC.span(); + this.serializedMsg = serializedMsg; } /** {@inheritDoc} */ @@ -3450,6 +3567,21 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang return span; } + /** {@inheritDoc} */ + @Override public byte[] serializedMessage() { + return serializedMsg; + } + + /** {@inheritDoc} */ + @Override public int serializedOffset() { + return serializedOff; + } + + /** {@inheritDoc} */ + @Override public void serializedOffset(int off) { + serializedOff = off; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(WriteRequestImpl.class, this); @@ -3496,6 +3628,12 @@ private static class NioOperationFuture extends GridFutureAdapter implemen /** */ private IgniteInClosure ackC; + /** Pre-serialized message bytes. */ + private byte[] serializedMsg; + + /** Current offset in pre-serialized bytes for partial writes. */ + private int serializedOff; + /** * @param sockCh Socket channel. * @param accepted {@code True} if socket has been accepted. @@ -3664,6 +3802,26 @@ boolean accepted() { return skipRecovery; } + /** {@inheritDoc} */ + @Override public byte[] serializedMessage() { + return serializedMsg; + } + + /** {@inheritDoc} */ + @Override public int serializedOffset() { + return serializedOff; + } + + /** {@inheritDoc} */ + @Override public void serializedOffset(int off) { + serializedOff = off; + } + + /** Sets pre-serialized message bytes. */ + void serializedMessage(byte[] serializedMsg) { + this.serializedMsg = serializedMsg; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(NioOperationFuture.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java index 8f86dc02c227b..54acaaa421bb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java @@ -78,4 +78,19 @@ public interface SessionWriteRequest { * @return Span. */ Span span(); + + /** @return Pre-serialized message bytes, or {@code null} if not pre-serialized. */ + default byte[] serializedMessage() { + return null; + } + + /** @return Offset in serialized bytes from which to continue writing. */ + default int serializedOffset() { + return 0; + } + + /** @param off New offset after partial write. */ + default void serializedOffset(int off) { + // No-op. + } } diff --git a/modules/numa-allocator/pom.xml b/modules/numa-allocator/pom.xml index 6e76e8ee1d252..dd630bc339ada 100644 --- a/modules/numa-allocator/pom.xml +++ b/modules/numa-allocator/pom.xml @@ -106,7 +106,7 @@ com.googlecode.cmake-maven-project cmake-maven-plugin - 3.7.2-b1 + 3.26.3-b1 cmake-generate From 45262212fe6cf1593277e856796f2ba64e2c054c Mon Sep 17 00:00:00 2001 From: Alex Abashev Date: Wed, 8 Apr 2026 00:31:53 +0300 Subject: [PATCH 2/3] Fix deadlock --- .../internal/MarshallerContextImpl.java | 35 +++++++++++++++---- .../internal/util/nio/GridNioServer.java | 4 ++- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 8fa94b71df7b5..92f16ad531d77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -325,9 +325,7 @@ public void checkHasClassName(String clsName, ClassLoader ldr, String fileName) if (failIfUnregistered && !fut.isDone()) throw new UnregisteredBinaryTypeException(typeId, fut); - MappingExchangeResult res = fut.get(); - - return convertXchRes(res); + return resolveMappingExchange(platformId, typeId, clsName, fut); } } else { @@ -341,12 +339,37 @@ public void checkHasClassName(String clsName, ClassLoader ldr, String fileName) if (failIfUnregistered && !fut.isDone()) throw new UnregisteredBinaryTypeException(typeId, fut); - MappingExchangeResult res = fut.get(); - - return convertXchRes(res); + return resolveMappingExchange(platformId, typeId, clsName, fut); } } + /** + * Resolves mapping exchange future. If the future is already done, returns the result immediately. + * Otherwise, registers the class name locally to avoid blocking the calling thread + * (which may be a discovery notifier worker, leading to a deadlock). + * The mapping proposal has already been sent to the cluster and will be accepted asynchronously. + * + * @param platformId Platform id. + * @param typeId Type id. + * @param clsName Class name. + * @param fut Mapping exchange future. + * @return {@code true} if registration succeeded. + * @throws IgniteCheckedException If failed. + */ + private boolean resolveMappingExchange( + byte platformId, + int typeId, + String clsName, + GridFutureAdapter fut + ) throws IgniteCheckedException { + if (fut.isDone()) + return convertXchRes(fut.get()); + + // The mapping proposal is already in-flight to the cluster. + // Register locally to avoid blocking the calling thread — the cluster will accept it asynchronously. + return registerClassNameLocally(platformId, typeId, clsName); + } + /** {@inheritDoc} */ @Override public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index c66f54899e8d2..f1de4c14cee86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -695,7 +695,7 @@ IgniteInternalFuture send(GridNioSession ses, } try { - int capacity = 4096; + int capacity = 1024; var buf = ByteBuffer.allocate(capacity); while (true) { @@ -3550,6 +3550,7 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang /** {@inheritDoc} */ @Override public void resetSession(GridNioSession ses) { this.ses = ses; + this.serializedOff = 0; } /** {@inheritDoc} */ @@ -3744,6 +3745,7 @@ private static class NioOperationFuture extends GridFutureAdapter implemen assert msg instanceof Message : msg; this.ses = (GridSelectorNioSessionImpl)ses; + this.serializedOff = 0; } /** From fd2891618430a8f91ea6203763d846323923f30f Mon Sep 17 00:00:00 2001 From: Alex Abashev Date: Thu, 9 Apr 2026 11:04:07 +0300 Subject: [PATCH 3/3] Back to deadlock in marshaller --- .../internal/MarshallerContextImpl.java | 35 ++++--------------- .../internal/util/nio/GridNioServer.java | 7 ++++ 2 files changed, 13 insertions(+), 29 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 92f16ad531d77..8fa94b71df7b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -325,7 +325,9 @@ public void checkHasClassName(String clsName, ClassLoader ldr, String fileName) if (failIfUnregistered && !fut.isDone()) throw new UnregisteredBinaryTypeException(typeId, fut); - return resolveMappingExchange(platformId, typeId, clsName, fut); + MappingExchangeResult res = fut.get(); + + return convertXchRes(res); } } else { @@ -339,35 +341,10 @@ public void checkHasClassName(String clsName, ClassLoader ldr, String fileName) if (failIfUnregistered && !fut.isDone()) throw new UnregisteredBinaryTypeException(typeId, fut); - return resolveMappingExchange(platformId, typeId, clsName, fut); - } - } + MappingExchangeResult res = fut.get(); - /** - * Resolves mapping exchange future. If the future is already done, returns the result immediately. - * Otherwise, registers the class name locally to avoid blocking the calling thread - * (which may be a discovery notifier worker, leading to a deadlock). - * The mapping proposal has already been sent to the cluster and will be accepted asynchronously. - * - * @param platformId Platform id. - * @param typeId Type id. - * @param clsName Class name. - * @param fut Mapping exchange future. - * @return {@code true} if registration succeeded. - * @throws IgniteCheckedException If failed. - */ - private boolean resolveMappingExchange( - byte platformId, - int typeId, - String clsName, - GridFutureAdapter fut - ) throws IgniteCheckedException { - if (fut.isDone()) - return convertXchRes(fut.get()); - - // The mapping proposal is already in-flight to the cluster. - // Register locally to avoid blocking the calling thread — the cluster will accept it asynchronously. - return registerClassNameLocally(platformId, typeId, clsName); + return convertXchRes(res); + } } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index f1de4c14cee86..8f8225165b668 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -680,6 +680,13 @@ IgniteInternalFuture send(GridNioSession ses, if (writerFactory == null || msgFactory == null) return null; + // Skip eager serialization for GridIoMessage — inner messages may contain + // MarshallableMessage payloads whose prepareMarshal() performs heavy blocking operations + // (e.g., U.marshal() → registerClassName() → future.get()) that can deadlock + // when called from certain threads (e.g., disco-notifier-worker). + if (msg instanceof GridIoMessage) + return null; + MessageWriter writer; MessageSerializer msgSer;