diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 3e3ef98df65c7..9b87b06012e24 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -468,6 +468,19 @@ public void testReplicationWithSchema() throws Exception { consumer2.acknowledge(msg2); consumer3.acknowledge(msg3); } + + @Cleanup + Producer producerBytes = client1.newProducer() + .topic(topic.toString()) + .enableBatching(false) + .create(); + + byte[] data = "Bytes".getBytes(); + producerBytes.send(data); + + assertEquals(consumer1.receive().getValue().getNativeObject(), data); + assertEquals(consumer2.receive().getValue().getNativeObject(), data); + assertEquals(consumer3.receive().getValue().getNativeObject(), data); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 19dd53725897d..e119907d3e33d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -704,13 +704,22 @@ private void serializeAndSendMessage(MessageImpl msg, } } - private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) { + @VisibleForTesting + boolean populateMessageSchema(MessageImpl msg, SendCallback callback) { MessageMetadata msgMetadataBuilder = msg.getMessageBuilder(); if (msg.getSchemaInternal() == schema) { schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(v)); msg.setSchemaState(MessageImpl.SchemaState.Ready); return true; } + // If the message is from the replicator and without replicated schema + // Which means the message is written with BYTES schema + // So we don't need to replicate schema to the remote cluster + if (msg.hasReplicateFrom() && msg.getSchemaInfoForReplicator() == null) { + msg.setSchemaState(MessageImpl.SchemaState.Ready); + return true; + } + if (!isMultiSchemaEnabled(true)) { PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java index 8de554c751640..8ca7a59969515 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java @@ -18,11 +18,17 @@ */ package org.apache.pulsar.client.impl; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import java.nio.ByteBuffer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.mockito.Mockito; import org.testng.annotations.Test; public class ProducerImplTest { @@ -47,4 +53,17 @@ public void testChunkedMessageCtxDeallocate() { // check if the ctx is deallocated successfully. assertNull(ctx.firstChunkMessageId); } + + @Test + public void testPopulateMessageSchema() { + MessageImpl msg = mock(MessageImpl.class); + when(msg.hasReplicateFrom()).thenReturn(true); + when(msg.getSchemaInternal()).thenReturn(mock(Schema.class)); + when(msg.getSchemaInfoForReplicator()).thenReturn(null); + ProducerImpl producer = mock(ProducerImpl.class, withSettings() + .defaultAnswer(Mockito.CALLS_REAL_METHODS)); + assertTrue(producer.populateMessageSchema(msg, null)); + verify(msg).setSchemaState(MessageImpl.SchemaState.Ready); + } + }