Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
package io.streamnative.pulsar.handlers.kop.utils;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand All @@ -24,8 +22,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,75 +32,6 @@
public class MessageIdUtils {
private static final Logger log = LoggerFactory.getLogger(MessageIdUtils.class);

// use 28 bits for ledgerId,
// 32 bits for entryId,
// 12 bits for batchIndex.
public static final int LEDGER_BITS = 20;
public static final int ENTRY_BITS = 32;
public static final int BATCH_BITS = 12;

public static final long getOffset(long ledgerId, long entryId) {
// Combine ledger id and entry id to form offset
checkArgument(ledgerId >= 0, "Expected ledgerId >= 0, but get " + ledgerId);
checkArgument(entryId >= 0, "Expected entryId >= 0, but get " + entryId);

long offset = (ledgerId << (ENTRY_BITS + BATCH_BITS) | (entryId << BATCH_BITS));
return offset;
}

public static final long getOffset(long ledgerId, long entryId, int batchIndex) {
checkArgument(ledgerId >= 0, "Expected ledgerId >= 0, but get " + ledgerId);
checkArgument(entryId >= 0, "Expected entryId >= 0, but get " + entryId);
checkArgument(batchIndex >= 0, "Expected batchIndex >= 0, but get " + batchIndex);
checkArgument(batchIndex < (1 << BATCH_BITS),
"Expected batchIndex only take " + BATCH_BITS + " bits, but it is " + batchIndex);

long offset = (ledgerId << (ENTRY_BITS + BATCH_BITS) | (entryId << BATCH_BITS)) + batchIndex;
return offset;
}

public static final MessageId getMessageId(long offset) {
// De-multiplex ledgerId and entryId from offset
checkArgument(offset > 0, "Expected Offset > 0, but get " + offset);

long ledgerId = offset >>> (ENTRY_BITS + BATCH_BITS);
long entryId = (offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS;

return new MessageIdImpl(ledgerId, entryId, -1);
}

public static final PositionImpl getPosition(long offset) {
// De-multiplex ledgerId and entryId from offset
checkArgument(offset >= 0, "Expected Offset >= 0, but get " + offset);

long ledgerId = offset >>> (ENTRY_BITS + BATCH_BITS);
long entryId = (offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS;

return new PositionImpl(ledgerId, entryId);
}

// get the batchIndex contained in offset.
public static final int getBatchIndex(long offset) {
checkArgument(offset >= 0, "Expected Offset >= 0, but get " + offset);

return (int) (offset & 0x0F_FF);
}

// get next offset that after batch Index.
// In TopicConsumerManager, next read offset is updated after each entry reads,
// if it read a batched message previously, the next offset waiting read is next entry.
public static final long offsetAfterBatchIndex(long offset) {
// De-multiplex ledgerId and entryId from offset
checkArgument(offset >= 0, "Expected Offset >= 0, but get " + offset);

int batchIndex = getBatchIndex(offset);
// this is a for
if (batchIndex != 0) {
return (offset - batchIndex) + (1 << BATCH_BITS);
}
return offset;
}

public static long getCurrentOffset(ManagedLedger managedLedger) {
return ((ManagedLedgerInterceptorImpl) managedLedger.getManagedLedgerInterceptor()).getIndex();
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand All @@ -30,7 +29,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
Expand All @@ -40,7 +38,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -72,11 +69,6 @@
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -231,15 +223,11 @@ public void testOffsetCommitWithInvalidPartition() throws Exception {
// Test ListOffset for earliest get the earliest message in topic.
// testReadUncommittedConsumerListOffsetEarliestOffsetEqualsHighWatermark
// testReadCommittedConsumerListOffsetEarliestOffsetEqualsLastStableOffset
@Ignore
@Test(timeOut = 20000)
public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws Exception {
String topicName = "testReadUncommittedConsumerListOffsetEarliest";
TopicPartition tp = new TopicPartition(topicName, 0);

// use producer to create some message to get Limit Offset.
String pulsarTopicName = "persistent://public/default/" + topicName;

// create partitioned topic.
admin.topics().createPartitionedTopic(topicName, 1);

Expand All @@ -262,20 +250,6 @@ public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws E
log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr);
}

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(pulsarTopicName)
.subscriptionName(topicName + "_sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
assertNotNull(msg);
MessageIdImpl messageId = (MessageIdImpl) ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId();
// first entry should be the limit offset.
long limitOffset = MessageIdUtils.getOffset(messageId.getLedgerId(), 0);
log.info("After create {} messages, get messageId: {} expected earliest limit: {}",
totalMsgs, messageId, limitOffset);

// 2. real test, for ListOffset request verify Earliest get earliest
Map<TopicPartition, Long> targetTimes = Maps.newHashMap();
targetTimes.put(tp, ListOffsetRequest.EARLIEST_TIMESTAMP);
Expand All @@ -291,7 +265,7 @@ public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws E
AbstractResponse response = responseFuture.get();
ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response;
assertEquals(listOffsetResponse.responseData().get(tp).error, Errors.NONE);
assertEquals(listOffsetResponse.responseData().get(tp).offset, Long.valueOf(limitOffset));
assertEquals(listOffsetResponse.responseData().get(tp).offset.intValue(), 0);
assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(0));
}

Expand All @@ -300,15 +274,11 @@ public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws E
// Test ListOffset for latest get the earliest message in topic.
// testReadUncommittedConsumerListOffsetLatest
// testReadCommittedConsumerListOffsetLatest
@Ignore
@Test(timeOut = 20000)
public void testConsumerListOffsetLatest() throws Exception {
String topicName = "testConsumerListOffsetLatest";
TopicPartition tp = new TopicPartition(topicName, 0);

// use producer to create some message to get Limit Offset.
String pulsarTopicName = "persistent://public/default/" + topicName;

// create partitioned topic.
admin.topics().createPartitionedTopic(topicName, 1);

Expand All @@ -331,20 +301,6 @@ public void testConsumerListOffsetLatest() throws Exception {
log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr);
}

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(pulsarTopicName)
.subscriptionName(topicName + "_sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
assertNotNull(msg);
MessageIdImpl messageId = (MessageIdImpl) ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId();
// LAC entry should be the limit offset.
long limitOffset = MessageIdUtils.getOffset(messageId.getLedgerId(), totalMsgs - 1);
log.info("After create {} messages, get messageId: {} expected latest limit: {}",
totalMsgs, messageId, limitOffset);

// 2. real test, for ListOffset request verify Earliest get earliest
Map<TopicPartition, Long> targetTimes = Maps.newHashMap();
targetTimes.put(tp, ListOffsetRequest.LATEST_TIMESTAMP);
Expand All @@ -361,7 +317,8 @@ public void testConsumerListOffsetLatest() throws Exception {
AbstractResponse response = responseFuture.get();
ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response;
assertEquals(listOffsetResponse.responseData().get(tp).error, Errors.NONE);
assertEquals(listOffsetResponse.responseData().get(tp).offset, Long.valueOf(limitOffset));
// TODO: this behavior is incorrect, the latest offset should be `totalMsgs`.
assertEquals(listOffsetResponse.responseData().get(tp).offset.intValue(), (totalMsgs - 1));
assertEquals(listOffsetResponse.responseData().get(tp).timestamp, Long.valueOf(0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,11 @@

import com.google.common.collect.Sets;

import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;

import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -45,15 +38,15 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -123,7 +116,6 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Ignore
@Test(timeOut = 20000, dataProvider = "batchSizeList")
public void testKafkaProduceMessageOrder(int batchSize) throws Exception {
String topicName = "kopKafkaProducePulsarConsumeMessageOrder-" + batchSize;
Expand Down Expand Up @@ -151,22 +143,18 @@ public void testKafkaProduceMessageOrder(int batchSize) throws Exception {
int totalMsgs = 100;
String messageStrPrefix = "Message_Kop_KafkaProducePulsarConsumeOrder_";

Map<Long, Set<Long>> ledgerToEntrySet = new ConcurrentHashMap<>();
for (int i = 0; i < totalMsgs; i++) {
final int index = i;
producer.send(new ProducerRecord<>(topicName, i, messageStrPrefix + i), (recordMetadata, e) -> {
assertNull(e);
MessageIdImpl id = (MessageIdImpl) MessageIdUtils.getMessageId(recordMetadata.offset());
log.info("Success write message {} to {} ({}, {})", index, recordMetadata.offset(),
id.getLedgerId(), id.getEntryId());
ledgerToEntrySet.computeIfAbsent(id.getLedgerId(), key -> Collections.synchronizedSet(new HashSet<>()))
.add(id.getEntryId());
log.info("Success write message {} to offset {}", index, recordMetadata.offset());
});
}

// 2. Consume messages use Pulsar client Consumer.
if (conf.getEntryFormat().equals("pulsar")) {
Message<byte[]> msg = null;
int numBatches = 0;
for (int i = 0; i < totalMsgs; i++) {
msg = consumer.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
Expand All @@ -182,16 +170,21 @@ public void testKafkaProduceMessageOrder(int batchSize) throws Exception {
assertEquals(i, key.intValue());

consumer.acknowledge(msg);

BatchMessageIdImpl id =
(BatchMessageIdImpl) ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId();
if (id.getBatchIndex() == 0) {
numBatches++;
}
}

// verify have received all messages
msg = consumer.receive(100, TimeUnit.MILLISECONDS);
assertNull(msg);

final AtomicInteger numEntries = new AtomicInteger(0);
ledgerToEntrySet.forEach((ledgerId, entrySet) -> numEntries.set(numEntries.get() + entrySet.size()));
log.info("Successfully write {} entries of {} messages to bookie", numEntries.get(), totalMsgs);
assertTrue(numEntries.get() > 1 && numEntries.get() < totalMsgs);
// Check number of batches is in range (1, totalMsgs) to avoid each batch has only one message or all
// messages are batched into a single batch.
log.info("Successfully write {} batches of {} messages to bookie", numBatches, totalMsgs);
assertTrue(numBatches > 1 && numBatches < totalMsgs);
}

// 3. Consume messages use Kafka consumer.
Expand Down
Loading