From 38e225bc9a4dd3a77512cbc301e20944c0a32958 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 22 Dec 2016 16:16:54 -0800 Subject: [PATCH 1/2] Fix: Fair message consumption from all partitions in partitioned-consumer --- .../api/PartitionedProducerConsumerTest.java | 64 +++++++++++++++++++ .../api/SimpleProducerConsumerTest.java | 2 +- .../pulsar/client/impl/ConsumerImpl.java | 15 ++++- .../client/impl/PartitionedConsumerImpl.java | 14 ++-- 4 files changed, 86 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/PartitionedProducerConsumerTest.java index 35574a61745de..970e9fc37e61c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -450,6 +450,70 @@ public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception log.info("-- Exiting {} test --", methodName); } + /** + * It verifies that consumer consumes from all the partitions fairly. + * + * @throws Exception + */ + @Test + public void testFairDistributionForPartitionConsumers() throws Exception { + log.info("-- Starting {} test --", methodName); + + final int numPartitions = 2; + final String topicName = "persistent://my-property/use/my-ns/my-topic"; + final String producer1Msg = "producer1"; + final String producer2Msg = "producer2"; + final int queueSize = 10; + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(queueSize); + + admin.persistentTopics().createPartitionedTopic(topicName, numPartitions); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + Producer producer1 = pulsarClient.createProducer(topicName + "-partition-0", producerConf); + Producer producer2 = pulsarClient.createProducer(topicName + "-partition-1", producerConf); + + Consumer consumer = pulsarClient.subscribe(topicName, "my-partitioned-subscriber", conf); + + int partition2Msgs = 0; + + // produce messages on Partition-1: which will makes partitioned-consumer's queue full + for (int i = 0; i < queueSize - 1; i++) { + producer1.send((producer1Msg + "-" + i).getBytes()); + } + + Thread.sleep(1000); + + // now queue is full : so, partition-2 consumer will be pushed to paused-consumer list + for (int i = 0; i < 5; i++) { + producer2.send((producer2Msg + "-" + i).getBytes()); + } + + // now, Queue should take both partition's messages + // also: we will keep producing messages to partition-1 + int produceMsgInPartition1AfterNumberOfConsumeMessages = 2; + for (int i = 0; i < 3 * queueSize; i++) { + Message msg = consumer.receive(); + partition2Msgs += (new String(msg.getData())).startsWith(producer2Msg) ? 1 : 0; + if (i >= produceMsgInPartition1AfterNumberOfConsumeMessages) { + producer1.send(producer1Msg.getBytes()); + Thread.sleep(100); + } + + } + + assertTrue(partition2Msgs >= 4); + producer1.close(); + producer2.close(); + consumer.unsubscribe(); + consumer.close(); + admin.persistentTopics().deletePartitionedTopic(topicName); + + log.info("-- Exiting {} test --", methodName); + } + private void receiveAsync(Consumer consumer, int totalMessage, int currentMessage, CountDownLatch latch, final Set consumeMsg, ExecutorService executor) throws PulsarClientException { if (currentMessage < totalMessage) { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index 53b8def7201f2..04264a24fd887 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -1690,7 +1690,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() * * @throws Exception */ - @Test(invocationCount=10) + @Test public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index cab1f711247e6..a0a049593ae34 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -427,6 +427,10 @@ public void operationComplete(Future future) throws Exception { } else if (ackType == AckType.Cumulative) { stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId)); } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Successfully acknowledged message - {}, acktype {}", subscription, + consumerName, messageId, ackType); + } ackFuture.complete(null); } else { stats.incrementNumAcksFailed(); @@ -588,7 +592,8 @@ public CompletableFuture closeAsync() { void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Received message: {}", topic, subscription, messageId); + log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), + messageId.getEntryId()); } MessageMetadata msgMetadata = null; @@ -905,6 +910,10 @@ public void redeliverUnacknowledgedMessages() { if (currentSize > 0) { sendFlowPermitsToBroker(cnx, currentSize); } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Redeliver unacked messages and send {} permits", subscription, consumerName, + currentSize); + } return; } if (cnx == null || (state.get() == State.Connecting)) { @@ -945,6 +954,10 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { increaseAvailablePermits(cnx, messagesFromQueue); } builder.recycle(); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Redeliver unacked messages and increase {} permits", subscription, consumerName, + messagesFromQueue); + } return; } if (cnx == null || (state.get() == State.Connecting)) { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java index 5661b7871f63f..44b17b073f0d7 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java @@ -52,10 +52,6 @@ public class PartitionedConsumerImpl extends ConsumerBase { // shared incoming queue was full private final ConcurrentLinkedQueue pausedConsumers; - // Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to - // resume receiving from the paused consumer partitions - private final int sharedQueueResumeThreshold; - private final int numPartitions; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ConsumerStats stats; @@ -66,7 +62,6 @@ public class PartitionedConsumerImpl extends ConsumerBase { subscribeFuture); this.consumers = Lists.newArrayListWithCapacity(numPartitions); this.pausedConsumers = new ConcurrentLinkedQueue<>(); - this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; this.numPartitions = numPartitions; stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null; @@ -142,7 +137,7 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { } private void resumeReceivingFromPausedConsumersIfNeeded() { - if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { + if (incomingMessages.size() < maxReceiverQueueSize && !pausedConsumers.isEmpty()) { while (true) { ConsumerImpl consumer = pausedConsumers.poll(); if (consumer == null) { @@ -327,6 +322,9 @@ void connectionOpened(ClientCnx cnx) { void messageReceived(Message message) { lock.readLock().lock(); try { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Received message from partitioned-consumer {}", topic, subscription, message.getMessageId()); + } // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue if (!pendingReceives.isEmpty()) { CompletableFuture receivedFuture = pendingReceives.poll(); @@ -355,7 +353,9 @@ void messageReceived(Message message) { } try { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message); + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message); + } listener.received(PartitionedConsumerImpl.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, message, From f3517384ddf8c5e4f1ff45c711c92be41aeb4e0a Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Fri, 23 Dec 2016 17:12:22 -0800 Subject: [PATCH 2/2] pause consumer if other consumers are waiting on pausedConsumer list --- .../yahoo/pulsar/client/impl/ConsumerImpl.java | 16 ++++++++-------- .../client/impl/PartitionedConsumerImpl.java | 15 +++++++++++---- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index a0a049593ae34..ded3d7b5251d5 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -428,8 +428,8 @@ public void operationComplete(Future future) throws Exception { stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId)); } if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Successfully acknowledged message - {}, acktype {}", subscription, - consumerName, messageId, ackType); + log.debug("[{}] [{}] [{}] Successfully acknowledged message - {}, acktype {}", subscription, + topic, consumerName, messageId, ackType); } ackFuture.complete(null); } else { @@ -680,12 +680,12 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC } try { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, msg); + log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, msg.getMessageId()); } listener.received(ConsumerImpl.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, - msg, t); + msg.getMessageId(), t); } } catch (PulsarClientException e) { @@ -911,8 +911,8 @@ public void redeliverUnacknowledgedMessages() { sendFlowPermitsToBroker(cnx, currentSize); } if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Redeliver unacked messages and send {} permits", subscription, consumerName, - currentSize); + log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, + consumerName, currentSize); } return; } @@ -955,8 +955,8 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { } builder.recycle(); if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Redeliver unacked messages and increase {} permits", subscription, consumerName, - messagesFromQueue); + log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", subscription, topic, + consumerName, messagesFromQueue); } return; } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java index 44b17b073f0d7..ec99501d9c1bb 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java @@ -52,6 +52,10 @@ public class PartitionedConsumerImpl extends ConsumerBase { // shared incoming queue was full private final ConcurrentLinkedQueue pausedConsumers; + // Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to + // resume receiving from the paused consumer partitions + private final int sharedQueueResumeThreshold; + private final int numPartitions; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ConsumerStats stats; @@ -62,6 +66,7 @@ public class PartitionedConsumerImpl extends ConsumerBase { subscribeFuture); this.consumers = Lists.newArrayListWithCapacity(numPartitions); this.pausedConsumers = new ConcurrentLinkedQueue<>(); + this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; this.numPartitions = numPartitions; stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null; @@ -123,8 +128,10 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { // Process the message, add to the queue and trigger listener or async callback messageReceived(message); - if (incomingMessages.size() >= maxReceiverQueueSize) { - // No more space left in shared queue, mark this consumer to be resumed later + if (incomingMessages.size() >= maxReceiverQueueSize + || (incomingMessages.size() > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { + // mark this consumer to be resumed later: if No more space left in shared queue, + // or if any consumer is already paused (to create fair chance for already paused consumers) pausedConsumers.add(consumer); } else { // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid @@ -137,7 +144,7 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { } private void resumeReceivingFromPausedConsumersIfNeeded() { - if (incomingMessages.size() < maxReceiverQueueSize && !pausedConsumers.isEmpty()) { + if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { while (true) { ConsumerImpl consumer = pausedConsumers.poll(); if (consumer == null) { @@ -354,7 +361,7 @@ void messageReceived(Message message) { try { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message); + log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message.getMessageId()); } listener.received(PartitionedConsumerImpl.this, msg); } catch (Throwable t) {