From 5cd99dc02371d19bb6e02fee688b70ded18294ed Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 26 Jul 2021 20:29:51 +0800 Subject: [PATCH 1/4] Optimize the protocol handler start --- .../handlers/kop/KafkaProtocolHandler.java | 126 +++++------------- 1 file changed, 33 insertions(+), 93 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 592f68ff10..76e4a36c36 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -51,12 +51,13 @@ import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; import org.apache.pulsar.broker.protocol.ProtocolHandler; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.client.admin.Lookup; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; @@ -78,6 +79,15 @@ public class KafkaProtocolHandler implements ProtocolHandler { private KopBrokerLookupManager kopBrokerLookupManager; private AdminManager adminManager = null; + @Getter + private KafkaServiceConfiguration kafkaConfig; + @Getter + private BrokerService brokerService; + @Getter + private GroupCoordinator groupCoordinator; + @Getter + private TransactionCoordinator transactionCoordinator; + /** * Listener for the changing of topic that stores offsets of consumer group. */ @@ -191,24 +201,7 @@ public boolean test(NamespaceBundle namespaceBundle) { } } - /** - * Kafka Listener Type. - */ - public enum ListenerType { - PLAINTEXT, - SSL - } - @Getter - private KafkaServiceConfiguration kafkaConfig; - @Getter - private BrokerService brokerService; - @Getter - private GroupCoordinator groupCoordinator; - @Getter - private TransactionCoordinator transactionCoordinator; - @Getter - private String bindAddress; @Override public String protocolName() { @@ -235,7 +228,6 @@ public void initialize(ServiceConfiguration conf) throws Exception { kafkaConfig.setAdvertisedAddress(conf.getAdvertisedAddress()); kafkaConfig.setBindAddress(conf.getBindAddress()); } - this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(kafkaConfig.getBindAddress()); KopTopic.initialize(kafkaConfig.getKafkaTenant() + "/" + kafkaConfig.getKafkaNamespace()); statsProvider = new PrometheusMetricsProvider(); @@ -261,21 +253,17 @@ public void start(BrokerService service) { KopVersion.getBuildHost(), KopVersion.getBuildTime()); - try { - adminManager = new AdminManager(brokerService.getPulsar().getAdminClient(), kafkaConfig); - } catch (PulsarServerException e) { - log.error("Failed to create PulsarAdmin: {}", e.getMessage()); - throw new IllegalStateException(e); - } - ZooKeeperUtils.tryCreatePath(brokerService.pulsar().getZkClient(), kafkaConfig.getGroupIdZooKeeperPath(), new byte[0]); PulsarAdmin pulsarAdmin; + PulsarClient pulsarClient; try { pulsarAdmin = brokerService.getPulsar().getAdminClient(); + adminManager = new AdminManager(pulsarAdmin, kafkaConfig); + pulsarClient = brokerService.getPulsar().getClient(); } catch (PulsarServerException e) { - log.error("init PulsarAdmin failed with ", e); + log.error("Failed to get pulsarAdmin or pulsarClient", e); throw new IllegalStateException(e); } final ClusterData clusterData = ClusterData.builder() @@ -285,24 +273,25 @@ public void start(BrokerService service) { .brokerServiceUrlTls(brokerService.getPulsar().getBrokerServiceUrlTls()) .build(); - // init and start group coordinator try { - initGroupCoordinator(pulsarAdmin, clusterData); - startGroupCoordinator(); - // and listener for Offset topics load/unload - brokerService.pulsar() - .getNamespaceService() - .addNamespaceBundleOwnershipListener( - new OffsetAndTopicListener(brokerService, kafkaConfig, groupCoordinator)); - } catch (Exception e) { - log.error("initGroupCoordinator failed with", e); + MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig); + } catch (PulsarAdminException e) { + log.error("Failed to create offset metadata", e); throw new IllegalStateException(e); } + // init and start group coordinator + startGroupCoordinator(pulsarClient); + // and listener for Offset topics load/unload + brokerService.pulsar() + .getNamespaceService() + .addNamespaceBundleOwnershipListener( + new OffsetAndTopicListener(brokerService, kafkaConfig, groupCoordinator)); + // init kafka namespaces try { - initKafkaNamespace(pulsarAdmin, clusterData); - } catch (Exception e) { + MetadataUtils.createKafkaNamespaceIfMissing(pulsarAdmin, clusterData, kafkaConfig); + } catch (PulsarAdminException e) { // no need to throw exception since we can create kafka namespace later log.warn("init kafka failed, need to create it manually later", e); } @@ -368,9 +357,7 @@ public Map> newChannelIniti @Override public void close() { adminManager.shutdown(); - if (groupCoordinator != null) { - groupCoordinator.shutdown(); - } + groupCoordinator.shutdown(); KafkaTopicManager.LOOKUP_CACHE.clear(); KopBrokerLookupManager.clear(); KafkaTopicManager.closeKafkaTopicConsumerManagers(); @@ -379,11 +366,7 @@ public void close() { statsProvider.stop(); } - public void initKafkaNamespace(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception { - MetadataUtils.createKafkaNamespaceIfMissing(pulsarAdmin, clusterData, kafkaConfig); - } - - public void initGroupCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception { + public void startGroupCoordinator(PulsarClient pulsarClient) { GroupConfig groupConfig = new GroupConfig( kafkaConfig.getGroupMinSessionTimeoutMs(), kafkaConfig.getGroupMaxSessionTimeoutMs(), @@ -401,10 +384,8 @@ public void initGroupCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterDat .offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes())) .build(); - MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig); - this.groupCoordinator = GroupCoordinator.of( - (PulsarClientImpl) (brokerService.pulsar().getClient()), + (PulsarClientImpl) pulsarClient, groupConfig, offsetConfig, SystemTimer.builder() @@ -412,16 +393,8 @@ public void initGroupCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterDat .build(), Time.SYSTEM ); - - loadOffsetTopics(groupCoordinator); - } - - public void startGroupCoordinator() throws Exception { - if (this.groupCoordinator != null) { - this.groupCoordinator.startup(true); - } else { - log.error("Failed to start group coordinator. Need init it first."); - } + // always enable metadata expiration + this.groupCoordinator.startup(true); } public void initTransactionCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception { @@ -449,39 +422,6 @@ public void startTransactionCoordinator() throws Exception { } } - /** - * This method discovers ownership of offset topic partitions and attempts to load offset topics - * assigned to this broker. - */ - private void loadOffsetTopics(GroupCoordinator groupCoordinator) throws Exception { - Lookup lookupService = brokerService.pulsar().getAdminClient().lookups(); - String currentBroker = brokerService.pulsar().getBrokerServiceUrl(); - String topicBase = MetadataUtils.constructOffsetsTopicBaseName(kafkaConfig); - int numPartitions = kafkaConfig.getOffsetsTopicNumPartitions(); - - Map> mapBrokerToPartition = new HashMap<>(); - - for (int i = 0; i < numPartitions; i++) { - String broker = lookupService.lookupTopic(topicBase + PARTITIONED_TOPIC_SUFFIX + i); - mapBrokerToPartition.putIfAbsent(broker, new ArrayList()); - mapBrokerToPartition.get(broker).add(i); - } - - mapBrokerToPartition.entrySet().stream().forEach( - e -> log.info("Discovered broker: {} owns offset topic partitions: {} ", e.getKey(), e.getValue())); - - List partitionsOwnedByCurrentBroker = mapBrokerToPartition.get(currentBroker); - - if (null != partitionsOwnedByCurrentBroker && !partitionsOwnedByCurrentBroker.isEmpty()) { - List> lists = partitionsOwnedByCurrentBroker.stream().map( - (ii) -> groupCoordinator.handleGroupImmigration(ii)).collect(Collectors.toList()); - - FutureUtil.waitForAll(lists).get(); - } else { - log.info("Current broker: {} does not own any of the offset topic partitions", currentBroker); - } - } - /** * This method discovers ownership of offset topic partitions and attempts to load offset topics * assigned to this broker. From 3278fbdffa516b4a67d59dec8bb779087b5ab7a2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 29 Jul 2021 12:32:15 +0800 Subject: [PATCH 2/4] Fix tests error by mocking compactor --- .../pulsar/handlers/kop/KopProtocolHandlerTestBase.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index 7edc615f09..692ce42b5d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -332,9 +332,6 @@ protected PulsarService startBroker(ServiceConfiguration conf) throws Exception setupBrokerMocks(pulsar); pulsar.start(); - Compactor spiedCompactor = spy(pulsar.getCompactor()); - doReturn(spiedCompactor).when(pulsar).getCompactor(); - return pulsar; } From e4d9b73ac4b1eb620ec47aa5b57a5115da1aea42 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 29 Jul 2021 13:12:36 +0800 Subject: [PATCH 3/4] Fix checkstyle --- .../pulsar/handlers/kop/KopProtocolHandlerTestBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index 692ce42b5d..3dd8ca57e8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -71,7 +71,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl; From 090d8311eec43bf8e25084e2b3dce05d117c1389 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 29 Jul 2021 15:01:48 +0800 Subject: [PATCH 4/4] Print metrics string for debugging --- .../streamnative/pulsar/handlers/kop/MetricsProviderTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java index fe787aa2ca..453fbcc5eb 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java @@ -152,6 +152,8 @@ public void testMetricsProvider() throws Exception { sb.append(line); } + log.info("Metrics string:\n{}", sb.toString()); + // channel stats Assert.assertTrue(sb.toString().contains("kop_server_ALIVE_CHANNEL_COUNT")); Assert.assertTrue(sb.toString().contains("kop_server_ACTIVE_CHANNEL_COUNT"));