Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Immutable topic metadata, representing the current state of a topic in the broker.
 *
 * @param id The topic ID. Must not be {@link Uuid#ZERO_UUID}.
 * @param name The topic name. Must not be empty.
 * @param numPartitions The number of partitions. Must be positive.
 * @param partitionRacks Map of every partition ID to a set of its rack IDs, if they exist. If rack information is unavailable for all
 * partitions, this is an empty map.
 */
public record TopicMetadata(Uuid id, String name, int numPartitions, Map<Integer, Set<String>> partitionRacks) {

    /**
     * Compact canonical constructor: validates every component and takes an immutable snapshot of
     * {@code partitionRacks}, so the record cannot be mutated through the caller's map afterwards.
     *
     * @throws NullPointerException If id, name or partitionRacks is null.
     * @throws IllegalArgumentException If id is ZERO_UUID, name is empty, or numPartitions is not positive.
     */
    public TopicMetadata {
        Objects.requireNonNull(id, "id can't be null");
        if (Uuid.ZERO_UUID.equals(id)) {
            throw new IllegalArgumentException("Topic id cannot be ZERO_UUID.");
        }
        Objects.requireNonNull(name, "name can't be null");
        if (name.isEmpty()) {
            throw new IllegalArgumentException("Topic name cannot be empty.");
        }
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("Number of partitions must be positive.");
        }
        // Defensive copy: the class advertises immutability, so do not retain a reference
        // to the caller's (possibly mutable) map.
        partitionRacks = Map.copyOf(Objects.requireNonNull(partitionRacks, "partitionRacks can't be null"));
    }

    /**
     * Builds a {@code TopicMetadata} from its persisted record form.
     *
     * @param topicMetadata The deserialized topic metadata value stored in the record.
     * @return The corresponding immutable topic metadata.
     */
    public static TopicMetadata fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata topicMetadata) {
        // Converting the data type from a list stored in the record to a map for the topic metadata.
        // (Variable renamed from `record`: that is a contextual keyword in modern Java and best avoided.)
        Map<Integer, Set<String>> partitionRacks = new HashMap<>();
        for (StreamsGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata()) {
            partitionRacks.put(
                partitionMetadata.partition(),
                Set.copyOf(partitionMetadata.racks())
            );
        }

        return new TopicMetadata(
            topicMetadata.topicId(),
            topicMetadata.topicName(),
            topicMetadata.numPartitions(),
            partitionRacks);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public ChangelogTopics(
/**
* Determines the number of partitions for each non-source changelog topic in the requested topology.
*
* @throws IllegalStateException If a source topic does not have a partition count defined through topicPartitionCountProvider.
*
* @return the map of all non-source changelog topics for the requested topology to their required number of partitions.
*/
public Map<String, Integer> setup() {
Expand Down Expand Up @@ -94,7 +96,7 @@ public Map<String, Integer> setup() {
/**
 * Looks up the partition count for the given topic via topicPartitionCountProvider.
 *
 * @param topic The topic name to resolve.
 *
 * @throws IllegalStateException If no partition count is defined for the topic.
 *
 * @return The number of partitions for the topic.
 */
private int getPartitionCountOrFail(String topic) {
    final OptionalInt topicPartitionCount = topicPartitionCountProvider.apply(topic);
    if (topicPartitionCount.isEmpty()) {
        // Post-change behavior of this hunk: an undefined partition count is a programming/state
        // error, reported as IllegalStateException (the pre-change TopicConfigurationException
        // line was stale diff residue and has been dropped).
        throw new IllegalStateException("No partition count for source topic " + topic);
    }
    return topicPartitionCount.getAsInt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -31,22 +32,26 @@
* @param topologyEpoch The epoch of the topology. Same as the topology epoch in the heartbeat request that last initialized
* the topology.
* @param subtopologies Contains the subtopologies that have been configured. This can be used by the task assignors, since it
* specifies the number of tasks available for every subtopology.
* specifies the number of tasks available for every subtopology. Undefined if topology configuration
* failed.
* @param internalTopicsToBeCreated Contains a list of internal topics that need to be created. This is used to create the topics in the
* broker.
* @param topicConfigurationException If the topic configuration process failed, e.g. because expected topics are missing or have an
incorrect number of partitions, this field will store the error that occurred, so that it can be
* reported back to the client.
*/
public record ConfiguredTopology(int topologyEpoch,
Map<String, ConfiguredSubtopology> subtopologies,
Optional<Map<String, ConfiguredSubtopology>> subtopologies,
Map<String, CreatableTopic> internalTopicsToBeCreated,
Optional<TopicConfigurationException> topicConfigurationException) {

public ConfiguredTopology {
if (topologyEpoch < 0) {
throw new IllegalArgumentException("Topology epoch must be non-negative.");
}
if (topicConfigurationException.isEmpty() && subtopologies.isEmpty()) {
throw new IllegalArgumentException("Subtopologies must be present if topicConfigurationException is empty.");
}
Objects.requireNonNull(subtopologies, "subtopologies can't be null");
Objects.requireNonNull(internalTopicsToBeCreated, "internalTopicsToBeCreated can't be null");
Objects.requireNonNull(topicConfigurationException, "topicConfigurationException can't be null");
Expand All @@ -59,9 +64,11 @@ public boolean isReady() {
public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() {
return new StreamsGroupDescribeResponseData.Topology()
.setEpoch(topologyEpoch)
.setSubtopologies(subtopologies.entrySet().stream().map(
entry -> entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey())
).collect(Collectors.toList()));
.setSubtopologies(
subtopologies.map(stringConfiguredSubtopologyMap -> stringConfiguredSubtopologyMap.entrySet().stream().map(
entry -> entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey())
).collect(Collectors.toList())).orElse(Collections.emptyList())
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public class CopartitionedTopicsEnforcer {
* @param logContext The context for emitting log messages.
* @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the broker
* as well as any partition number decisions that have already been made. In particular, we expect
* the number of partitions for all repartition topics defined, even if they do not exist in the
* broker yet.
the number of partitions for all topics in all co-partition groups to be defined.
*/
public CopartitionedTopicsEnforcer(final LogContext logContext,
final Function<String, OptionalInt> topicPartitionCountProvider) {
Expand All @@ -65,15 +64,16 @@ public CopartitionedTopicsEnforcer(final LogContext logContext,
* client (in particular, when the user uses `repartition` in the DSL).
* @param flexibleRepartitionTopics The set of repartition topics whose partition count is flexible, and can be changed.
*
* @throws TopicConfigurationException If source topics are missing, or there are topics in copartitionTopics that are not copartitioned
* according to topicPartitionCountProvider.
* @throws IllegalStateException If the partition count for any topic in copartitionedTopics is not defined by
* topicPartitionCountProvider.
*
* @return A map from all repartition topics in copartitionedTopics to their updated partition counts.
*/
public Map<String, Integer> enforce(final Set<String> copartitionedTopics,
final Set<String> fixedRepartitionTopics,
final Set<String> flexibleRepartitionTopics) throws StreamsInvalidTopologyException {
if (copartitionedTopics.isEmpty()) {
log.debug("Ignoring unexpected empty copartitioned topics set.");
return Collections.emptyMap();
}
final Map<String, Integer> returnedPartitionCounts = new HashMap<>();
Expand All @@ -85,16 +85,7 @@ public Map<String, Integer> enforce(final Set<String> copartitionedTopics,

final Map<String, Integer> nonRepartitionTopicPartitions =
copartitionedTopics.stream().filter(topic -> !repartitionTopicPartitionCounts.containsKey(topic))
.collect(Collectors.toMap(topic -> topic, topic -> {
final OptionalInt topicPartitionCount = topicPartitionCountProvider.apply(topic);
if (topicPartitionCount.isEmpty()) {
final String str = String.format("Following topics are missing: [%s]", topic);
log.error(str);
throw TopicConfigurationException.missingSourceTopics(str);
} else {
return topicPartitionCount.getAsInt();
}
}));
.collect(Collectors.toMap(topic -> topic, this::getPartitionCount));

final int numPartitionsToUseForRepartitionTopics;

Expand Down Expand Up @@ -139,7 +130,7 @@ private int getPartitionCount(final String topicName) {
if (partitions.isPresent()) {
return partitions.getAsInt();
} else {
throw new StreamsInvalidTopologyException("Number of partitions is not set for topic: " + topicName);
throw new IllegalStateException("Number of partitions is not set for topic: " + topicName);
}
}

Expand Down
Loading