diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index e1268673c14b4..8f8225165b668 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -93,6 +93,7 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import static java.lang.System.arraycopy; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.processors.tracing.SpanTags.SOCKET_WRITE_BYTES; @@ -647,16 +648,20 @@ IgniteInternalFuture send(GridNioSession ses, GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; + byte[] serialized = serializeMessage(ses, msg); + if (createFut) { NioOperationFuture fut = new NioOperationFuture(impl, NioOperation.REQUIRE_WRITE, msg, skipRecoveryPred.apply(msg), ackC); + fut.serializedMessage(serialized); + send0(impl, fut, false); return fut; } else { - SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC); + var req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC, serialized); send0(impl, req, false); @@ -664,6 +669,75 @@ IgniteInternalFuture send(GridNioSession ses, } } + /** + * Serializes message to byte array eagerly in the calling thread. + * + * @param ses Session. + * @param msg Message to serialize. + * @return Serialized message bytes, or {@code null} if eager serialization is not available. + */ + private byte @Nullable [] serializeMessage(GridNioSession ses, Message msg) { + if (writerFactory == null || msgFactory == null) + return null; + + // Skip eager serialization for GridIoMessage — inner messages may contain + // MarshallableMessage payloads whose prepareMarshal() performs heavy blocking operations + // (e.g., U.marshal() → registerClassName() → future.get()) that can deadlock + // when called from certain threads (e.g., disco-notifier-worker). + if (msg instanceof GridIoMessage) + return null; + + MessageWriter writer; + MessageSerializer msgSer; + + try { + writer = writerFactory.writer(ses); + msgSer = msgFactory.serializer(msg.directType()); + } + catch (Exception e) { + if (log.isDebugEnabled()) + log.debug("Failed to prepare eager serialization, will use lazy path [msg=" + msg + ", err=" + e + ']'); + + return null; + } + + try { + int capacity = 1024; + var buf = ByteBuffer.allocate(capacity); + + while (true) { + writer.setBuffer(buf); + + if (msgSer.writeTo(msg, writer)) { + writer.reset(); + + int len = buf.position(); + var result = new byte[len]; + + arraycopy(buf.array(), 0, result, 0, len); + + return result; + } + + int pos = buf.position(); + + capacity *= 2; + + var newBuf = ByteBuffer.allocate(capacity); + + arraycopy(buf.array(), 0, newBuf.array(), 0, pos); + newBuf.position(pos); + + buf = newBuf; + } + } + catch (Exception e) { + log.warning("Failed to eagerly serialize message, will use lazy path [msg=" + msg + ']', e); + + return null; + } + } + /** * @param ses Session. * @param req Request. @@ -1614,7 +1688,21 @@ private boolean writeToBuffer( int startPos = buf.position(); - if (messageFactory() == null) { + byte[] serialized = req.serializedMessage(); + + if (serialized != null) { + int off = req.serializedOffset(); + int len = Math.min(serialized.length - off, buf.remaining()); + + buf.put(serialized, off, len); + off += len; + + finished = off >= serialized.length; + + if (!finished) + req.serializedOffset(off); + } + else if (messageFactory() == null) { assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554. finished = ((ClientMessage)msg).writeTo(buf); @@ -1815,7 +1903,21 @@ private boolean writeToBuffer(GridSelectorNioSessionImpl ses, ByteBuffer buf, Se int startPos = buf.position(); - if (msgFactory == null) { + byte[] serialized = req.serializedMessage(); + + if (serialized != null) { + int off = req.serializedOffset(); + int len = Math.min(serialized.length - off, buf.remaining()); + + buf.put(serialized, off, len); + off += len; + + finished = off >= serialized.length; + + if (!finished) + req.serializedOffset(off); + } + else if (msgFactory == null) { assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554. finished = ((ClientMessage)msg).writeTo(buf); @@ -3378,6 +3480,12 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang /** Span for tracing. */ private Span span; + /** Pre-serialized message bytes. */ + private final byte[] serializedMsg; + + /** Current offset in pre-serialized bytes for partial writes. */ + private int serializedOff; + /** * @param ses Session. * @param msg Message. @@ -3388,11 +3496,27 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang Object msg, boolean skipRecovery, IgniteInClosure ackC) { + this(ses, msg, skipRecovery, ackC, null); + } + + /** + * @param ses Session. + * @param msg Message. + * @param skipRecovery Skip recovery flag. + * @param ackC Closure invoked when message ACK is received. + * @param serializedMsg Pre-serialized message bytes. + */ + WriteRequestImpl(GridNioSession ses, + Object msg, + boolean skipRecovery, + IgniteInClosure ackC, + byte[] serializedMsg) { this.ses = ses; this.msg = msg; this.skipRecovery = skipRecovery; this.ackC = ackC; this.span = MTC.span(); + this.serializedMsg = serializedMsg; } /** {@inheritDoc} */ @@ -3433,6 +3557,7 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang /** {@inheritDoc} */ @Override public void resetSession(GridNioSession ses) { this.ses = ses; + this.serializedOff = 0; } /** {@inheritDoc} */ @@ -3450,6 +3575,21 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang return span; } + /** {@inheritDoc} */ + @Override public byte[] serializedMessage() { + return serializedMsg; + } + + /** {@inheritDoc} */ + @Override public int serializedOffset() { + return serializedOff; + } + + /** {@inheritDoc} */ + @Override public void serializedOffset(int off) { + serializedOff = off; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(WriteRequestImpl.class, this); @@ -3496,6 +3636,12 @@ private static class NioOperationFuture extends GridFutureAdapter implemen /** */ private IgniteInClosure ackC; + /** Pre-serialized message bytes. */ + private byte[] serializedMsg; + + /** Current offset in pre-serialized bytes for partial writes. */ + private int serializedOff; + /** * @param sockCh Socket channel. * @param accepted {@code True} if socket has been accepted. @@ -3606,6 +3752,7 @@ private static class NioOperationFuture extends GridFutureAdapter implemen assert msg instanceof Message : msg; this.ses = (GridSelectorNioSessionImpl)ses; + this.serializedOff = 0; } /** @@ -3664,6 +3811,26 @@ boolean accepted() { return skipRecovery; } + /** {@inheritDoc} */ + @Override public byte[] serializedMessage() { + return serializedMsg; + } + + /** {@inheritDoc} */ + @Override public int serializedOffset() { + return serializedOff; + } + + /** {@inheritDoc} */ + @Override public void serializedOffset(int off) { + serializedOff = off; + } + + /** Sets pre-serialized message bytes. */ + void serializedMessage(byte[] serializedMsg) { + this.serializedMsg = serializedMsg; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(NioOperationFuture.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java index 8f86dc02c227b..54acaaa421bb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java @@ -78,4 +78,19 @@ public interface SessionWriteRequest { * @return Span. */ Span span(); + + /** @return Pre-serialized message bytes, or {@code null} if not pre-serialized. */ + default byte[] serializedMessage() { + return null; + } + + /** @return Offset in serialized bytes from which to continue writing. */ + default int serializedOffset() { + return 0; + } + + /** @param off New offset after partial write. */ + default void serializedOffset(int off) { + // No-op. + } } diff --git a/modules/numa-allocator/pom.xml b/modules/numa-allocator/pom.xml index 6e76e8ee1d252..dd630bc339ada 100644 --- a/modules/numa-allocator/pom.xml +++ b/modules/numa-allocator/pom.xml @@ -106,7 +106,7 @@ com.googlecode.cmake-maven-project cmake-maven-plugin - 3.7.2-b1 + 3.26.3-b1 cmake-generate