Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
Expand All @@ -78,6 +79,15 @@ public class KafkaProtocolHandler implements ProtocolHandler {
private KopBrokerLookupManager kopBrokerLookupManager;
private AdminManager adminManager = null;

@Getter
private KafkaServiceConfiguration kafkaConfig;
@Getter
private BrokerService brokerService;
@Getter
private GroupCoordinator groupCoordinator;
@Getter
private TransactionCoordinator transactionCoordinator;

/**
* Listener for the changing of topic that stores offsets of consumer group.
*/
Expand Down Expand Up @@ -191,24 +201,7 @@ public boolean test(NamespaceBundle namespaceBundle) {
}

}
/**
* Kafka Listener Type.
*/
public enum ListenerType {
PLAINTEXT,
SSL
}

@Getter
private KafkaServiceConfiguration kafkaConfig;
@Getter
private BrokerService brokerService;
@Getter
private GroupCoordinator groupCoordinator;
@Getter
private TransactionCoordinator transactionCoordinator;
@Getter
private String bindAddress;

@Override
public String protocolName() {
Expand All @@ -235,7 +228,6 @@ public void initialize(ServiceConfiguration conf) throws Exception {
kafkaConfig.setAdvertisedAddress(conf.getAdvertisedAddress());
kafkaConfig.setBindAddress(conf.getBindAddress());
}
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(kafkaConfig.getBindAddress());
KopTopic.initialize(kafkaConfig.getKafkaTenant() + "/" + kafkaConfig.getKafkaNamespace());

statsProvider = new PrometheusMetricsProvider();
Expand All @@ -261,21 +253,17 @@ public void start(BrokerService service) {
KopVersion.getBuildHost(),
KopVersion.getBuildTime());

try {
adminManager = new AdminManager(brokerService.getPulsar().getAdminClient(), kafkaConfig);
} catch (PulsarServerException e) {
log.error("Failed to create PulsarAdmin: {}", e.getMessage());
throw new IllegalStateException(e);
}

ZooKeeperUtils.tryCreatePath(brokerService.pulsar().getZkClient(),
kafkaConfig.getGroupIdZooKeeperPath(), new byte[0]);

PulsarAdmin pulsarAdmin;
PulsarClient pulsarClient;
try {
pulsarAdmin = brokerService.getPulsar().getAdminClient();
adminManager = new AdminManager(pulsarAdmin, kafkaConfig);
pulsarClient = brokerService.getPulsar().getClient();
} catch (PulsarServerException e) {
log.error("init PulsarAdmin failed with ", e);
log.error("Failed to get pulsarAdmin or pulsarClient", e);
throw new IllegalStateException(e);
}
final ClusterData clusterData = ClusterData.builder()
Expand All @@ -285,24 +273,25 @@ public void start(BrokerService service) {
.brokerServiceUrlTls(brokerService.getPulsar().getBrokerServiceUrlTls())
.build();

// init and start group coordinator
try {
initGroupCoordinator(pulsarAdmin, clusterData);
startGroupCoordinator();
// and listener for Offset topics load/unload
brokerService.pulsar()
.getNamespaceService()
.addNamespaceBundleOwnershipListener(
new OffsetAndTopicListener(brokerService, kafkaConfig, groupCoordinator));
} catch (Exception e) {
log.error("initGroupCoordinator failed with", e);
MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);
} catch (PulsarAdminException e) {
log.error("Failed to create offset metadata", e);
throw new IllegalStateException(e);
}

// init and start group coordinator
startGroupCoordinator(pulsarClient);
// and listener for Offset topics load/unload
brokerService.pulsar()
.getNamespaceService()
.addNamespaceBundleOwnershipListener(
new OffsetAndTopicListener(brokerService, kafkaConfig, groupCoordinator));

// init kafka namespaces
try {
initKafkaNamespace(pulsarAdmin, clusterData);
} catch (Exception e) {
MetadataUtils.createKafkaNamespaceIfMissing(pulsarAdmin, clusterData, kafkaConfig);
} catch (PulsarAdminException e) {
// no need to throw exception since we can create kafka namespace later
log.warn("init kafka failed, need to create it manually later", e);
}
Expand Down Expand Up @@ -368,9 +357,7 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
@Override
public void close() {
adminManager.shutdown();
if (groupCoordinator != null) {
groupCoordinator.shutdown();
}
groupCoordinator.shutdown();
KafkaTopicManager.LOOKUP_CACHE.clear();
KopBrokerLookupManager.clear();
KafkaTopicManager.closeKafkaTopicConsumerManagers();
Expand All @@ -379,11 +366,7 @@ public void close() {
statsProvider.stop();
}

public void initKafkaNamespace(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception {
MetadataUtils.createKafkaNamespaceIfMissing(pulsarAdmin, clusterData, kafkaConfig);
}

public void initGroupCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception {
public void startGroupCoordinator(PulsarClient pulsarClient) {
GroupConfig groupConfig = new GroupConfig(
kafkaConfig.getGroupMinSessionTimeoutMs(),
kafkaConfig.getGroupMaxSessionTimeoutMs(),
Expand All @@ -401,27 +384,17 @@ public void initGroupCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterDat
.offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes()))
.build();

MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);

this.groupCoordinator = GroupCoordinator.of(
(PulsarClientImpl) (brokerService.pulsar().getClient()),
(PulsarClientImpl) pulsarClient,
groupConfig,
offsetConfig,
SystemTimer.builder()
.executorName("group-coordinator-timer")
.build(),
Time.SYSTEM
);

loadOffsetTopics(groupCoordinator);
}

public void startGroupCoordinator() throws Exception {
if (this.groupCoordinator != null) {
this.groupCoordinator.startup(true);
} else {
log.error("Failed to start group coordinator. Need init it first.");
}
// always enable metadata expiration
this.groupCoordinator.startup(true);
}

public void initTransactionCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception {
Expand Down Expand Up @@ -449,39 +422,6 @@ public void startTransactionCoordinator() throws Exception {
}
}

/**
* This method discovers ownership of offset topic partitions and attempts to load offset topics
* assigned to this broker.
*/
private void loadOffsetTopics(GroupCoordinator groupCoordinator) throws Exception {
Lookup lookupService = brokerService.pulsar().getAdminClient().lookups();
String currentBroker = brokerService.pulsar().getBrokerServiceUrl();
String topicBase = MetadataUtils.constructOffsetsTopicBaseName(kafkaConfig);
int numPartitions = kafkaConfig.getOffsetsTopicNumPartitions();

Map<String, List<Integer>> mapBrokerToPartition = new HashMap<>();

for (int i = 0; i < numPartitions; i++) {
String broker = lookupService.lookupTopic(topicBase + PARTITIONED_TOPIC_SUFFIX + i);
mapBrokerToPartition.putIfAbsent(broker, new ArrayList());
mapBrokerToPartition.get(broker).add(i);
}

mapBrokerToPartition.entrySet().stream().forEach(
e -> log.info("Discovered broker: {} owns offset topic partitions: {} ", e.getKey(), e.getValue()));

List<Integer> partitionsOwnedByCurrentBroker = mapBrokerToPartition.get(currentBroker);

if (null != partitionsOwnedByCurrentBroker && !partitionsOwnedByCurrentBroker.isEmpty()) {
List<CompletableFuture<Void>> lists = partitionsOwnedByCurrentBroker.stream().map(
(ii) -> groupCoordinator.handleGroupImmigration(ii)).collect(Collectors.toList());

FutureUtil.waitForAll(lists).get();
} else {
log.info("Current broker: {} does not own any of the offset topic partitions", currentBroker);
}
}

/**
* This method discovers ownership of offset topic partitions and attempts to load offset topics
* assigned to this broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
Expand Down Expand Up @@ -332,9 +331,6 @@ protected PulsarService startBroker(ServiceConfiguration conf) throws Exception
setupBrokerMocks(pulsar);
pulsar.start();

Compactor spiedCompactor = spy(pulsar.getCompactor());
doReturn(spiedCompactor).when(pulsar).getCompactor();

return pulsar;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ public void testMetricsProvider() throws Exception {
sb.append(line);
}

log.info("Metrics string:\n{}", sb.toString());

// channel stats
Assert.assertTrue(sb.toString().contains("kop_server_ALIVE_CHANNEL_COUNT"));
Assert.assertTrue(sb.toString().contains("kop_server_ACTIVE_CHANNEL_COUNT"));
Expand Down