diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 1747b19429..9a4350dcc4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -632,7 +632,7 @@ private void doPublishMessages(TopicName topic, int size) { topic.toString(), ex); result.getRight().complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); } else { - result.getRight().complete(new PartitionResponse(Errors.NONE)); + result.getRight().complete(new PartitionResponse(Errors.NONE, offset, -1L, -1L)); } headerAndPayload.release(); }); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java index 030feb3fc6..e405142ca5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java @@ -65,7 +65,7 @@ public void completed(Exception exception, long ledgerId, long entryId) { topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.MICROSECONDS); - offsetFuture.complete(Long.valueOf(MessageIdUtils.getOffset(ledgerId, entryId))); + offsetFuture.complete(MessageIdUtils.getOffset(ledgerId, entryId)); } recycle(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 27cf7b5de5..3ccca66481 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -18,7 +18,9 @@ import static io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils.getPartitionedTopicNameWithoutPartitions; import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; @@ -28,10 +30,14 @@ import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest; import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndResponse; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; +import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -43,6 +49,9 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -51,7 +60,13 @@ import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.protocol.ProtocolHandler; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -303,4 +318,61 @@ public void testCreateTopics() throws InterruptedException { assertTrue(e.getMessage().contains("Not supported by kop server.")); } } + + @Test(timeOut = 10000) + public void testProduceCallback() throws Exception { + final String topic = "test-produce-callback"; + final int numMessages = 10; + final String messagePrefix = "msg-"; + + final Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + @Cleanup + KafkaProducer producer = new KafkaProducer<>(props); + + Map indexToOffset = new ConcurrentHashMap<>(); + for (int i = 0; i < numMessages; i++) { + final int index = i; + producer.send(new ProducerRecord<>(topic, i, messagePrefix + i), (recordMetadata, e) -> { + if (e != null) { + log.error("Failed to send {}: {}", index, e); + fail("Failed to send " + index + ": " + e.getMessage()); + } + assertEquals(recordMetadata.topic(), topic); + assertEquals(recordMetadata.partition(), 0); + indexToOffset.put(index, recordMetadata.offset()); + MessageIdImpl id = (MessageIdImpl) MessageIdUtils.getMessageId(indexToOffset.get(index)); + log.info("Success write {} to {} ({}, {})", index, recordMetadata.offset(), + id.getLedgerId(), id.getEntryId()); + }).get(); + // TODO: here we disable batching for Kafka producer, because when batching is enabled, Pulsar consumers + // may receive wrong messages order from Kafka producer. This issue may be similar to + // https://github.com/streamnative/kop/issues/243 + } + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("subscription-name") + .subscribe(); + for (int i = 0; i < numMessages; i++) { + Message message = consumer.receive(1, TimeUnit.SECONDS); + assertNotNull(message); + consumer.acknowledge(message); + assertTrue(indexToOffset.containsKey(i)); + + MessageIdImpl id = (MessageIdImpl) MessageIdUtils.getMessageId(indexToOffset.get(i)); + byte[] positionInSendResponse = id.toByteArray(); + byte[] positionReceived = message.getMessageId().toByteArray(); + log.info("Successfully send {} to ({}, {}) {}, received: {}", i, id.getLedgerId(), id.getEntryId(), + positionInSendResponse, positionReceived); + // The result of MessageIdUtils#getMessageId only contains ledger id and entry id, so we need to cut the + // extra bytes of positionInSendResponse. + assertEquals(positionInSendResponse, Arrays.copyOf(positionReceived, positionInSendResponse.length)); + } + } }