diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/PartitionedProducerConsumerTest.java index 35574a61745de..970e9fc37e61c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -450,6 +450,70 @@ public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception log.info("-- Exiting {} test --", methodName); } + /** + * It verifies that consumer consumes from all the partitions fairly. + * + * @throws Exception + */ + @Test + public void testFairDistributionForPartitionConsumers() throws Exception { + log.info("-- Starting {} test --", methodName); + + final int numPartitions = 2; + final String topicName = "persistent://my-property/use/my-ns/my-topic"; + final String producer1Msg = "producer1"; + final String producer2Msg = "producer2"; + final int queueSize = 10; + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(queueSize); + + admin.persistentTopics().createPartitionedTopic(topicName, numPartitions); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + Producer producer1 = pulsarClient.createProducer(topicName + "-partition-0", producerConf); + Producer producer2 = pulsarClient.createProducer(topicName + "-partition-1", producerConf); + + Consumer consumer = pulsarClient.subscribe(topicName, "my-partitioned-subscriber", conf); + + int partition2Msgs = 0; + + // produce messages on Partition-1: which will makes partitioned-consumer's queue full + for (int i = 0; i < queueSize - 1; i++) { + producer1.send((producer1Msg + "-" + i).getBytes()); + } + + Thread.sleep(1000); + + // now queue is full : so, partition-2 consumer will be pushed to paused-consumer list + for (int i = 0; i < 5; i++) { + producer2.send((producer2Msg + "-" + i).getBytes()); + } + + // now, Queue should take both partition's messages + // also: we will keep producing messages to partition-1 + int produceMsgInPartition1AfterNumberOfConsumeMessages = 2; + for (int i = 0; i < 3 * queueSize; i++) { + Message msg = consumer.receive(); + partition2Msgs += (new String(msg.getData())).startsWith(producer2Msg) ? 1 : 0; + if (i >= produceMsgInPartition1AfterNumberOfConsumeMessages) { + producer1.send(producer1Msg.getBytes()); + Thread.sleep(100); + } + + } + + assertTrue(partition2Msgs >= 4); + producer1.close(); + producer2.close(); + consumer.unsubscribe(); + consumer.close(); + admin.persistentTopics().deletePartitionedTopic(topicName); + + log.info("-- Exiting {} test --", methodName); + } + private void receiveAsync(Consumer consumer, int totalMessage, int currentMessage, CountDownLatch latch, final Set consumeMsg, ExecutorService executor) throws PulsarClientException { if (currentMessage < totalMessage) { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index 53b8def7201f2..04264a24fd887 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -1690,7 +1690,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() * * @throws Exception */ - @Test(invocationCount=10) + @Test public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index cab1f711247e6..ded3d7b5251d5 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -427,6 +427,10 @@ public void operationComplete(Future future) throws Exception { } else if (ackType == AckType.Cumulative) { stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId)); } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Successfully acknowledged message - {}, acktype {}", subscription, + topic, consumerName, messageId, ackType); + } ackFuture.complete(null); } else { stats.incrementNumAcksFailed(); @@ -588,7 +592,8 @@ public CompletableFuture closeAsync() { void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Received message: {}", topic, subscription, messageId); + log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), + messageId.getEntryId()); } MessageMetadata msgMetadata = null; @@ -675,12 +680,12 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC } try { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, msg); + log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, msg.getMessageId()); } listener.received(ConsumerImpl.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, - msg, t); + msg.getMessageId(), t); } } catch (PulsarClientException e) { @@ -905,6 +910,10 @@ public void redeliverUnacknowledgedMessages() { if (currentSize > 0) { sendFlowPermitsToBroker(cnx, currentSize); } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, + consumerName, currentSize); + } return; } if (cnx == null || (state.get() == State.Connecting)) { @@ -945,6 +954,10 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { increaseAvailablePermits(cnx, messagesFromQueue); } builder.recycle(); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", subscription, topic, + consumerName, messagesFromQueue); + } return; } if (cnx == null || (state.get() == State.Connecting)) { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java index 5661b7871f63f..ec99501d9c1bb 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java @@ -128,8 +128,10 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { // Process the message, add to the queue and trigger listener or async callback messageReceived(message); - if (incomingMessages.size() >= maxReceiverQueueSize) { - // No more space left in shared queue, mark this consumer to be resumed later + if (incomingMessages.size() >= maxReceiverQueueSize + || (incomingMessages.size() > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { + // mark this consumer to be resumed later: if No more space left in shared queue, + // or if any consumer is already paused (to create fair chance for already paused consumers) pausedConsumers.add(consumer); } else { // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid @@ -327,6 +329,9 @@ void connectionOpened(ClientCnx cnx) { void messageReceived(Message message) { lock.readLock().lock(); try { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Received message from partitioned-consumer {}", topic, subscription, message.getMessageId()); + } // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue if (!pendingReceives.isEmpty()) { CompletableFuture receivedFuture = pendingReceives.poll(); @@ -355,7 +360,9 @@ void messageReceived(Message message) { } try { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message); + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message.getMessageId()); + } listener.received(PartitionedConsumerImpl.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, message,