Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

import static java.lang.System.arraycopy;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SOCKET_WRITE_BYTES;
Expand Down Expand Up @@ -647,23 +648,96 @@ IgniteInternalFuture<?> send(GridNioSession ses,

GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;

byte[] serialized = serializeMessage(ses, msg);

if (createFut) {
NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
skipRecoveryPred.apply(msg), ackC);

fut.serializedMessage(serialized);

send0(impl, fut, false);

return fut;
}
else {
SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC);
var req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC, serialized);

send0(impl, req, false);

return null;
}
}

/**
* Serializes message to byte array eagerly in the calling thread.
*
* @param ses Session.
* @param msg Message to serialize.
* @return Serialized message bytes, or {@code null} if eager serialization is not available.
*/
private byte @Nullable [] serializeMessage(GridNioSession ses, Message msg) {
if (writerFactory == null || msgFactory == null)
return null;

// Skip eager serialization for GridIoMessage — inner messages may contain
// MarshallableMessage payloads whose prepareMarshal() performs heavy blocking operations
// (e.g., U.marshal() → registerClassName() → future.get()) that can deadlock
// when called from certain threads (e.g., disco-notifier-worker).
if (msg instanceof GridIoMessage)
return null;

MessageWriter writer;
MessageSerializer<Message> msgSer;

try {
writer = writerFactory.writer(ses);
msgSer = msgFactory.serializer(msg.directType());
}
catch (Exception e) {
if (log.isDebugEnabled())
log.debug("Failed to prepare eager serialization, will use lazy path [msg=" + msg + ", err=" + e + ']');

return null;
}

try {
int capacity = 1024;
var buf = ByteBuffer.allocate(capacity);

while (true) {
writer.setBuffer(buf);

if (msgSer.writeTo(msg, writer)) {
writer.reset();

int len = buf.position();
var result = new byte[len];

arraycopy(buf.array(), 0, result, 0, len);

return result;
}

int pos = buf.position();

capacity *= 2;

var newBuf = ByteBuffer.allocate(capacity);

arraycopy(buf.array(), 0, newBuf.array(), 0, pos);
newBuf.position(pos);

buf = newBuf;
}
}
catch (Exception e) {
log.warning("Failed to eagerly serialize message, will use lazy path [msg=" + msg + ']', e);

return null;
}
}

/**
* @param ses Session.
* @param req Request.
Expand Down Expand Up @@ -1614,7 +1688,21 @@ private boolean writeToBuffer(

int startPos = buf.position();

if (messageFactory() == null) {
byte[] serialized = req.serializedMessage();

if (serialized != null) {
int off = req.serializedOffset();
int len = Math.min(serialized.length - off, buf.remaining());

buf.put(serialized, off, len);
off += len;

finished = off >= serialized.length;

if (!finished)
req.serializedOffset(off);
}
else if (messageFactory() == null) {
assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554.

finished = ((ClientMessage)msg).writeTo(buf);
Expand Down Expand Up @@ -1815,7 +1903,21 @@ private boolean writeToBuffer(GridSelectorNioSessionImpl ses, ByteBuffer buf, Se

int startPos = buf.position();

if (msgFactory == null) {
byte[] serialized = req.serializedMessage();

if (serialized != null) {
int off = req.serializedOffset();
int len = Math.min(serialized.length - off, buf.remaining());

buf.put(serialized, off, len);
off += len;

finished = off >= serialized.length;

if (!finished)
req.serializedOffset(off);
}
else if (msgFactory == null) {
assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554.

finished = ((ClientMessage)msg).writeTo(buf);
Expand Down Expand Up @@ -3378,6 +3480,12 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang
/** Span for tracing. */
private Span span;

/** Pre-serialized message bytes. */
private final byte[] serializedMsg;

/** Current offset in pre-serialized bytes for partial writes. */
private int serializedOff;

/**
* @param ses Session.
* @param msg Message.
Expand All @@ -3388,11 +3496,27 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang
Object msg,
boolean skipRecovery,
IgniteInClosure<IgniteException> ackC) {
this(ses, msg, skipRecovery, ackC, null);
}

/**
* @param ses Session.
* @param msg Message.
* @param skipRecovery Skip recovery flag.
* @param ackC Closure invoked when message ACK is received.
* @param serializedMsg Pre-serialized message bytes.
*/
WriteRequestImpl(GridNioSession ses,
Object msg,
boolean skipRecovery,
IgniteInClosure<IgniteException> ackC,
byte[] serializedMsg) {
this.ses = ses;
this.msg = msg;
this.skipRecovery = skipRecovery;
this.ackC = ackC;
this.span = MTC.span();
this.serializedMsg = serializedMsg;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -3433,6 +3557,7 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang
/** {@inheritDoc} */
@Override public void resetSession(GridNioSession ses) {
this.ses = ses;
this.serializedOff = 0;
}

/** {@inheritDoc} */
Expand All @@ -3450,6 +3575,21 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang
return span;
}

/** {@inheritDoc} */
@Override public byte[] serializedMessage() {
return serializedMsg;
}

/** {@inheritDoc} */
@Override public int serializedOffset() {
return serializedOff;
}

/** {@inheritDoc} */
@Override public void serializedOffset(int off) {
serializedOff = off;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(WriteRequestImpl.class, this);
Expand Down Expand Up @@ -3496,6 +3636,12 @@ private static class NioOperationFuture<R> extends GridFutureAdapter<R> implemen
/** */
private IgniteInClosure<IgniteException> ackC;

/** Pre-serialized message bytes. */
private byte[] serializedMsg;

/** Current offset in pre-serialized bytes for partial writes. */
private int serializedOff;

/**
* @param sockCh Socket channel.
* @param accepted {@code True} if socket has been accepted.
Expand Down Expand Up @@ -3606,6 +3752,7 @@ private static class NioOperationFuture<R> extends GridFutureAdapter<R> implemen
assert msg instanceof Message : msg;

this.ses = (GridSelectorNioSessionImpl)ses;
this.serializedOff = 0;
}

/**
Expand Down Expand Up @@ -3664,6 +3811,26 @@ boolean accepted() {
return skipRecovery;
}

/** {@inheritDoc} */
@Override public byte[] serializedMessage() {
return serializedMsg;
}

/** {@inheritDoc} */
@Override public int serializedOffset() {
return serializedOff;
}

/** {@inheritDoc} */
@Override public void serializedOffset(int off) {
serializedOff = off;
}

/** Sets pre-serialized message bytes. */
void serializedMessage(byte[] serializedMsg) {
this.serializedMsg = serializedMsg;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(NioOperationFuture.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,19 @@ public interface SessionWriteRequest {
* @return Span.
*/
Span span();

/** @return Pre-serialized message bytes, or {@code null} if not pre-serialized. */
default byte[] serializedMessage() {
return null;
}

/** @return Offset in serialized bytes from which to continue writing. */
default int serializedOffset() {
return 0;
}

/** @param off New offset after partial write. */
default void serializedOffset(int off) {
// No-op.
}
}
2 changes: 1 addition & 1 deletion modules/numa-allocator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
<plugin>
<groupId>com.googlecode.cmake-maven-project</groupId>
<artifactId>cmake-maven-plugin</artifactId>
<version>3.7.2-b1</version>
<version>3.26.3-b1</version>
<executions>
<execution>
<id>cmake-generate</id>
Expand Down