From 72b3c4f60e44854c04c511fd19749553d50058fa Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Mon, 10 May 2021 20:49:49 +0800
Subject: [PATCH 1/3] Fix LoadManager interface

---
 .../handlers/kop/KafkaRequestHandler.java     | 16 +++++++-------
 .../handlers/kop/KopBrokerLookupManager.java  | 17 +++++++-------
 pom.xml                                       | 22 ++++++++++++-------
 3 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index 584236f14c..21e06e5c82 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -157,9 +157,10 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 
 /**
  * This class contains all the request handling methods.
@@ -1730,12 +1731,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         }
 
         // Get a list of ServiceLookupData for each matchBroker.
-        List<CompletableFuture<Optional<ServiceLookupData>>> list = matchBrokers.stream()
-                .map(matchBroker ->
-                        zkCache.getDataAsync(
-                                String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker),
-                                (Deserializer<ServiceLookupData>)
-                                        pulsarService.getLoadManager().get().getLoadReportDeserializer()))
+        final MetadataCache<LocalBrokerData> metadataCache = pulsarService.getLocalMetadataStore()
+                .getMetadataCache(LocalBrokerData.class);
+        List<CompletableFuture<Optional<LocalBrokerData>>> list = matchBrokers.stream()
+                .map(matchBroker -> metadataCache.get(
+                        String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker)))
                 .collect(Collectors.toList());
 
         FutureUtil.waitForAll(list)
@@ -1748,7 +1748,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
             }
 
             try {
-                for (CompletableFuture<Optional<ServiceLookupData>> lookupData : list) {
+                for (CompletableFuture<Optional<LocalBrokerData>> lookupData : list) {
                     ServiceLookupData data = lookupData.get().get();
                     if (log.isDebugEnabled()) {
                         log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, "
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java
index 2caa3a9808..c86d736599 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java
@@ -31,6 +31,8 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 
@@ -211,12 +213,11 @@ private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(
         }
 
         // Get a list of ServiceLookupData for each matchBroker.
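        // (Each child of /loadbalance/brokers resolves to the LocalBrokerData that the owning
        // broker registered at startup; LocalBrokerData implements ServiceLookupData, so the
        // lookup code below can keep consuming it through the old interface.)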
-        List<CompletableFuture<Optional<ServiceLookupData>>> list = matchBrokers.stream()
-                .map(matchBroker ->
-                        zkCache.getDataAsync(
-                                String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker),
-                                (ZooKeeperCache.Deserializer<ServiceLookupData>)
-                                        pulsarService.getLoadManager().get().getLoadReportDeserializer()))
+        final MetadataCache<LocalBrokerData> metadataCache = pulsarService.getLocalMetadataStore()
+                .getMetadataCache(LocalBrokerData.class);
+        List<CompletableFuture<Optional<LocalBrokerData>>> list = matchBrokers.stream()
+                .map(matchBroker -> metadataCache.get(
+                        String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker)))
                 .collect(Collectors.toList());
 
         getKopAddress(list, pulsarAddress, kopAddressFuture, topic, hostAndPort);
@@ -224,7 +225,7 @@ private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(
         return kopAddressFuture;
     }
 
-    private void getKopAddress(List<CompletableFuture<Optional<ServiceLookupData>>> list,
+    private void getKopAddress(List<CompletableFuture<Optional<LocalBrokerData>>> list,
                                InetSocketAddress pulsarAddress,
                                CompletableFuture<Optional<String>> kopAddressFuture,
                                TopicName topic,
@@ -239,7 +240,7 @@ private void getKopAddress(List<CompletableFuture<Optional<ServiceLookupData>>>
         }
 
         try {
-            for (CompletableFuture<Optional<ServiceLookupData>> lookupData : list) {
+            for (CompletableFuture<Optional<LocalBrokerData>> lookupData : list) {
                 ServiceLookupData data = lookupData.get().get();
                 if (log.isDebugEnabled()) {
                     log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, "
diff --git a/pom.xml b/pom.xml
index 4893345338..8b7d13cf56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,8 @@
     2.13.3
     1.18.4
     2.22.0
-    <pulsar.version>2.8.0-rc-202104202206</pulsar.version>
+    <pulsar.group.id>io.streamnative</pulsar.group.id>
+    <pulsar.version>2.8.0-rc-202105092228</pulsar.version>
     1.7.25
     3.1.8
     1.15.1
@@ -105,35 +106,35 @@
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
+      <groupId>${pulsar.group.id}</groupId>
       <artifactId>pulsar-broker</artifactId>
       <version>${pulsar.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
+      <groupId>${pulsar.group.id}</groupId>
       <artifactId>pulsar-broker-common</artifactId>
       <version>${pulsar.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
+      <groupId>${pulsar.group.id}</groupId>
       <artifactId>pulsar-client-original</artifactId>
       <version>${pulsar.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
+      <groupId>${pulsar.group.id}</groupId>
       <artifactId>pulsar-client-admin-original</artifactId>
       <version>${pulsar.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
+      <groupId>${pulsar.group.id}</groupId>
       <artifactId>testmocks</artifactId>
       <version>${pulsar.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -214,7 +215,7 @@
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
+      <groupId>${pulsar.group.id}</groupId>
       <artifactId>pulsar-broker</artifactId>
       <version>${pulsar.version}</version>
       <type>test-jar</type>
     </dependency>
 
@@ -222,7 +223,7 @@
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
+      <groupId>${pulsar.group.id}</groupId>
       <artifactId>managed-ledger</artifactId>
       <version>${pulsar.version}</version>
       <type>test-jar</type>
     </dependency>
 
@@ -399,6 +400,11 @@
+    <repository>
+      <id>sonatype-streamnative-maven</id>
+      <name>sonatype</name>
+      <url>https://oss.sonatype.org/content/repositories/iostreamnative-1129</url>
+    </repository>
     <repository>
       <id>bintray-streamnative-maven</id>
       <name>bintray</name>

From 00e9235a0d828cef2b5ce0d2bcb2eb679d5ee852 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Tue, 11 May 2021 00:58:16 +0800
Subject: [PATCH 2/3] Refactor publish throttling

---
 .../handlers/kop/InternalServerCnx.java       | 47 +------
 .../handlers/kop/KafkaRequestHandler.java     | 66 +++++++++-
 pom.xml                                       | 10 +-
 .../MessagePublishBufferThrottleTestBase.java | 120 ++++++++----------
 4 files changed, 129 insertions(+), 114 deletions(-)

diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/InternalServerCnx.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/InternalServerCnx.java
index 8432d644f5..9233a961a1 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/InternalServerCnx.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/InternalServerCnx.java
@@ -13,9 +13,7 @@
  */
 package io.streamnative.pulsar.handlers.kop;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.Producer;
@@ -32,10 +30,6 @@ public class InternalServerCnx extends ServerCnx {
     @Getter
     KafkaRequestHandler kafkaRequestHandler;
 
-    private static final AtomicLongFieldUpdater<InternalServerCnx> KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(InternalServerCnx.class, "kopMessagePublishBufferSize");
-    private volatile long kopMessagePublishBufferSize = 0;
-
     public InternalServerCnx(KafkaRequestHandler kafkaRequestHandler) {
         super(kafkaRequestHandler.getPulsarService());
         this.kafkaRequestHandler = kafkaRequestHandler;
@@ -63,54 +57,21 @@ public void closeProducer(Producer producer) {
 
     // called after channel active
     public void updateCtx() {
-        this.remoteAddress = kafkaRequestHandler.getRemoteAddress();
+        this.remoteAddress = kafkaRequestHandler.remoteAddress;
     }
 
     @Override
     public void enableCnxAutoRead() {
-        if (!kafkaRequestHandler.ctx.channel().config().isAutoRead()) {
-            kafkaRequestHandler.ctx.channel().config().setAutoRead(true);
-            kafkaRequestHandler.ctx.read();
-            if (log.isDebugEnabled()) {
-                log.debug("Channel {} auto read has set to true.", kafkaRequestHandler.ctx.channel());
-            }
-        }
+        // do nothing in this mock
     }
 
     @Override
     public void disableCnxAutoRead() {
-        if (kafkaRequestHandler.ctx.channel().config().isAutoRead()) {
-            kafkaRequestHandler.ctx.channel().config().setAutoRead(false);
-            if (log.isDebugEnabled()) {
-                log.debug("Channel {} auto read has set to false.", kafkaRequestHandler.ctx.channel());
-            }
-        }
-    }
-
-    public void increasePublishBuffer(long msgSize) {
-        KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
-        if (getBrokerService().isReachMessagePublishBufferThreshold()) {
-            disableCnxAutoRead();
-        }
-    }
-
-    public void decreasePublishBuffer(long msgSize) {
-        KOP_MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize);
+        // do nothing in this mock
     }
 
     @Override
-    public long getMessagePublishBufferSize() {
-        return kopMessagePublishBufferSize;
-    }
-
-    public void cancelPublishBufferLimiting() {
-        // do nothing.
+    public void cancelPublishBufferLimiting() {
+        // do nothing in this mock
     }
-
-    @VisibleForTesting
-    public void setMessagePublishBufferSize(long size) {
-        this.kopMessagePublishBufferSize = size;
-    }
-
 }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index 21e06e5c82..b31176f37a 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -59,6 +59,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -197,6 +198,12 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
     // is found.
     private final Map pendingTopicFuturesMap = new ConcurrentHashMap<>();
 
+    // Manage publish-buffer throttling by atomically enabling/disabling channel auto-read.
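+    // maxPendingBytes is derived from maxMessagePublishBufferSizeInMB: once the bytes of
+    // in-flight PRODUCE payloads reach it, channel auto-read is paused; reads resume after
+    // pendingBytes falls below resumeThresholdPendingBytes (half the limit). A non-positive
+    // limit disables the throttling.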
+    private final long maxPendingBytes;
+    private final long resumeThresholdPendingBytes;
+    private final AtomicLong pendingBytes = new AtomicLong(0);
+    private volatile boolean autoReadDisabledPublishBufferLimiting = false;
+
     public KafkaRequestHandler(PulsarService pulsarService,
                                KafkaServiceConfiguration kafkaConfig,
                                GroupCoordinator groupCoordinator,
@@ -227,6 +234,8 @@ public KafkaRequestHandler(PulsarService pulsarService,
         this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat());
         this.currentConnectedGroup = new ConcurrentHashMap<>();
         this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
+        this.maxPendingBytes = kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
+        this.resumeThresholdPendingBytes = this.maxPendingBytes / 2;
     }
 
     @Override
@@ -245,7 +254,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         log.info("channel inactive {}", ctx.channel());
 
         close();
-        isActive.set(false);
     }
 
     @Override
@@ -628,6 +636,54 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
         });
     }
 
+    private void disableCnxAutoRead() {
+        if (ctx != null && ctx.channel().config().isAutoRead()) {
+            ctx.channel().config().setAutoRead(false);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] disable auto read", ctx.channel());
+            }
+        }
+    }
+
+    private void enableCnxAutoRead() {
+        if (ctx != null && !ctx.channel().config().isAutoRead()
+                && !autoReadDisabledPublishBufferLimiting) {
+            // Resume reading from the socket only once the publish-buffer limiter has released the channel
+            ctx.channel().config().setAutoRead(true);
+            // triggers channel read
+            ctx.read();
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] enable auto read", ctx.channel());
+            }
+        }
+    }
+
+    private void startSendOperationForThrottling(long msgSize) {
+        final long currentPendingBytes = pendingBytes.addAndGet(msgSize);
+        if (currentPendingBytes >= maxPendingBytes && !autoReadDisabledPublishBufferLimiting && maxPendingBytes > 0) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] disable auto read because currentPendingBytes({}) >= maxPendingBytes({})",
+                        ctx.channel(), currentPendingBytes, maxPendingBytes);
+            }
+            disableCnxAutoRead();
+            autoReadDisabledPublishBufferLimiting = true;
+            pulsarService.getBrokerService().pausedConnections(1);
+        }
+    }
+
+    private void completeSendOperationForThrottling(long msgSize) {
+        final long currentPendingBytes = pendingBytes.addAndGet(-msgSize);
+        if (currentPendingBytes < resumeThresholdPendingBytes && autoReadDisabledPublishBufferLimiting) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] enable auto read because currentPendingBytes({}) < resumeThreshold({})",
+                        ctx.channel(), currentPendingBytes, resumeThresholdPendingBytes);
+            }
+            autoReadDisabledPublishBufferLimiting = false;
+            enableCnxAutoRead();
+            pulsarService.getBrokerService().resumedConnections(1);
+        }
+    }
+
     private void publishMessages(final PersistentTopic persistentTopic,
                                  final ByteBuf byteBuf,
                                  final int numMessages,
@@ -655,9 +711,10 @@ private void publishMessages(final PersistentTopic persistentTopic,
         final long beforePublish = MathUtils.nowInNano();
         persistentTopic.publishMessage(byteBuf,
                 MessagePublishContext.get(offsetFuture, persistentTopic, numMessages, System.nanoTime()));
-        byteBuf.release();
         final RecordBatch batch = records.batchIterator().next();
         offsetFuture.whenComplete((offset, e) -> {
+            completeSendOperationForThrottling(byteBuf.readableBytes());
+            byteBuf.release();
             if (e == null) {
                 if (batch.isTransactional()) {
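                    // Record the offset written by this transactional producer id with the
                    // transaction coordinator.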
                    transactionCoordinator.addActivePidOffset(TopicName.get(partitionName), batch.producerId(), offset);
@@ -683,9 +740,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
 
         final int numPartitions = produceRequest.partitionRecordsOrFail().size();
 
-        final long dataSizePerPartition = produceHar.getBuffer().readableBytes();
-        topicManager.getInternalServerCnx().increasePublishBuffer(dataSizePerPartition);
-
         final Map<TopicPartition, PartitionResponse> responseMap = new ConcurrentHashMap<>();
         final CompletableFuture produceFuture = new CompletableFuture<>();
         BiConsumer<TopicPartition, PartitionResponse> addPartitionResponse = (topicPartition, response) -> {
@@ -719,6 +773,7 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
                 final ByteBuf byteBuf = entryFormatter.encode(validRecords, numMessages);
                 requestStats.getProduceEncodeStats().registerSuccessfulEvent(
                         MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);
+                startSendOperationForThrottling(byteBuf.readableBytes());
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} ",
@@ -757,7 +812,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
         });
 
         produceFuture.thenApply(ignored -> {
-            topicManager.getInternalServerCnx().decreasePublishBuffer(dataSizePerPartition);
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Request {}: Complete handle produce.",
                         ctx.channel(), produceHar.toString());
             }
diff --git a/pom.xml b/pom.xml
index 8b7d13cf56..474e4fc577 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,7 @@
     true
-    3.4
+    3.11
     21.0
     1.18.0
     2.12.1
@@ -53,6 +53,8 @@
     3.1.8
     1.15.1
     6.14.3
+    1.10.2
+    <awaitility.version>4.0.3</awaitility.version>
     1.4.9
     3.0.rc1
 
@@ -237,6 +239,12 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <version>${awaitility.version}</version>
+      <scope>test</scope>
+    </dependency>
 
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MessagePublishBufferThrottleTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MessagePublishBufferThrottleTestBase.java
index 8dd8a438c4..453f707cdf 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MessagePublishBufferThrottleTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MessagePublishBufferThrottleTestBase.java
@@ -15,11 +15,13 @@
 
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.pulsar.broker.service.Topic;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -27,6 +29,7 @@
 /**
  * Test class for message publish buffer throttle from kop side.
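 * <p>Both tests delay bookie writes via mockBookKeeper.addEntryDelay so that in-flight
 * PRODUCE bytes accumulate: with throttling enabled the broker should report a paused
 * connection, with it disabled none.</p>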
* */ +@Slf4j public abstract class MessagePublishBufferThrottleTestBase extends KopProtocolHandlerTestBase{ public MessagePublishBufferThrottleTestBase(final String entryFormat) { @@ -36,38 +39,35 @@ public MessagePublishBufferThrottleTestBase(final String entryFormat) { @Test public void testMessagePublishBufferThrottleDisabled() throws Exception { conf.setMaxMessagePublishBufferSizeInMB(-1); - conf.setMessagePublishBufferCheckIntervalInMillis(10); super.internalSetup(); final String topic = "testMessagePublishBufferThrottleDisabled"; - final String pulsarTopic = "persistent://public/default/" + topic + "-partition-0"; Properties properties = new Properties(); - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArraySerializer"); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArraySerializer"); - properties.put("delivery.timeout.ms", 300000); - KafkaProducer producer = new org.apache.kafka.clients.producer.KafkaProducer(properties); - - // send one record to make sure InternalProducer build on broker side - producer.send(new ProducerRecord<>(topic, "test".getBytes())).get(); - - Topic topicRef = pulsar.getBrokerService().getTopicReference(pulsarTopic).get(); - Assert.assertNotNull(topicRef); - InternalServerCnx internalServerCnx = (InternalServerCnx) - ((InternalProducer) topicRef.getProducers().values().toArray()[0]).getCnx(); - internalServerCnx.setMessagePublishBufferSize(Long.MAX_VALUE / 2); - // sleep to make sure the publish buffer check task has been executed - Thread.sleep(conf.getMessagePublishBufferCheckIntervalInMillis() * 2); - Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold()); - // Make sure the producer can publish succeed. 
-        for (int i = 0; i < 10; i++) {
-            producer.send(new ProducerRecord<>(topic, new byte[1024])).get();
-        }
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
+        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties);
+
+        mockBookKeeper.addEntryDelay(1, TimeUnit.SECONDS);
+
+        final byte[] payload = new byte[1024 * 256];
+        final int numMessages = 50;
+        final AtomicInteger numSend = new AtomicInteger(0);
+        for (int i = 0; i < numMessages; i++) {
+            final int index = i;
+            producer.send(new ProducerRecord<>(topic, payload), (metadata, exception) -> {
+                if (exception != null) {
+                    log.error("Failed to send {}: {}", index, exception.getMessage());
+                    return;
+                }
+                numSend.getAndIncrement();
+            });
+        }
-        // sleep to make sure the publish buffer check task has been executed
-        Thread.sleep(conf.getMessagePublishBufferCheckIntervalInMillis() * 2);
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+
+        Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(numSend.get(), numMessages));
+        producer.close();
 
         super.internalCleanup();
     }
@@ -75,46 +75,38 @@ public void testMessagePublishBufferThrottleDisabled() throws Exception {
     public void testMessagePublishBufferThrottleEnable() throws Exception {
         // set size for max publish buffer before broker start
         conf.setMaxMessagePublishBufferSizeInMB(1);
-        conf.setMessagePublishBufferCheckIntervalInMillis(100);
         super.internalSetup();
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         final String topic = "testMessagePublishBufferThrottleEnable";
-        final String pulsarTopic = "persistent://public/default/" + topic + "-partition-0";
         Properties properties = new Properties();
-        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort);
-        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.ByteArraySerializer");
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.ByteArraySerializer");
-        properties.put("delivery.timeout.ms", 300000);
-        KafkaProducer producer = new org.apache.kafka.clients.producer.KafkaProducer(properties);
-
-        // send one record to make sure InternalProducer build on broker side
-        producer.send(new ProducerRecord<>(topic, "test".getBytes())).get();
-
-        Topic topicRef = pulsar.getBrokerService().getTopicReference(pulsarTopic).get();
-        Assert.assertNotNull(topicRef);
-        InternalServerCnx internalServerCnx = (InternalServerCnx)
-                ((InternalProducer) topicRef.getProducers().values().toArray()[0]).getCnx();
-        internalServerCnx.setMessagePublishBufferSize(Long.MAX_VALUE / 2);
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        // The first message can publish success, but the second message should be blocked
-        producer.send(new ProducerRecord<>(topic, new byte[1024])).get(1, TimeUnit.SECONDS);
-        // sleep to make sure the publish buffer check task has been executed
-        Thread.sleep(conf.getMessagePublishBufferCheckIntervalInMillis() * 2);
-        Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-
-        internalServerCnx.setMessagePublishBufferSize(0);
-        // sleep to make sure the publish buffer check task has been executed
-        Thread.sleep(conf.getMessagePublishBufferCheckIntervalInMillis() * 2);
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        // Make sure the producer can publish succeed.
-        for (int i = 0; i < 10; i++) {
-            producer.send(new ProducerRecord<>(topic, new byte[1024])).get();
-        }
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        Assert.assertEquals(internalServerCnx.getMessagePublishBufferSize(), 0);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaBrokerPort);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
+        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties);
+
+        mockBookKeeper.addEntryDelay(1, TimeUnit.SECONDS);
+
+        final byte[] payload = new byte[1024 * 256];
+        final int numMessages = 50;
+        final AtomicInteger numSend = new AtomicInteger(0);
+        for (int i = 0; i < numMessages; i++) {
+            final int index = i;
+            producer.send(new ProducerRecord<>(topic, payload), (metadata, exception) -> {
+                if (exception != null) {
+                    log.error("Failed to send {}: {}", index, exception.getMessage());
+                    return;
+                }
+                numSend.getAndIncrement();
+            });
+        }
+
+        Awaitility.await().untilAsserted(
+                () -> Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 1L));
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(numSend.get(), numMessages));
+        Awaitility.await().untilAsserted(
+                () -> Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 0L));
+        producer.close();
 
         super.internalCleanup();
     }

From bc40f2c1b310c4c1f1fccdaf522a6808bbdb888c Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Tue, 11 May 2021 01:14:53 +0800
Subject: [PATCH 3/3] Remove ZooKeeperCache usage

---
 .../pulsar/handlers/kop/KafkaRequestHandler.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index b31176f37a..aadf3c18bd 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -161,7 +161,6 @@
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
-import org.apache.pulsar.zookeeper.ZooKeeperCache;
 
 /**
  * This class contains all the request handling methods.
@@ -1759,8 +1758,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         }
         // The advertised data is written to /loadbalance/brokers/advertisedAddress:webServicePort.
        // Here we get the broker URLs and then need to find the related webServiceUrl.
-        ZooKeeperCache zkCache = pulsarService.getLocalZkCache();
-        zkCache.getChildrenAsync(LoadManager.LOADBALANCE_BROKERS_ROOT, zkCache)
+        pulsarService.getPulsarResources()
+                .getDynamicConfigResources()
+                .getChildrenAsync(LoadManager.LOADBALANCE_BROKERS_ROOT)
                 .whenComplete((set, throwable) -> {
                     if (throwable != null) {
                         log.error("Error in getChildrenAsync(zk://loadbalance) for {}", pulsarAddress, throwable);