diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java new file mode 100644 index 0000000000000..19a988373ef70 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Immutable topic metadata, representing the current state of a topic in the broker. + * + * @param id The topic ID. + * @param name The topic name. + * @param numPartitions The number of partitions. + * @param partitionRacks Map of every partition ID to a set of its rack IDs, if they exist. If rack information is unavailable for all + * partitions, this is an empty map. 
+ */ +public record TopicMetadata(Uuid id, String name, int numPartitions, Map> partitionRacks) { + + public TopicMetadata(Uuid id, + String name, + int numPartitions, + Map> partitionRacks) { + this.id = Objects.requireNonNull(id); + if (Uuid.ZERO_UUID.equals(id)) { + throw new IllegalArgumentException("Topic id cannot be ZERO_UUID."); + } + this.name = Objects.requireNonNull(name); + if (name.isEmpty()) { + throw new IllegalArgumentException("Topic name cannot be empty."); + } + this.numPartitions = numPartitions; + if (numPartitions <= 0) { + throw new IllegalArgumentException("Number of partitions must be positive."); + } + this.partitionRacks = Objects.requireNonNull(partitionRacks); + } + + public static TopicMetadata fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) { + // Converting the data type from a list stored in the record to a map for the topic metadata. + Map> partitionRacks = new HashMap<>(); + for (StreamsGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) { + partitionRacks.put( + partitionMetadata.partition(), + Set.copyOf(partitionMetadata.racks()) + ); + } + + return new TopicMetadata( + record.topicId(), + record.topicName(), + record.numPartitions(), + partitionRacks); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java index a7792471e4e84..b574377b1678c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java @@ -65,6 +65,8 @@ public ChangelogTopics( /** * Determines the number of partitions for each non-source changelog topic in the requested topology. 
* + * @throws IllegalStateException If a source topic does not have a partition count defined through topicPartitionCountProvider. + * * @return the map of all non-source changelog topics for the requested topology to their required number of partitions. */ public Map setup() { @@ -94,7 +96,7 @@ public Map setup() { private int getPartitionCountOrFail(String topic) { final OptionalInt topicPartitionCount = topicPartitionCountProvider.apply(topic); if (topicPartitionCount.isEmpty()) { - throw TopicConfigurationException.missingSourceTopics("No partition count for source topic " + topic); + throw new IllegalStateException("No partition count for source topic " + topic); } return topicPartitionCount.getAsInt(); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java index 86f8080421c46..b6ccb87f7a224 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -31,7 +32,8 @@ * @param topologyEpoch The epoch of the topology. Same as the topology epoch in the heartbeat request that last initialized * the topology. * @param subtopologies Contains the subtopologies that have been configured. This can be used by the task assignors, since it - * specifies the number of tasks available for every subtopology. + * specifies the number of tasks available for every subtopology. Undefined if topology configuration + * failed. 
* @param internalTopicsToBeCreated Contains a list of internal topics that need to be created. This is used to create the topics in the * broker. * @param topicConfigurationException If the topic configuration process failed, e.g. because expected topics are missing or have an @@ -39,7 +41,7 @@ * reported back to the client. */ public record ConfiguredTopology(int topologyEpoch, - Map subtopologies, + Optional> subtopologies, Map internalTopicsToBeCreated, Optional topicConfigurationException) { @@ -47,6 +49,9 @@ public record ConfiguredTopology(int topologyEpoch, if (topologyEpoch < 0) { throw new IllegalArgumentException("Topology epoch must be non-negative."); } + if (topicConfigurationException.isEmpty() && subtopologies.isEmpty()) { + throw new IllegalArgumentException("Subtopologies must be present if topicConfigurationException is empty."); + } Objects.requireNonNull(subtopologies, "subtopologies can't be null"); Objects.requireNonNull(internalTopicsToBeCreated, "internalTopicsToBeCreated can't be null"); Objects.requireNonNull(topicConfigurationException, "topicConfigurationException can't be null"); @@ -59,9 +64,11 @@ public boolean isReady() { public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() { return new StreamsGroupDescribeResponseData.Topology() .setEpoch(topologyEpoch) - .setSubtopologies(subtopologies.entrySet().stream().map( - entry -> entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey()) - ).collect(Collectors.toList())); + .setSubtopologies( + subtopologies.map(stringConfiguredSubtopologyMap -> stringConfiguredSubtopologyMap.entrySet().stream().map( + entry -> entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey()) + ).collect(Collectors.toList())).orElse(Collections.emptyList()) + ); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java index 4bcc229ef3432..b7928931dd110 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcer.java @@ -48,8 +48,7 @@ public class CopartitionedTopicsEnforcer { * @param logContext The context for emitting log messages. * @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the broker * as well as any partition number decisions that have already been made. In particular, we expect - the number of partitions for all repartition topics defined, even if they do not exist in the - broker yet. + the number of partitions for all topics in all co-partition groups to be defined. */ public CopartitionedTopicsEnforcer(final LogContext logContext, final Function topicPartitionCountProvider) { @@ -65,8 +64,8 @@ public CopartitionedTopicsEnforcer(final LogContext logContext, * client (in particular, when the user uses `repartition` in the DSL). * @param flexibleRepartitionTopics The set of repartition topics whose partition count is flexible, and can be changed. * - * @throws TopicConfigurationException If source topics are missing, or there are topics in copartitionTopics that are not copartitioned - * according to topicPartitionCountProvider. + * @throws IllegalStateException If the partition count for any topic in copartitionedTopics is not defined by + * topicPartitionCountProvider. * * @return A map from all repartition topics in copartitionedTopics to their updated partition counts. 
*/ @@ -74,6 +73,7 @@ public Map enforce(final Set copartitionedTopics, final Set fixedRepartitionTopics, final Set flexibleRepartitionTopics) throws StreamsInvalidTopologyException { if (copartitionedTopics.isEmpty()) { + log.debug("Ignoring unexpected empty copartitioned topics set."); return Collections.emptyMap(); } final Map returnedPartitionCounts = new HashMap<>(); @@ -85,16 +85,7 @@ public Map enforce(final Set copartitionedTopics, final Map nonRepartitionTopicPartitions = copartitionedTopics.stream().filter(topic -> !repartitionTopicPartitionCounts.containsKey(topic)) - .collect(Collectors.toMap(topic -> topic, topic -> { - final OptionalInt topicPartitionCount = topicPartitionCountProvider.apply(topic); - if (topicPartitionCount.isEmpty()) { - final String str = String.format("Following topics are missing: [%s]", topic); - log.error(str); - throw TopicConfigurationException.missingSourceTopics(str); - } else { - return topicPartitionCount.getAsInt(); - } - })); + .collect(Collectors.toMap(topic -> topic, this::getPartitionCount)); final int numPartitionsToUseForRepartitionTopics; @@ -139,7 +130,7 @@ private int getPartitionCount(final String topicName) { if (partitions.isPresent()) { return partitions.getAsInt(); } else { - throw new StreamsInvalidTopologyException("Number of partitions is not set for topic: " + topicName); + throw new IllegalStateException("Number of partitions is not set for topic: " + topicName); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java new file mode 100644 index 0000000000000..31029d9fd9e39 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.coordinator.group.streams.StreamsTopology; +import org.apache.kafka.coordinator.group.streams.TopicMetadata; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.TreeSet; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Responsible for configuring internal topics for a given topology. + */ +public class InternalTopicManager { + + /** + * Configures the internal topics for the given topology. 
Given a topology and the topic metadata, this method determines the number of + * partitions for all internal topics and returns a {@link ConfiguredTopology} object. + * + * @param logContext The log context. + * @param topology The topology. + * @param topicMetadata The topic metadata. + * @return The configured topology. + */ + public static ConfiguredTopology configureTopics(LogContext logContext, + StreamsTopology topology, + Map topicMetadata) { + final Logger log = logContext.logger(InternalTopicManager.class); + final Collection subtopologies = topology.subtopologies().values(); + + final Map>> copartitionGroupsBySubtopology = + subtopologies.stream() + .collect(Collectors.toMap( + StreamsGroupTopologyValue.Subtopology::subtopologyId, + InternalTopicManager::copartitionGroupsFromPersistedSubtopology) + ); + + try { + Optional topicConfigurationException = Optional.empty(); + + throwOnMissingSourceTopics(topology, topicMetadata); + + Map decidedPartitionCountsForInternalTopics = + decidePartitionCounts(logContext, topology, topicMetadata, copartitionGroupsBySubtopology, log); + + final Map configuredSubtopologies = + subtopologies.stream() + .collect(Collectors.toMap( + StreamsGroupTopologyValue.Subtopology::subtopologyId, + x -> fromPersistedSubtopology(x, decidedPartitionCountsForInternalTopics)) + ); + + Map internalTopicsToCreate = missingInternalTopics(configuredSubtopologies, topicMetadata); + if (!internalTopicsToCreate.isEmpty()) { + topicConfigurationException = Optional.of(TopicConfigurationException.missingInternalTopics( + "Internal topics are missing: " + internalTopicsToCreate.keySet() + )); + log.info("Valid topic configuration found, but internal topics are missing for topology epoch {}: {}", + topology.topologyEpoch(), topicConfigurationException.get().toString()); + } else { + log.info("Valid topic configuration found, topology epoch {} is now initialized.", topology.topologyEpoch()); + } + + return new ConfiguredTopology( + 
topology.topologyEpoch(), + Optional.of(configuredSubtopologies), + internalTopicsToCreate, + topicConfigurationException + ); + + } catch (TopicConfigurationException e) { + log.warn("Topic configuration failed for topology epoch {}: {} ", + topology.topologyEpoch(), e.toString()); + return new ConfiguredTopology( + topology.topologyEpoch(), + Optional.empty(), + Map.of(), + Optional.of(e) + ); + } + } + + private static void throwOnMissingSourceTopics(final StreamsTopology topology, + final Map topicMetadata) { + TreeSet sortedMissingTopics = new TreeSet<>(); + for (StreamsGroupTopologyValue.Subtopology subtopology : topology.subtopologies().values()) { + for (String sourceTopic : subtopology.sourceTopics()) { + if (!topicMetadata.containsKey(sourceTopic)) { + sortedMissingTopics.add(sourceTopic); + } + } + } + if (!sortedMissingTopics.isEmpty()) { + throw TopicConfigurationException.missingSourceTopics( + "Source topics " + String.join(", ", sortedMissingTopics) + " are missing."); + } + } + + private static Map decidePartitionCounts(final LogContext logContext, + final StreamsTopology topology, + final Map topicMetadata, + final Map>> copartitionGroupsBySubtopology, + final Logger log) { + final Map decidedPartitionCountsForInternalTopics = new HashMap<>(); + final Function topicPartitionCountProvider = + topic -> getPartitionCount(topicMetadata, topic, decidedPartitionCountsForInternalTopics); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + logContext, + topology.subtopologies().values(), + topicPartitionCountProvider); + final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer = new CopartitionedTopicsEnforcer( + logContext, + topicPartitionCountProvider); + final ChangelogTopics changelogTopics = new ChangelogTopics(logContext, + topology.subtopologies().values(), + topicPartitionCountProvider); + + decidedPartitionCountsForInternalTopics.putAll(repartitionTopics.setup()); + + enforceCopartitioning( + topology, + 
copartitionGroupsBySubtopology, + log, + decidedPartitionCountsForInternalTopics, + copartitionedTopicsEnforcer + ); + + decidedPartitionCountsForInternalTopics.putAll(changelogTopics.setup()); + + return decidedPartitionCountsForInternalTopics; + } + + private static void enforceCopartitioning(final StreamsTopology topology, + final Map>> copartitionGroupsBySubtopology, + final Logger log, + final Map decidedPartitionCountsForInternalTopics, + final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer) { + final Set fixedRepartitionTopics = + topology.subtopologies().values().stream().flatMap(x -> + x.repartitionSourceTopics().stream().filter(y -> y.partitions() != 0) + ).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet()); + final Set flexibleRepartitionTopics = + topology.subtopologies().values().stream().flatMap(x -> + x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0) + ).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet()); + + if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) { + log.info("Skipping the repartition topic validation since there are no repartition topics."); + } else { + // ensure the co-partitioning topics within the group have the same number of partitions, + // and enforce the number of partitions for those repartition topics to be the same if they + // are co-partitioned as well. 
+ for (Collection> copartitionGroups : copartitionGroupsBySubtopology.values()) { + for (Set copartitionGroup : copartitionGroups) { + decidedPartitionCountsForInternalTopics.putAll( + copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics)); + } + } + } + } + + private static Map missingInternalTopics(Map subtopologyMap, + Map topicMetadata) { + + final Map topicsToCreate = new HashMap<>(); + for (ConfiguredSubtopology subtopology : subtopologyMap.values()) { + subtopology.repartitionSourceTopics().values() + .forEach(x -> topicsToCreate.put(x.name(), toCreatableTopic(x))); + subtopology.stateChangelogTopics().values() + .forEach(x -> topicsToCreate.put(x.name(), toCreatableTopic(x))); + } + for (Map.Entry topic : topicMetadata.entrySet()) { + final TopicMetadata existingTopic = topic.getValue(); + final CreatableTopic expectedTopic = topicsToCreate.remove(topic.getKey()); + if (expectedTopic != null) { + if (existingTopic.numPartitions() != expectedTopic.numPartitions()) { + throw TopicConfigurationException.incorrectlyPartitionedTopics("Existing topic " + topic.getKey() + " has different" + + " number of partitions: expected " + expectedTopic.numPartitions() + ", found " + existingTopic.numPartitions()); + } + } + } + return topicsToCreate; + } + + private static OptionalInt getPartitionCount(Map topicMetadata, + String topic, + Map decidedPartitionCountsForInternalTopics) { + final TopicMetadata metadata = topicMetadata.get(topic); + if (metadata == null) { + if (decidedPartitionCountsForInternalTopics.containsKey(topic)) { + return OptionalInt.of(decidedPartitionCountsForInternalTopics.get(topic)); + } else { + return OptionalInt.empty(); + } + } else { + return OptionalInt.of(metadata.numPartitions()); + } + } + + private static CreatableTopic toCreatableTopic(final ConfiguredInternalTopic config) { + + final CreatableTopic creatableTopic = new CreatableTopic(); + + creatableTopic.setName(config.name()); + 
creatableTopic.setNumPartitions(config.numberOfPartitions()); + + if (config.replicationFactor().isPresent() && config.replicationFactor().get() != 0) { + creatableTopic.setReplicationFactor(config.replicationFactor().get()); + } else { + creatableTopic.setReplicationFactor((short) -1); + } + + final CreatableTopicConfigCollection topicConfigs = new CreatableTopicConfigCollection(); + + config.topicConfigs().forEach((k, v) -> { + final CreatableTopicConfig topicConfig = new CreatableTopicConfig(); + topicConfig.setName(k); + topicConfig.setValue(v); + topicConfigs.add(topicConfig); + }); + + creatableTopic.setConfigs(topicConfigs); + + return creatableTopic; + } + + private static ConfiguredSubtopology fromPersistedSubtopology(final StreamsGroupTopologyValue.Subtopology subtopology, + final Map decidedPartitionCountsForInternalTopics + ) { + return new ConfiguredSubtopology( + new HashSet<>(subtopology.sourceTopics()), + subtopology.repartitionSourceTopics().stream() + .map(x -> fromPersistedTopicInfo(x, decidedPartitionCountsForInternalTopics)) + .collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)), + new HashSet<>(subtopology.repartitionSinkTopics()), + subtopology.stateChangelogTopics().stream() + .map(x -> fromPersistedTopicInfo(x, decidedPartitionCountsForInternalTopics)) + .collect(Collectors.toMap(ConfiguredInternalTopic::name, x -> x)) + ); + } + + private static ConfiguredInternalTopic fromPersistedTopicInfo(final StreamsGroupTopologyValue.TopicInfo topicInfo, + final Map decidedPartitionCountsForInternalTopics) { + if (topicInfo.partitions() == 0 && !decidedPartitionCountsForInternalTopics.containsKey(topicInfo.name())) { + throw new IllegalStateException("Number of partitions must be set for topic " + topicInfo.name()); + } + + return new ConfiguredInternalTopic( + topicInfo.name(), + topicInfo.partitions() == 0 ? decidedPartitionCountsForInternalTopics.get(topicInfo.name()) : topicInfo.partitions(), + topicInfo.replicationFactor() == 0 ? 
Optional.empty() + : Optional.of(topicInfo.replicationFactor()), + topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream() + .collect(Collectors.toMap(StreamsGroupTopologyValue.TopicConfig::key, + StreamsGroupTopologyValue.TopicConfig::value)) + : Collections.emptyMap() + ); + } + + private static Collection> copartitionGroupsFromPersistedSubtopology( + final StreamsGroupTopologyValue.Subtopology subtopology + ) { + return subtopology.copartitionGroups().stream().map(copartitionGroup -> + Stream.concat( + copartitionGroup.sourceTopics().stream() + .map(i -> subtopology.sourceTopics().get(i)), + copartitionGroup.repartitionSourceTopics().stream() + .map(i -> subtopology.repartitionSourceTopics().get(i).name()) + ).collect(Collectors.toSet()) + ).collect(Collectors.toList()); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java index d1fefe67864ff..c37902b1f1788 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java @@ -48,7 +48,7 @@ public class RepartitionTopics { * @param logContext The context for emitting log messages. * @param subtopologies The subtopologies for the requested topology. * @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the - broker. + broker. This class requires the number of partitions for all source topics to be defined. */ public RepartitionTopics(final LogContext logContext, final Collection subtopologies, @@ -63,8 +63,7 @@ public RepartitionTopics(final LogContext logContext, * * @return the map of repartition topics for the requested topology to their required number of partitions. 
* - * @throws TopicConfigurationException if no valid configuration can be found given the broker state, for example, if a source topic - * is missing. + * @throws IllegalStateException if the number of partitions for a source topic is not defined by topicPartitionCountProvider. * @throws StreamsInvalidTopologyException if the number of partitions for all repartition topics cannot be determined, e.g. * because of loops, or if a repartition source topic is not a sink topic of any subtopology. */ @@ -77,7 +76,7 @@ public Map setup() { } if (!missingSourceTopicsForTopology.isEmpty()) { - throw TopicConfigurationException.missingSourceTopics(String.format("Missing source topics: %s", + throw new IllegalStateException(String.format("Missing source topics: %s", String.join(", ", missingSourceTopicsForTopology))); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java new file mode 100644 index 0000000000000..38d63dab6ef1e --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TopicMetadataTest { + + @Test + public void testConstructor() { + assertDoesNotThrow(() -> + new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, new HashMap<>())); + } + + @Test + public void testConstructorWithZeroUuid() { + Exception exception = assertThrows(IllegalArgumentException.class, () -> + new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3, new HashMap<>())); + assertEquals("Topic id cannot be ZERO_UUID.", exception.getMessage()); + } + + @Test + public void testConstructorWithNullUuid() { + assertThrows(NullPointerException.class, () -> + new TopicMetadata(null, "valid-topic", 3, new HashMap<>())); + } + + @Test + public void testConstructorWithNullName() { + assertThrows(NullPointerException.class, () -> + new TopicMetadata(Uuid.randomUuid(), null, 3, new HashMap<>())); + } + + @Test + public void testConstructorWithEmptyName() { + Exception exception = assertThrows(IllegalArgumentException.class, () -> + new TopicMetadata(Uuid.randomUuid(), "", 3, new HashMap<>())); + assertEquals("Topic name cannot be empty.", exception.getMessage()); + } + + @Test + public void testConstructorWithZeroNumPartitions() { + Exception exception = assertThrows(IllegalArgumentException.class, () -> + new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0, new HashMap<>())); + assertEquals("Number of partitions must be positive.", 
exception.getMessage()); + } + + @Test + public void testConstructorWithNegativeNumPartitions() { + Exception exception = assertThrows(IllegalArgumentException.class, () -> + new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1, new HashMap<>())); + assertEquals("Number of partitions must be positive.", exception.getMessage()); + } + + @Test + public void testConstructorWithNullPartitionRacks() { + assertThrows(NullPointerException.class, () -> + new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, null)); + } + + @Test + public void testFromRecord() { + StreamsGroupPartitionMetadataValue.TopicMetadata record = new StreamsGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(Uuid.randomUuid()) + .setTopicName("test-topic") + .setNumPartitions(3) + .setPartitionMetadata(List.of( + new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(0) + .setRacks(List.of("rack1", "rack2")), + new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(1) + .setRacks(List.of("rack3")), + new StreamsGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(2) + .setRacks(List.of("rack4", "rack5")) + )); + + TopicMetadata topicMetadata = TopicMetadata.fromRecord(record); + + assertEquals(record.topicId(), topicMetadata.id()); + assertEquals(record.topicName(), topicMetadata.name()); + assertEquals(record.numPartitions(), topicMetadata.numPartitions()); + + Map> expectedPartitionRacks = new HashMap<>(); + expectedPartitionRacks.put(0, Set.of("rack1", "rack2")); + expectedPartitionRacks.put(1, Set.of("rack3")); + expectedPartitionRacks.put(2, Set.of("rack4", "rack5")); + + assertEquals(expectedPartitionRacks, topicMetadata.partitionRacks()); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java index 
fc862a7a02745..2d6d096235a62 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java @@ -53,7 +53,7 @@ public void testConstructorWithNullInternalTopicsToBeCreated() { assertThrows(NullPointerException.class, () -> new ConfiguredTopology( 0, - Collections.emptyMap(), + Optional.of(Map.of()), null, Optional.empty() ) @@ -65,7 +65,7 @@ public void testConstructorWithNullTopicConfigurationException() { assertThrows(NullPointerException.class, () -> new ConfiguredTopology( 0, - Collections.emptyMap(), + Optional.empty(), Collections.emptyMap(), null ) @@ -77,21 +77,34 @@ public void testConstructorWithInvalidTopologyEpoch() { assertThrows(IllegalArgumentException.class, () -> new ConfiguredTopology( -1, + Optional.of(Map.of()), Collections.emptyMap(), + Optional.empty() + ) + ); + } + + @Test + public void testNoExceptionButNoSubtopologies() { + final IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, + () -> new ConfiguredTopology( + 1, + Optional.empty(), Collections.emptyMap(), Optional.empty() ) ); + assertEquals(ex.getMessage(), "Subtopologies must be present if topicConfigurationException is empty."); } @Test public void testIsReady() { ConfiguredTopology readyTopology = new ConfiguredTopology( - 1, new HashMap<>(), new HashMap<>(), Optional.empty()); + 1, Optional.of(Map.of()), new HashMap<>(), Optional.empty()); assertTrue(readyTopology.isReady()); ConfiguredTopology notReadyTopology = new ConfiguredTopology( - 1, new HashMap<>(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing"))); + 1, Optional.empty(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing"))); assertFalse(notReadyTopology.isReady()); } @@ -106,7 +119,7 @@ public void testAsStreamsGroupDescribeTopology() { Map 
internalTopicsToBeCreated = new HashMap<>(); Optional topicConfigurationException = Optional.empty(); ConfiguredTopology configuredTopology = new ConfiguredTopology( - topologyEpoch, subtopologies, internalTopicsToBeCreated, topicConfigurationException); + topologyEpoch, Optional.of(subtopologies), internalTopicsToBeCreated, topicConfigurationException); StreamsGroupDescribeResponseData.Topology topology = configuredTopology.asStreamsGroupDescribeTopology(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java index d2c466157f9c8..5529d335de51c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/CopartitionedTopicsEnforcerTest.java @@ -48,19 +48,18 @@ private static Function topicPartitionProvider(Map topicPartitionCounts = Collections.emptyMap(); final CopartitionedTopicsEnforcer enforcer = new CopartitionedTopicsEnforcer(LOG_CONTEXT, topicPartitionProvider(topicPartitionCounts)); - final TopicConfigurationException ex = assertThrows(TopicConfigurationException.class, () -> + final IllegalStateException ex = assertThrows(IllegalStateException.class, () -> enforcer.enforce( Set.of(SOURCE_TOPIC_1), Set.of(), Set.of() )); - assertEquals(Status.MISSING_SOURCE_TOPICS, ex.status()); - assertEquals(String.format("Following topics are missing: [%s]", SOURCE_TOPIC_1), ex.getMessage()); + assertEquals(String.format("Number of partitions is not set for topic: %s", SOURCE_TOPIC_1), ex.getMessage()); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java new file mode 100644 index 0000000000000..4d7fdc08d4c43 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.streams.StreamsTopology; +import org.apache.kafka.coordinator.group.streams.TopicMetadata; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class InternalTopicManagerTest { + + public static final String SOURCE_TOPIC_1 = "source_topic1"; + public static final String SOURCE_TOPIC_2 = "source_topic2"; + public static final String REPARTITION_TOPIC = "repartition_topic"; + public static final String STATE_CHANGELOG_TOPIC_1 = "state_changelog_topic1"; + public static final String STATE_CHANGELOG_TOPIC_2 = "state_changelog_topic2"; + public static final String SUBTOPOLOGY_1 = "subtopology1"; + public static final String SUBTOPOLOGY_2 = "subtopology2"; + public static final String CONFIG_KEY = "cleanup.policy"; + public static final String CONFIG_VALUE = "compact"; + + @Test + void testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() { + Map topicMetadata = new HashMap<>(); + 
topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2, Collections.emptyMap())); + // SOURCE_TOPIC_2 is missing from topicMetadata + StreamsTopology topology = makeTestTopology(); + + final ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata); + + assertEquals(Optional.empty(), configuredTopology.subtopologies()); + assertTrue(configuredTopology.topicConfigurationException().isPresent()); + assertEquals(Status.MISSING_SOURCE_TOPICS, configuredTopology.topicConfigurationException().get().status()); + assertEquals(String.format("Source topics %s are missing.", SOURCE_TOPIC_2), configuredTopology.topicConfigurationException().get().getMessage()); + } + + @Test + void testConfigureTopics() { + Map topicMetadata = new HashMap<>(); + topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2, Collections.emptyMap())); + topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_2, 2, Collections.emptyMap())); + topicMetadata.put(STATE_CHANGELOG_TOPIC_2, + new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2, Collections.emptyMap())); + StreamsTopology topology = makeTestTopology(); + + ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata); + final Map internalTopicsToBeCreated = configuredTopology.internalTopicsToBeCreated(); + + assertEquals(2, internalTopicsToBeCreated.size()); + assertEquals( + new CreatableTopic() + .setName(REPARTITION_TOPIC) + .setNumPartitions(2) + .setReplicationFactor((short) 3), + internalTopicsToBeCreated.get(REPARTITION_TOPIC) + ); + assertEquals( + new CreatableTopic() + .setName(STATE_CHANGELOG_TOPIC_1) + .setNumPartitions(2) + .setReplicationFactor((short) -1) + .setConfigs( + new CreatableTopicConfigCollection( + Collections.singletonList(new 
CreatableTopicConfig().setName(CONFIG_KEY).setValue(CONFIG_VALUE)).iterator()) + ), + internalTopicsToBeCreated.get(STATE_CHANGELOG_TOPIC_1)); + + Optional> expectedConfiguredTopology = Optional.of(makeExpectedConfiguredSubtopologies()); + assertEquals(expectedConfiguredTopology, configuredTopology.subtopologies()); + } + + private static Map makeExpectedConfiguredSubtopologies() { + return mkMap( + mkEntry(SUBTOPOLOGY_1, + new ConfiguredSubtopology( + Set.of(SOURCE_TOPIC_1), + Map.of(), + Set.of(REPARTITION_TOPIC), + Map.of(STATE_CHANGELOG_TOPIC_1, + new ConfiguredInternalTopic( + STATE_CHANGELOG_TOPIC_1, + 2, + Optional.empty(), + Map.of(CONFIG_KEY, CONFIG_VALUE) + )) + ) + ), + mkEntry(SUBTOPOLOGY_2, + new ConfiguredSubtopology( + Set.of(SOURCE_TOPIC_2), + Map.of(REPARTITION_TOPIC, + new ConfiguredInternalTopic(REPARTITION_TOPIC, + 2, + Optional.of((short) 3), + Collections.emptyMap() + ) + ), + Set.of(), + Map.of(STATE_CHANGELOG_TOPIC_2, + new ConfiguredInternalTopic(STATE_CHANGELOG_TOPIC_2, + 2, + Optional.empty(), + Collections.emptyMap() + ))) + ) + ); + } + + private static StreamsTopology makeTestTopology() { + // Create a subtopology source -> repartition + Subtopology subtopology1 = new Subtopology() + .setSubtopologyId(SUBTOPOLOGY_1) + .setSourceTopics(Collections.singletonList(SOURCE_TOPIC_1)) + .setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC)) + .setStateChangelogTopics(Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName(STATE_CHANGELOG_TOPIC_1) + .setTopicConfigs(Collections.singletonList( + new StreamsGroupTopologyValue.TopicConfig() + .setKey(CONFIG_KEY) + .setValue(CONFIG_VALUE) + )) + )); + // Create a subtopology repartition/source2 -> sink (copartitioned) + Subtopology subtopology2 = new Subtopology() + .setSubtopologyId(SUBTOPOLOGY_2) + .setSourceTopics(Collections.singletonList(SOURCE_TOPIC_2)) + .setRepartitionSourceTopics(Collections.singletonList( + new 
StreamsGroupTopologyValue.TopicInfo() + .setName(REPARTITION_TOPIC) + .setReplicationFactor((short) 3) + )) + .setStateChangelogTopics(Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName(STATE_CHANGELOG_TOPIC_2) + )) + .setCopartitionGroups(Collections.singletonList( + new StreamsGroupTopologyValue.CopartitionGroup() + .setSourceTopics(Collections.singletonList((short) 0)) + .setRepartitionSourceTopics(Collections.singletonList((short) 0)) + )); + + return new StreamsTopology(3, Map.of(SUBTOPOLOGY_1, subtopology1, SUBTOPOLOGY_2, subtopology2)); + } + +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java index 8257f42dbae3b..decc4d0484f66 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.coordinator.group.streams.topics; import org.apache.kafka.common.errors.StreamsInvalidTopologyException; -import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; @@ -72,7 +71,7 @@ public void shouldSetupRepartitionTopics() { } @Test - public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() { + public void shouldThrowIllegalStateExceptionIfMissingSourceTopics() { final Subtopology subtopology1 = new Subtopology() .setSubtopologyId("subtopology1") .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2)) @@ -88,10 +87,9 @@ public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics( topicPartitionCountProvider ); - final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, + final IllegalStateException exception = assertThrows(IllegalStateException.class, repartitionTopics::setup); - assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); assertEquals("Missing source topics: source1", exception.getMessage()); }