Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@
import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessageSerializer;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessageSerializer;
import org.apache.ignite.internal.processors.continuous.GridContinuousMessageMarshallableSerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntrySerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
Expand Down Expand Up @@ -453,7 +453,7 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
factory.register(57, GridNearUnlockRequest::new, new GridNearUnlockRequestSerializer());
factory.register(58, GridCacheQueryRequest::new, new GridCacheQueryRequestSerializer());
factory.register(59, GridCacheQueryResponse::new, new GridCacheQueryResponseSerializer());
factory.register(61, GridContinuousMessage::new, new GridContinuousMessageSerializer());
factory.register(61, GridContinuousMessage::new, new GridContinuousMessageMarshallableSerializer(marsh, clsLdr));
factory.register(62, DataStreamerRequest::new, new DataStreamerRequestSerializer());
factory.register(63, DataStreamerResponse::new, new DataStreamerResponseSerializer());
factory.register(76, GridTaskResultRequest::new, new GridTaskResultRequestSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

Expand All @@ -31,7 +35,7 @@
/**
* Continuous processor message.
*/
public class GridContinuousMessage implements Message {
public class GridContinuousMessage implements MarshallableMessage {
/** Message type. */
@Order(0)
GridContinuousMessageType type;
Expand Down Expand Up @@ -102,48 +106,31 @@ public UUID routineId() {
return routineId;
}

/**
* @return {@code True} is data is collection of messages.
*/
public boolean messages() {
return msgs != null;
}

/**
* @return Message data.
*/
public <T> T data() {
return msgs != null ? (T)msgs : (T)data;
}

/**
* @param data Message data.
*/
public void data(Object data) {
this.data = data;
}

/**
* @return Serialized message data.
*/
public byte[] dataBytes() {
return dataBytes;
}

/**
* @param dataBytes Serialized message data.
*/
public void dataBytes(byte[] dataBytes) {
this.dataBytes = dataBytes;
}

/**
* @return Future ID for synchronous event notification.
*/
@Nullable public IgniteUuid futureId() {
return futId;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (data != null)
dataBytes = U.marshal(marsh, data);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
if (dataBytes != null)
data = U.unmarshal(marsh, dataBytes, clsLdr);
}

/** {@inheritDoc} */
@Override public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,6 @@ public GridContinuousProcessor(GridKernalContext ctx) {
else {
GridContinuousMessage msg = (GridContinuousMessage)obj;

if (msg.data() == null && msg.dataBytes() != null) {
try {
msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config())));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process message (ignoring): " + msg, e);

return;
}
}

switch (msg.type()) {
case MSG_EVT_NOTIFICATION:
processNotification(nodeId, msg);
Expand Down Expand Up @@ -1022,17 +1011,6 @@ private void registerMessageListener(GridContinuousHandler hnd) {
// Only notification can be ordered.
assert msg.type() == MSG_EVT_NOTIFICATION;

if (msg.data() == null && msg.dataBytes() != null) {
try {
msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config())));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process message (ignoring): " + msg, e);

return;
}
}

processNotification(nodeId, msg);
}
});
Expand Down Expand Up @@ -1857,11 +1835,6 @@ private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContin
assert !F.isEmpty(nodes);
assert msg != null;

if (!msg.messages() &&
msg.data() != null &&
(nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id())))
msg.dataBytes(U.marshal(marsh, msg.data()));

for (ClusterNode node : nodes) {
int cnt = 0;

Expand Down
Loading