diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 7cfd0510cc767..284d4f68308c0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -450,12 +450,17 @@ public ListPartitionReassignmentsResult listPartitionReassignments(Optional offsets, AlterConsumerGroupOffsetsOptions options) { - throw new UnsupportedOperationException("Not implement yet"); + throw new UnsupportedOperationException("Not implemented yet"); } @Override public ListOffsetsResult listOffsets(Map topicPartitionOffsets, ListOffsetsOptions options) { - throw new UnsupportedOperationException("Not implement yet"); + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public ListOffsetsResult listOffsets(Map topicPartitionOffsets) { + throw new UnsupportedOperationException("Not implemented yet"); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 2a7470a3999e8..bdf87566c8e7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -16,7 +16,12 @@ */ package org.apache.kafka.streams; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -46,14 +51,17 @@ import org.apache.kafka.streams.processor.StateStore; import 
org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.ThreadMetadata; -import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.internals.StandbyTask; +import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreType; @@ -130,6 +138,7 @@ public class KafkaStreams implements AutoCloseable { private static final String JMX_PREFIX = "kafka.streams"; + private static final long UNKNOWN_POSITION = -1; // processId is expected to be unique across JVMs and to be used // in userData of the subscription request to allow assignor be aware @@ -212,7 +221,7 @@ public enum State { this.validTransitions.addAll(Arrays.asList(validTransitions)); } - public boolean isRunning() { + public boolean isRunningOrRebalancing() { return equals(RUNNING) || equals(REBALANCING); } @@ -296,17 +305,18 @@ public State state() { return state; } - private boolean isRunning() { + private boolean isRunningOrRebalancing() { 
synchronized (stateLock) { - return state.isRunning(); + return state.isRunningOrRebalancing(); } } - private void validateIsRunning() { - if (!isRunning()) { + private void validateIsRunningOrRebalancing() { + if (!isRunningOrRebalancing()) { throw new IllegalStateException("KafkaStreams is not running. State is " + state + "."); } } + /** * Listen to {@link State} change events. */ @@ -1003,7 +1013,7 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument * @throws StreamsException if cleanup failed */ public void cleanUp() { - if (isRunning()) { + if (isRunningOrRebalancing()) { throw new IllegalStateException("Cannot clean up while running."); } stateDirectory.clean(); @@ -1019,7 +1029,7 @@ public void cleanUp() { * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application */ public Collection allMetadata() { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getAllMetadata(); } @@ -1039,7 +1049,7 @@ public Collection allMetadata() { * this application */ public Collection allMetadataForStore(final String storeName) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getAllMetadataForStore(storeName); } @@ -1073,13 +1083,15 @@ public Collection allMetadataForStore(final String storeName) { * @param key the key to find metadata for * @param keySerializer serializer for the key * @param key type - * @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provide {@code storeName} and + * @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provided {@code storeName} and * {@code key} of this application or {@link StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing + * @deprecated Use {@link #queryMetadataForKey(String, Object, Serializer)} instead. 
*/ + @Deprecated public StreamsMetadata metadataForKey(final String storeName, final K key, final Serializer keySerializer) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer); } @@ -1104,16 +1116,125 @@ public StreamsMetadata metadataForKey(final String storeName, * @param key the key to find metadata for * @param partitioner the partitioner to be use to locate the host for the key * @param key type - * @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provide {@code storeName} and + * @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provided {@code storeName} and * {@code key} of this application or {@link StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing + * @deprecated Use {@link #queryMetadataForKey(String, Object, StreamPartitioner)} instead. */ + @Deprecated public StreamsMetadata metadataForKey(final String storeName, final K key, final StreamPartitioner partitioner) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner); } + /** + * Finds the metadata containing the active host and standby hosts where the key being queried would reside. + * + * @param storeName the {@code storeName} to find metadata for + * @param key the key to find metadata for + * @param keySerializer serializer for the key + * @param key type + * @return {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store. + */ + public KeyQueryMetadata queryMetadataForKey(final String storeName, + final K key, + final Serializer keySerializer) { + validateIsRunningOrRebalancing(); + return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, keySerializer); + } + + /** + * Finds the metadata containing the active host and standby hosts where the key being queried would reside. 
+ * + * @param storeName the {@code storeName} to find metadata for + * @param key the key to find metadata for + * @param partitioner the partitioner to be used to locate the host for the key + * @param key type + * @return {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using + * the supplied partitioner + */ + public KeyQueryMetadata queryMetadataForKey(final String storeName, + final K key, + final StreamPartitioner partitioner) { + validateIsRunningOrRebalancing(); + return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, partitioner); + } + + /** + * Returns {@link LagInfo} for all store partitions (active or standby) local to this Streams instance. Note that the + * values returned are just estimates and are meant to be used for making soft decisions on whether the data in the store + * partition is fresh enough for querying. + * + * @return map of store names to another map of partition to {@link LagInfo}s + */ + public Map> allLocalStorePartitionLags() { + final Map> localStorePartitionLags = new HashMap<>(); + final Map standbyChangelogPositions = new HashMap<>(); + final Map activeChangelogPositions = new HashMap<>(); + + // Obtain the current changelog positions of all standby and active tasks (only restoring active tasks have a known position) + for (final StreamThread streamThread : this.threads) { + for (final StandbyTask standbyTask : streamThread.allStandbyTasks()) { + final Map checkpointedOffsets = standbyTask.checkpointedOffsets(); + standbyTask.changelogPartitions().forEach(topicPartition -> + standbyChangelogPositions.put(topicPartition, checkpointedOffsets.getOrDefault(topicPartition, UNKNOWN_POSITION))); + } + + final Set restoringTaskIds = streamThread.restoringTaskIds(); + for (final StreamTask activeTask : streamThread.allStreamsTasks()) { + final boolean isRestoring = restoringTaskIds.contains(activeTask.id()); + final Map restoredOffsets = activeTask.restoredOffsets(); 
activeTask.changelogPartitions().forEach(topicPartition -> { + if (isRestoring) { + activeChangelogPositions.put(topicPartition, restoredOffsets.getOrDefault(topicPartition, UNKNOWN_POSITION)); + } else { + activeChangelogPositions.put(topicPartition, UNKNOWN_POSITION); + } + }); + } + } + + log.info("Current changelog positions, for active: " + activeChangelogPositions + " standby:" + standbyChangelogPositions); + final Map offsetSpecMap = Stream.concat( + activeChangelogPositions.keySet().stream(), standbyChangelogPositions.keySet().stream()) + .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())); + final Map allEndOffsets = new HashMap<>(); + try { + allEndOffsets.putAll(adminClient.listOffsets(offsetSpecMap).all().get()); + } catch (final Exception e) { + throw new StreamsException("Unable to obtain end offsets from kafka", e); + } + log.info("Current end offsets :" + allEndOffsets); + allEndOffsets.forEach((topicPartition, offsetsResultInfo) -> { + final String storeName = streamsMetadataState.getStoreForChangelogTopic(topicPartition.topic()); + final long offsetPosition; + if (activeChangelogPositions.containsKey(topicPartition)) { + // if unknown, assume it's positioned at the tail of changelog partition + offsetPosition = activeChangelogPositions.get(topicPartition) == UNKNOWN_POSITION ? + offsetsResultInfo.offset() : activeChangelogPositions.get(topicPartition); + } else if (standbyChangelogPositions.containsKey(topicPartition)) { + // if unknown, assume it's positioned at the head of changelog partition + offsetPosition = standbyChangelogPositions.get(topicPartition) == UNKNOWN_POSITION ? 
+ 0 : standbyChangelogPositions.get(topicPartition); + } else { + throw new IllegalStateException("Topic Partition " + topicPartition + " should be either active or standby"); + } + + final LagInfo lagInfo = new LagInfo(offsetPosition, offsetsResultInfo.offset()); + final Map partitionToOffsetLag = localStorePartitionLags.getOrDefault(storeName, new HashMap<>()); + if (!partitionToOffsetLag.containsKey(topicPartition.partition())) { + partitionToOffsetLag.put(topicPartition.partition(), lagInfo); + } else { + throw new IllegalStateException("Encountered the same store partition" + storeName + "," + + topicPartition.partition() + " more than once"); + } + localStorePartitionLags.put(storeName, partitionToOffsetLag); + }); + + return Collections.unmodifiableMap(localStorePartitionLags); + } + /** * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}. @@ -1127,7 +1248,7 @@ public StreamsMetadata metadataForKey(final String storeName, * {@code queryableStoreType} doesn't exist */ public T store(final String storeName, final QueryableStoreType queryableStoreType) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return queryableStoreProvider.getStore(storeName, queryableStoreType); } @@ -1137,7 +1258,7 @@ public T store(final String storeName, final QueryableStoreType queryable * @return the set of {@link ThreadMetadata}. 
*/ public Set localThreadsMetadata() { - validateIsRunning(); + validateIsRunningOrRebalancing(); final Set threadMetadata = new HashSet<>(); for (final StreamThread thread : threads) { threadMetadata.add(thread.threadMetadata()); diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java new file mode 100644 index 0000000000000..26b35fe324329 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import org.apache.kafka.streams.state.HostInfo; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +/** + * Represents all the metadata related to a key, where a particular key resides in a {@link KafkaStreams} application. + * It contains the active {@link HostInfo} and a set of standby {@link HostInfo}s, denoting the instances where the key resides. + * It also contains the partition number where the key belongs, which could be useful when used in conjunction with other APIs. 
+ * e.g: Relating with lags for that store partition using {@link KafkaStreams#allLocalStorePartitionLags()} + * NOTE: This is a point in time view. It may change as rebalances happen. + */ +public class KeyQueryMetadata { + /** + * Sentinel to indicate that the KeyQueryMetadata is currently unavailable. This can occur during rebalance + * operations. + */ + public static final KeyQueryMetadata NOT_AVAILABLE = new KeyQueryMetadata(new HostInfo("unavailable", -1), + Collections.emptySet(), + -1); + + private final HostInfo activeHost; + + private final Set standbyHosts; + + private final int partition; + + public KeyQueryMetadata(final HostInfo activeHost, final Set standbyHosts, final int partition) { + this.activeHost = activeHost; + this.standbyHosts = standbyHosts; + this.partition = partition; + } + + /** + * Get the active Streams instance for the given key. + * + * @return active instance's {@link HostInfo} + */ + public HostInfo getActiveHost() { + return activeHost; + } + + /** + * Get the Streams instances that host the key as standbys. + * + * @return set of standby {@link HostInfo}s, or an empty set if no standbys are configured + */ + public Set getStandbyHosts() { + return standbyHosts; + } + + /** + * Get the store partition corresponding to the key. 
+ * + * @return store partition number + */ + public int getPartition() { + return partition; + } + + @Override + public boolean equals(final Object obj) { + if (!(obj instanceof KeyQueryMetadata)) { + return false; + } + final KeyQueryMetadata keyQueryMetadata = (KeyQueryMetadata) obj; + return Objects.equals(keyQueryMetadata.activeHost, activeHost) + && Objects.equals(keyQueryMetadata.standbyHosts, standbyHosts) + && Objects.equals(keyQueryMetadata.partition, partition); + } + + @Override + public String toString() { + return "KeyQueryMetadata {" + + "activeHost=" + activeHost + + ", standbyHosts=" + standbyHosts + + ", partition=" + partition + + '}'; + } + + @Override + public int hashCode() { + return Objects.hash(activeHost, standbyHosts, partition); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/LagInfo.java b/streams/src/main/java/org/apache/kafka/streams/LagInfo.java new file mode 100644 index 0000000000000..7148840b16e3f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/LagInfo.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +import java.util.Objects; + +/** + * Encapsulates information about the lag of a store partition replica (active or standby). This information is constantly changing as the + * tasks process records and thus should be treated as simply an instantaneous measure of lag. + */ +public class LagInfo { + + private final long currentOffsetPosition; + + private final long endOffsetPosition; + + private final long offsetLag; + + LagInfo(final long currentOffsetPosition, final long endOffsetPosition) { + this.currentOffsetPosition = currentOffsetPosition; + this.endOffsetPosition = endOffsetPosition; + this.offsetLag = Math.max(0, endOffsetPosition - currentOffsetPosition); + } + + /** + * Get the current maximum offset on the store partition's changelog topic that has been successfully written into + * the store partition's state store. + * + * @return current consumed offset for standby/restoring store partitions, and simply the end offset for active store partition replicas + */ + public long currentOffsetPosition() { + return this.currentOffsetPosition; + } + + /** + * Get the end offset position for this store partition's changelog topic on the Kafka brokers. 
+ * + * @return last offset written to the changelog topic partition + */ + public long endOffsetPosition() { + return this.endOffsetPosition; + } + + /** + * Get the measured lag between current and end offset positions, for this store partition replica + * + * @return lag as measured by message offsets + */ + public long offsetLag() { + return this.offsetLag; + } + + @Override + public boolean equals(final Object obj) { + if (!(obj instanceof LagInfo)) { + return false; + } + final LagInfo other = (LagInfo) obj; + return currentOffsetPosition == other.currentOffsetPosition + && endOffsetPosition == other.endOffsetPosition + && this.offsetLag == other.offsetLag; + } + + @Override + public int hashCode() { + return Objects.hash(currentOffsetPosition, endOffsetPosition, offsetLag); + } + + @Override + public String toString() { + return "LagInfo {" + + " currentOffsetPosition=" + currentOffsetPosition + + ", endOffsetPosition=" + endOffsetPosition + + ", offsetLag=" + offsetLag + + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index cdec8179877e2..d6b2e97b48c57 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -302,10 +302,10 @@ public class StreamsConfig extends AbstractConfig { public static final String APPLICATION_ID_CONFIG = "application.id"; private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. 
It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; - /**{@code user.endpoint} */ + /**{@code application.server} */ @SuppressWarnings("WeakerAccess") public static final String APPLICATION_SERVER_CONFIG = "application.server"; - private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application"; + private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance."; /** {@code bootstrap.servers} */ @SuppressWarnings("WeakerAccess") diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index a18b2b6a3590a..75317cc827851 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; @@ -255,4 +256,7 @@ public Collection changelogPartitions() { return stateMgr.changelogPartitions(); } + public Map restoredOffsets() { + return stateMgr.changelogReader().restoredOffsets(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index cca6c3dc921f1..c198ecaec37ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; @@ -34,8 +35,8 @@ import java.util.concurrent.atomic.AtomicReference; class AssignedStreamsTasks extends AssignedTasks implements RestoringTasks { - private final Map suspended = new HashMap<>(); - private final Map restoring = new HashMap<>(); + private final Map suspended = new ConcurrentHashMap<>(); + private final Map restoring = new ConcurrentHashMap<>(); private final Set restoredPartitions = new HashSet<>(); private final Map restoringByPartition = new HashMap<>(); private final Set prevActiveTasks = new HashSet<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 125d3e315e456..4d0c07e533150 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -39,7 +39,7 @@ abstract class AssignedTasks { final Logger log; final String taskTypeName; - final Map created = new HashMap<>(); + final Map created = new ConcurrentHashMap<>(); // IQ may access this map. 
final Map running = new ConcurrentHashMap<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 5754b4a9f1ec2..24347fc2c4125 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -102,6 +102,9 @@ public class InternalTopologyBuilder { // map from state store names to this state store's corresponding changelog topic if possible private final Map storeToChangelogTopic = new HashMap<>(); + // map from changelog topic name to its corresponding state store. + private final Map changelogTopicToStore = new HashMap<>(); + // all global topics private final Set globalTopics = new HashSet<>(); @@ -599,12 +602,21 @@ public final void connectProcessorAndStateStores(final String processorName, nodeGroups = null; } + public Map getStoreToChangelogTopic() { + return storeToChangelogTopic; + } + + public Map getChangelogTopicToStore() { + return changelogTopicToStore; + } + public void connectSourceStoreAndTopic(final String sourceStoreName, final String topic) { if (storeToChangelogTopic.containsKey(sourceStoreName)) { throw new TopologyException("Source store " + sourceStoreName + " is already added."); } storeToChangelogTopic.put(sourceStoreName, topic); + changelogTopicToStore.put(topic, sourceStoreName); } public final void addInternalTopic(final String topicName) { @@ -940,6 +952,7 @@ private void buildProcessorNode(final Map processorMap, if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) { final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName); storeToChangelogTopic.put(stateStoreName, changelogTopic); + changelogTopicToStore.put(changelogTopic, stateStoreName); } 
stateStoreMap.put(stateStoreName, stateStoreFactory.build()); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 9009a1c458fbf..6aa0a42335501 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.FixedOrderMap; @@ -73,7 +74,7 @@ public class ProcessorStateManager implements StateManager { private final boolean eosEnabled; private final File baseDir; private OffsetCheckpoint checkpointFile; - private final Map checkpointFileCache = new HashMap<>(); + private final Map checkpointFileCache = new ConcurrentHashMap<>(); private final Map initialLoadedCheckpoints; /** @@ -102,7 +103,7 @@ public ProcessorStateManager(final TaskId taskId, offsetLimits = new HashMap<>(); standbyRestoredOffsets = new HashMap<>(); this.isStandby = isStandby; - restoreCallbacks = isStandby ? new HashMap<>() : null; + restoreCallbacks = isStandby ? new ConcurrentHashMap<>() : null; recordConverters = isStandby ? 
new HashMap<>() : null; this.storeToChangelogTopic = new HashMap<>(storeToChangelogTopic); @@ -409,6 +410,10 @@ Collection changelogPartitions() { return unmodifiableList(changelogPartitions); } + ChangelogReader changelogReader() { + return changelogReader; + } + void ensureStoresRegistered() { for (final Map.Entry> entry : registeredStores.entrySet()) { if (!entry.getValue().isPresent()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 72ac76a97658b..4de3b5ea198c4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -195,7 +195,7 @@ public List> update(final TopicPartition partitio return remainingRecords; } - Map checkpointedOffsets() { + public Map checkpointedOffsets() { return Collections.unmodifiableMap(stateMgr.checkpointed()); } @@ -212,7 +212,6 @@ private long updateOffsetLimits(final TopicPartition partition) { throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " + "New limit: " + newlimit.getValue() + ". 
Previous limit: " + previousLimit); } - } offsetLimits.putAll(newLimits); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 669eabf8bb5fc..b8be4bc081e8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -46,7 +47,7 @@ public class StoreChangelogReader implements ChangelogReader { private final StateRestoreListener userStateRestoreListener; private final Map restoreToOffsets = new HashMap<>(); private final Map> partitionInfo = new HashMap<>(); - private final Map stateRestorers = new HashMap<>(); + private final Map stateRestorers = new ConcurrentHashMap<>(); private final Set needsRestoring = new HashSet<>(); private final Set needsInitializing = new HashSet<>(); private final Set completedRestorers = new HashSet<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 5abdef1f451ed..23a411e770de2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -235,11 +235,6 @@ State setState(final State newState) { return oldState; } - public boolean isRunningAndNotRebalancing() { - // we do not need to grab stateLock since it is a single read - return state == State.RUNNING; - } - public boolean isRunning() 
{ synchronized (stateLock) { return state.isRunning(); @@ -1195,6 +1190,22 @@ public Map tasks() { return taskManager.activeTasks(); } + public Map standbyTasks() { + return taskManager.standbyTasks(); + } + + public List allStreamsTasks() { + return taskManager.allStreamsTasks(); + } + + public List allStandbyTasks() { + return taskManager.allStandbyTasks(); + } + + public Set restoringTaskIds() { + return taskManager.restoringTaskIds(); + } + /** * Produces a string representation containing useful information about a StreamThread. * This is useful in debugging scenarios. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index 27826a63ff78f..9222ffd1546e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.stream.Stream; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.HostInfo; @@ -48,7 +50,7 @@ public class StreamsMetadataState { private final Set globalStores; private final HostInfo thisHost; private Cluster clusterMetadata; - private StreamsMetadata myMetadata; + private StreamsMetadata localMetadata; public StreamsMetadataState(final InternalTopologyBuilder builder, final HostInfo thisHost) { this.builder = builder; @@ -71,6 +73,15 @@ public String toString(final String indent) { 
return builder.toString(); } + /** + * Get the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams application} + * + * @return the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams} application + */ + public synchronized StreamsMetadata getLocalMetadata() { + return localMetadata; + } + /** * Find all of the {@link StreamsMetadata}s in a * {@link KafkaStreams application} @@ -105,7 +116,7 @@ public synchronized Collection getAllMetadataForStore(final Str final ArrayList results = new ArrayList<>(); for (final StreamsMetadata metadata : allMetadata) { - if (metadata.stateStoreNames().contains(storeName)) { + if (metadata.stateStoreNames().contains(storeName) || metadata.standbyStateStoreNames().contains(storeName)) { results.add(metadata); } } @@ -116,17 +127,19 @@ public synchronized Collection getAllMetadataForStore(final Str * Find the {@link StreamsMetadata}s for a given storeName and key. This method will use the * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used * please use {@link StreamsMetadataState#getMetadataWithKey(String, Object, StreamPartitioner)} - * * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore}, * this method provides a way of finding which {@link StreamsMetadata} it would exist on. * + * * @param storeName Name of the store * @param key Key to use * @param keySerializer Serializer for the key * @param key type * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} * if streams is (re-)initializing + * @deprecated Use {@link #getKeyQueryMetadataForKey(String, Object, Serializer)} instead. 
*/ + @Deprecated public synchronized StreamsMetadata getMetadataWithKey(final String storeName, final K key, final Serializer keySerializer) { @@ -144,7 +157,7 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam if (thisHost == UNKNOWN_HOST) { return allMetadata.get(0); } - return myMetadata; + return localMetadata; } final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); @@ -158,9 +171,69 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam sourceTopicsInfo); } + /** + * Find the {@link KeyQueryMetadata}s for a given storeName and key. This method will use the + * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used + * please use {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)} instead. + * + * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore}, + * this method provides a way of finding which {@link KeyQueryMetadata} it would exist on. + * + * @param storeName Name of the store + * @param key Key to use + * @param keySerializer Serializer for the key + * @param key type + * @return The {@link KeyQueryMetadata} for the storeName and key or {@link KeyQueryMetadata#NOT_AVAILABLE} + * if streams is (re-)initializing or null if the corresponding topic cannot be found + */ + public synchronized KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, + final K key, + final Serializer keySerializer) { + Objects.requireNonNull(keySerializer, "keySerializer can't be null"); + return getKeyQueryMetadataForKey(storeName, + key, + new DefaultStreamPartitioner<>(keySerializer, clusterMetadata)); + } + /** + * Find the {@link KeyQueryMetadata}s for a given storeName and key + * + * Note: the key may not exist in the {@link StateStore}, this method provides a way of finding which + * {@link KeyQueryMetadata} it would exist on.
+ * + * @param storeName Name of the store + * @param key Key to use + * @param partitioner partitioner to use to find correct partition for key + * @param key type + * @return The {@link KeyQueryMetadata} for the storeName and key or {@link KeyQueryMetadata#NOT_AVAILABLE} + * if streams is (re-)initializing or null if the corresponding topic cannot be found + */ + public synchronized KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, + final K key, + final StreamPartitioner partitioner) { + Objects.requireNonNull(storeName, "storeName can't be null"); + Objects.requireNonNull(key, "key can't be null"); + Objects.requireNonNull(partitioner, "partitioner can't be null"); + if (!isInitialized()) { + return KeyQueryMetadata.NOT_AVAILABLE; + } + if (globalStores.contains(storeName)) { + // global stores are on every node. if we don't have the host info + // for this host then just pick the first metadata + if (thisHost == UNKNOWN_HOST) { + return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1); + } + return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1); + } + + final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); + if (sourceTopicsInfo == null) { + return null; + } + return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo); + } /** * Find the {@link StreamsMetadata}s for a given storeName and key. @@ -174,7 +247,9 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam * @param key type * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} * if streams is (re-)initializing + * @deprecated Use {@link #getKeyQueryMetadataForKey(String, Object, StreamPartitioner)} instead.
*/ + @Deprecated public synchronized StreamsMetadata getMetadataWithKey(final String storeName, final K key, final StreamPartitioner partitioner) { @@ -187,12 +262,12 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam } if (globalStores.contains(storeName)) { - // global stores are on every node. if we dont' have the host info + // global stores are on every node. if we don't have the host info // for this host then just pick the first metadata if (thisHost == UNKNOWN_HOST) { return allMetadata.get(0); } - return myMetadata; + return localMetadata; } final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); @@ -205,12 +280,16 @@ public synchronized StreamsMetadata getMetadataWithKey(final String storeNam /** * Respond to changes to the HostInfo -> TopicPartition mapping. Will rebuild the * metadata - * @param currentState the current mapping of {@link HostInfo} -> {@link TopicPartition}s - * @param clusterMetadata the current clusterMetadata {@link Cluster} + * + * @param activePartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions + * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions + * @param clusterMetadata the current clusterMetadata {@link Cluster} */ - synchronized void onChange(final Map> currentState, final Cluster clusterMetadata) { + synchronized void onChange(final Map> activePartitionHostMap, + final Map> standbyPartitionHostMap, + final Cluster clusterMetadata) { this.clusterMetadata = clusterMetadata; - rebuildMetadata(currentState); + rebuildMetadata(activePartitionHostMap, standbyPartitionHostMap); } private boolean hasPartitionsForAnyTopics(final List topicNames, final Set partitionForHost) { @@ -222,31 +301,89 @@ private boolean hasPartitionsForAnyTopics(final List topicNames, final S return false; } - private void rebuildMetadata(final Map> currentState) { + private Set 
getStoresOnHost(final Map> storeToSourceTopics, final Set sourceTopicPartitions) { + final Set storesOnHost = new HashSet<>(); + for (final Map.Entry> storeTopicEntry : storeToSourceTopics.entrySet()) { + final List topicsForStore = storeTopicEntry.getValue(); + if (hasPartitionsForAnyTopics(topicsForStore, sourceTopicPartitions)) { + storesOnHost.add(storeTopicEntry.getKey()); + } + } + return storesOnHost; + } + + + private void rebuildMetadata(final Map> activePartitionHostMap, + final Map> standbyPartitionHostMap) { allMetadata.clear(); - if (currentState.isEmpty()) { + if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) { return; } - final Map> stores = builder.stateStoreNameToSourceTopics(); - for (final Map.Entry> entry : currentState.entrySet()) { - final HostInfo key = entry.getKey(); - final Set partitionsForHost = new HashSet<>(entry.getValue()); - final Set storesOnHost = new HashSet<>(); - for (final Map.Entry> storeTopicEntry : stores.entrySet()) { - final List topicsForStore = storeTopicEntry.getValue(); - if (hasPartitionsForAnyTopics(topicsForStore, partitionsForHost)) { - storesOnHost.add(storeTopicEntry.getKey()); + final Map> storeToSourceTopics = builder.stateStoreNameToSourceTopics(); + Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream()) + .distinct() + .forEach(hostInfo -> { + final Set activePartitionsOnHost = new HashSet<>(); + final Set activeStoresOnHost = new HashSet<>(); + if (activePartitionHostMap.containsKey(hostInfo)) { + activePartitionsOnHost.addAll(activePartitionHostMap.get(hostInfo)); + activeStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, activePartitionsOnHost)); + } + activeStoresOnHost.addAll(globalStores); + + final Set standbyPartitionsOnHost = new HashSet<>(); + final Set standbyStoresOnHost = new HashSet<>(); + if (standbyPartitionHostMap.containsKey(hostInfo)) { + standbyPartitionsOnHost.addAll(standbyPartitionHostMap.get(hostInfo)); + 
standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost)); + } + + final StreamsMetadata metadata = new StreamsMetadata(hostInfo, + activeStoresOnHost, + activePartitionsOnHost, + standbyStoresOnHost, + standbyPartitionsOnHost); + allMetadata.add(metadata); + if (hostInfo.equals(thisHost)) { + localMetadata = metadata; } + }); + } + + private KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, + final K key, + final StreamPartitioner partitioner, + final SourceTopicsInfo sourceTopicsInfo) { + + final Integer partition = partitioner.partition(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions); + final Set matchingPartitions = new HashSet<>(); + for (final String sourceTopic : sourceTopicsInfo.sourceTopics) { + matchingPartitions.add(new TopicPartition(sourceTopic, partition)); + } + + HostInfo activeHost = new HostInfo("unavailable", -1); + final Set standbyHosts = new HashSet<>(); + for (final StreamsMetadata streamsMetadata : allMetadata) { + final Set activeStateStoreNames = streamsMetadata.stateStoreNames(); + final Set topicPartitions = new HashSet<>(streamsMetadata.topicPartitions()); + final Set standbyStateStoreNames = streamsMetadata.standbyStateStoreNames(); + final Set standbyTopicPartitions = new HashSet<>(streamsMetadata.standbyTopicPartitions()); + + topicPartitions.retainAll(matchingPartitions); + if (activeStateStoreNames.contains(storeName) && !topicPartitions.isEmpty()) { + activeHost = streamsMetadata.hostInfo(); } - storesOnHost.addAll(globalStores); - final StreamsMetadata metadata = new StreamsMetadata(key, storesOnHost, partitionsForHost); - allMetadata.add(metadata); - if (key.equals(thisHost)) { - myMetadata = metadata; + + standbyTopicPartitions.retainAll(matchingPartitions); + if (standbyStateStoreNames.contains(storeName) && !standbyTopicPartitions.isEmpty()) { + standbyHosts.add(streamsMetadata.hostInfo()); } } + + return new KeyQueryMetadata(activeHost, 
standbyHosts, partition); } + @Deprecated private StreamsMetadata getStreamsMetadataForKey(final String storeName, final K key, final StreamPartitioner partitioner, @@ -278,6 +415,10 @@ private SourceTopicsInfo getSourceTopicsInfo(final String storeName) { return new SourceTopicsInfo(sourceTopics); } + public String getStoreForChangelogTopic(final String topicName) { + return builder.getChangelogTopicToStore().get(topicName); + } + private boolean isInitialized() { return clusterMetadata != null && !clusterMetadata.topics().isEmpty(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 6cb20536f1328..47edd3165a812 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -283,6 +283,7 @@ private Map errorAssignment(final Map Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), + Collections.emptyMap(), errorCode).encode() )); } @@ -620,7 +621,8 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // ---------------- Step Three ---------------- // // construct the global partition assignment per host map - final Map> partitionsByHostState = new HashMap<>(); + final Map> partitionsByHost = new HashMap<>(); + final Map> standbyPartitionsByHost = new HashMap<>(); if (minReceivedMetadataVersion >= 2) { for (final Map.Entry entry : clientMetadataMap.entrySet()) { final HostInfo hostInfo = entry.getValue().hostInfo; @@ -628,24 +630,31 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // if application server is configured, also include host state map if (hostInfo != null) { final Set topicPartitions = new HashSet<>(); + final Set standbyPartitions = new HashSet<>(); final 
ClientState state = entry.getValue().state; for (final TaskId id : state.activeTasks()) { topicPartitions.addAll(partitionsForTask.get(id)); } - partitionsByHostState.put(hostInfo, topicPartitions); + for (final TaskId id : state.standbyTasks()) { + standbyPartitions.addAll(partitionsForTask.get(id)); + } + + partitionsByHost.put(hostInfo, topicPartitions); + standbyPartitionsByHost.put(hostInfo, standbyPartitions); } } } - taskManager.setPartitionsByHostState(partitionsByHostState); + taskManager.setHostPartitionMappings(partitionsByHost, standbyPartitionsByHost); final Map assignment; if (versionProbing) { assignment = versionProbingAssignment( clientMetadataMap, partitionsForTask, - partitionsByHostState, + partitionsByHost, + standbyPartitionsByHost, allOwnedPartitions, minReceivedMetadataVersion, minSupportedMetadataVersion @@ -654,7 +663,8 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr assignment = computeNewAssignment( clientMetadataMap, partitionsForTask, - partitionsByHostState, + partitionsByHost, + standbyPartitionsByHost, allOwnedPartitions, minReceivedMetadataVersion, minSupportedMetadataVersion @@ -667,6 +677,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr private Map computeNewAssignment(final Map clientsMetadata, final Map> partitionsForTask, final Map> partitionsByHostState, + final Map> standbyPartitionsByHost, final Set allOwnedPartitions, final int minUserMetadataVersion, final int minSupportedMetadataVersion) { @@ -700,6 +711,7 @@ private Map computeNewAssignment(final Map computeNewAssignment(final Map versionProbingAssignment(final Map clientsMetadata, final Map> partitionsForTask, - final Map> partitionsByHostState, + final Map> partitionsByHost, + final Map> standbyPartitionsByHost, final Set allOwnedPartitions, final int minUserMetadataVersion, final int minSupportedMetadataVersion) { @@ -734,7 +747,8 @@ private Map versionProbingAssignment(final Map assignment, 
final ClientMetadata clientMetadata, final Map> partitionsForTask, final Map> partitionsByHostState, + final Map> standbyPartitionsByHost, final Set allOwnedPartitions, final Map> activeTaskAssignments, final Map> standbyTaskAssignments, @@ -785,6 +800,7 @@ private void addClientAssignments(final Map assignment, assignedActiveList, standbyTaskMap, partitionsByHostState, + standbyPartitionsByHost, AssignorError.NONE.code() ).encode() ) @@ -1113,6 +1129,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat // version 2 fields final Map topicToPartitionInfo = new HashMap<>(); final Map> partitionsByHost; + final Map> standbyPartitionsByHost; final Map partitionsToTaskId = new HashMap<>(); @@ -1120,6 +1137,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat case 1: processVersionOneAssignment(logPrefix, info, partitions, activeTasks, partitionsToTaskId); partitionsByHost = Collections.emptyMap(); + standbyPartitionsByHost = Collections.emptyMap(); break; case 2: case 3: @@ -1127,6 +1145,12 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat case 5: processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId); partitionsByHost = info.partitionsByHost(); + standbyPartitionsByHost = Collections.emptyMap(); + break; + case 6: + processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId); + partitionsByHost = info.partitionsByHost(); + standbyPartitionsByHost = info.standbyPartitionByHost(); break; default: throw new IllegalStateException( @@ -1136,7 +1160,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat } taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); - taskManager.setPartitionsByHostState(partitionsByHost); + taskManager.setHostPartitionMappings(partitionsByHost, standbyPartitionsByHost); 
taskManager.setPartitionsToTaskId(partitionsToTaskId); taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); taskManager.updateSubscriptionsFromAssignment(partitions); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 43284c00a286c..8beeafa426140 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -360,6 +360,18 @@ Map standbyTasks() { return standby.runningTaskMap(); } + List allStreamsTasks() { + return active.allTasks(); + } + + Set restoringTaskIds() { + return active.restoringTaskIds(); + } + + List allStandbyTasks() { + return standby.allTasks(); + } + void setConsumer(final Consumer consumer) { this.consumer = consumer; } @@ -440,8 +452,9 @@ public void setClusterMetadata(final Cluster cluster) { this.cluster = cluster; } - public void setPartitionsByHostState(final Map> partitionsByHostState) { - this.streamsMetadataState.onChange(partitionsByHostState, cluster); + public void setHostPartitionMappings(final Map> partitionsByHost, + final Map> standbyPartitionsByHost) { + this.streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, cluster); } public void setPartitionsToTaskId(final Map partitionsToTaskId) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index a9ce225ef477e..f7d8541c0326b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -30,12 +30,15 @@ import java.io.IOException; import java.nio.ByteBuffer; import 
java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; @@ -49,24 +52,27 @@ public class AssignmentInfo { private List activeTasks; private Map> standbyTasks; private Map> partitionsByHost; + private Map> standbyPartitionsByHost; // used for decoding and "future consumer" assignments during version probing public AssignmentInfo(final int version, final int commonlySupportedVersion) { this(version, - commonlySupportedVersion, - Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap(), - 0); + commonlySupportedVersion, + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + 0); } public AssignmentInfo(final int version, final List activeTasks, final Map> standbyTasks, final Map> partitionsByHost, + final Map> standbyPartitionsByHost, final int errCode) { - this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, errCode); + this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, standbyPartitionsByHost, errCode); } public AssignmentInfo(final int version, @@ -74,12 +80,14 @@ public AssignmentInfo(final int version, final List activeTasks, final Map> standbyTasks, final Map> partitionsByHost, + final Map> standbyPartitionsByHost, final int errCode) { this.usedVersion = version; this.commonlySupportedVersion = commonlySupportedVersion; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.partitionsByHost = partitionsByHost; + this.standbyPartitionsByHost = 
standbyPartitionsByHost; this.errCode = errCode; if (version < 1 || version > LATEST_SUPPORTED_VERSION) { @@ -112,6 +120,10 @@ public Map> partitionsByHost() { return partitionsByHost; } + public Map> standbyPartitionByHost() { + return standbyPartitionsByHost; + } + /** * @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an * IO exception during encoding @@ -150,9 +162,16 @@ public ByteBuffer encode() { encodePartitionsByHostAsDictionary(out); out.writeInt(errCode); break; + case 6: + out.writeInt(usedVersion); + out.writeInt(commonlySupportedVersion); + encodeActiveAndStandbyTaskAssignment(out); + encodeActiveAndStandbyHostPartitions(out); + out.writeInt(errCode); + break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion - + "; latest commonly supported version: " + commonlySupportedVersion); + + "; latest commonly supported version: " + commonlySupportedVersion); } out.flush(); @@ -191,16 +210,31 @@ private void encodePartitionsByHost(final DataOutputStream out) throws IOExcepti } } - private void encodePartitionsByHostAsDictionary(final DataOutputStream out) throws IOException { + private void encodeHostPartitionMapUsingDictionary(final DataOutputStream out, + final Map topicNameDict, + final Map> hostPartitionMap) throws IOException { + // encode partitions by host + out.writeInt(hostPartitionMap.size()); + // Write the topic index, partition + for (final Map.Entry> entry : hostPartitionMap.entrySet()) { + writeHostInfo(out, entry.getKey()); + out.writeInt(entry.getValue().size()); + for (final TopicPartition partition : entry.getValue()) { + out.writeInt(topicNameDict.get(partition.topic())); + out.writeInt(partition.partition()); + } + } + } + + private Map encodeTopicDictionaryAndGet(final DataOutputStream out, + final Set topicPartitions) throws IOException { // Build a dictionary to encode topicNames int topicIndex = 0; final Map topicNameDict = new HashMap<>(); - for (final Map.Entry> 
entry : partitionsByHost.entrySet()) { - for (final TopicPartition topicPartition : entry.getValue()) { - if (!topicNameDict.containsKey(topicPartition.topic())) { - topicNameDict.put(topicPartition.topic(), topicIndex++); - } + for (final TopicPartition topicPartition : topicPartitions) { + if (!topicNameDict.containsKey(topicPartition.topic())) { + topicNameDict.put(topicPartition.topic(), topicIndex++); } } @@ -211,18 +245,23 @@ private void encodePartitionsByHostAsDictionary(final DataOutputStream out) thro out.writeUTF(entry.getKey()); } - // encode partitions by host - out.writeInt(partitionsByHost.size()); + return topicNameDict; + } - // Write the topic index, partition - for (final Map.Entry> entry : partitionsByHost.entrySet()) { - writeHostInfo(out, entry.getKey()); - out.writeInt(entry.getValue().size()); - for (final TopicPartition partition : entry.getValue()) { - out.writeInt(topicNameDict.get(partition.topic())); - out.writeInt(partition.partition()); - } - } + private void encodePartitionsByHostAsDictionary(final DataOutputStream out) throws IOException { + final Set allTopicPartitions = partitionsByHost.values().stream() + .flatMap(Collection::stream).collect(Collectors.toSet()); + final Map topicNameDict = encodeTopicDictionaryAndGet(out, allTopicPartitions); + encodeHostPartitionMapUsingDictionary(out, topicNameDict, partitionsByHost); + } + + private void encodeActiveAndStandbyHostPartitions(final DataOutputStream out) throws IOException { + final Set allTopicPartitions = Stream + .concat(partitionsByHost.values().stream(), standbyPartitionsByHost.values().stream()) + .flatMap(Collection::stream).collect(Collectors.toSet()); + final Map topicNameDict = encodeTopicDictionaryAndGet(out, allTopicPartitions); + encodeHostPartitionMapUsingDictionary(out, topicNameDict, partitionsByHost); + encodeHostPartitionMapUsingDictionary(out, topicNameDict, standbyPartitionsByHost); } private void writeHostInfo(final DataOutputStream out, final HostInfo 
hostInfo) throws IOException { @@ -287,6 +326,14 @@ public static AssignmentInfo decode(final ByteBuffer data) { decodePartitionsByHostUsingDictionary(assignmentInfo, in); assignmentInfo.errCode = in.readInt(); break; + case 6: + commonlySupportedVersion = in.readInt(); + assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion); + decodeActiveTasks(assignmentInfo, in); + decodeStandbyTasks(assignmentInfo, in); + decodeActiveAndStandbyHostPartitions(assignmentInfo, in); + assignmentInfo.errCode = in.readInt(); + break; default: final TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -320,7 +367,7 @@ private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo, } private static void decodePartitionsByHost(final AssignmentInfo assignmentInfo, - final DataInputStream in) throws IOException { + final DataInputStream in) throws IOException { assignmentInfo.partitionsByHost = new HashMap<>(); final int numEntries = in.readInt(); for (int i = 0; i < numEntries; i++) { @@ -338,24 +385,41 @@ private static Set readTopicPartitions(final DataInputStream in) return partitions; } - private static void decodePartitionsByHostUsingDictionary(final AssignmentInfo assignmentInfo, - final DataInputStream in) throws IOException { - assignmentInfo.partitionsByHost = new HashMap<>(); + private static Map decodeTopicIndexAndGet(final DataInputStream in) throws IOException { final int dictSize = in.readInt(); final Map topicIndexDict = new HashMap<>(dictSize); for (int i = 0; i < dictSize; i++) { topicIndexDict.put(in.readInt(), in.readUTF()); } + return topicIndexDict; + } + private static Map> decodeHostPartitionMapUsingDictionary(final DataInputStream in, + final Map topicIndexDict) throws IOException { + final Map> hostPartitionMap = new HashMap<>(); final int numEntries = in.readInt(); for (int i = 0; 
i < numEntries; i++) { final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt()); - assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in, topicIndexDict)); + hostPartitionMap.put(hostInfo, readTopicPartitions(in, topicIndexDict)); } + return hostPartitionMap; + } + + private static void decodePartitionsByHostUsingDictionary(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + final Map topicIndexDict = decodeTopicIndexAndGet(in); + assignmentInfo.partitionsByHost = decodeHostPartitionMapUsingDictionary(in, topicIndexDict); + } + + private static void decodeActiveAndStandbyHostPartitions(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + final Map topicIndexDict = decodeTopicIndexAndGet(in); + assignmentInfo.partitionsByHost = decodeHostPartitionMapUsingDictionary(in, topicIndexDict); + assignmentInfo.standbyPartitionsByHost = decodeHostPartitionMapUsingDictionary(in, topicIndexDict); } private static Set readTopicPartitions(final DataInputStream in, - final Map topicIndexDict) throws IOException { + final Map topicIndexDict) throws IOException { final int numPartitions = in.readInt(); final Set partitions = new HashSet<>(numPartitions); for (int j = 0; j < numPartitions; j++) { @@ -366,8 +430,9 @@ private static Set readTopicPartitions(final DataInputStream in, @Override public int hashCode() { + final int hostMapHashCode = partitionsByHost.hashCode() ^ standbyPartitionsByHost.hashCode(); return usedVersion ^ commonlySupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() - ^ partitionsByHost.hashCode() ^ errCode; + ^ hostMapHashCode ^ errCode; } @Override @@ -375,11 +440,12 @@ public boolean equals(final Object o) { if (o instanceof AssignmentInfo) { final AssignmentInfo other = (AssignmentInfo) o; return usedVersion == other.usedVersion && - commonlySupportedVersion == other.commonlySupportedVersion && - errCode == other.errCode && - 
activeTasks.equals(other.activeTasks) && - standbyTasks.equals(other.standbyTasks) && - partitionsByHost.equals(other.partitionsByHost); + commonlySupportedVersion == other.commonlySupportedVersion && + errCode == other.errCode && + activeTasks.equals(other.activeTasks) && + standbyTasks.equals(other.standbyTasks) && + partitionsByHost.equals(other.partitionsByHost) && + standbyPartitionsByHost.equals(other.standbyPartitionsByHost); } else { return false; } @@ -391,6 +457,8 @@ public String toString() { + ", supported version=" + commonlySupportedVersion + ", active tasks=" + activeTasks + ", standby tasks=" + standbyTasks - + ", partitions by host=" + partitionsByHost + "]"; + + ", partitions by host=" + partitionsByHost + + ", standbyPartitions by host=" + standbyPartitionsByHost + + "]"; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java index ee9b7502cce30..f091907361644 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java @@ -19,7 +19,7 @@ public final class StreamsAssignmentProtocolVersions { public static final int UNKNOWN = -1; public static final int EARLIEST_PROBEABLE_VERSION = 3; - public static final int LATEST_SUPPORTED_VERSION = 5; + public static final int LATEST_SUPPORTED_VERSION = 6; private StreamsAssignmentProtocolVersions() {} } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java index 16af13e54bc2a..50c2d6837fd2d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state; +import java.util.Objects; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KafkaStreams; @@ -35,34 +36,80 @@ public class StreamsMetadata { * operations. */ public final static StreamsMetadata NOT_AVAILABLE = new StreamsMetadata(new HostInfo("unavailable", -1), + Collections.emptySet(), + Collections.emptySet(), Collections.emptySet(), Collections.emptySet()); private final HostInfo hostInfo; + private final Set stateStoreNames; + private final Set topicPartitions; + private final Set standbyStateStoreNames; + + private final Set standbyTopicPartitions; + public StreamsMetadata(final HostInfo hostInfo, final Set stateStoreNames, - final Set topicPartitions) { + final Set topicPartitions, + final Set standbyStateStoreNames, + final Set standbyTopicPartitions) { this.hostInfo = hostInfo; this.stateStoreNames = stateStoreNames; this.topicPartitions = topicPartitions; + this.standbyTopicPartitions = standbyTopicPartitions; + this.standbyStateStoreNames = standbyStateStoreNames; } + /** + * The value of {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams + * instance, which is typically host/port + * + * @return {@link HostInfo} corresponding to the streams instance + */ public HostInfo hostInfo() { return hostInfo; } + /** + * State stores owned by the instance as an active replica + * + * @return set of active state store names + */ public Set stateStoreNames() { return stateStoreNames; } + /** + * Topic partitions consumed by the instance as an active replica + * + * @return set of active topic partitions + */ public Set topicPartitions() { return topicPartitions; } + /** + * (Source) Topic partitions for which the instance acts as standby. 
+ * + * @return set of standby topic partitions + */ + public Set standbyTopicPartitions() { + return standbyTopicPartitions; + } + + /** + * State stores owned by the instance as a standby replica + * + * @return set of standby state store names + */ + public Set standbyStateStoreNames() { + return standbyStateStoreNames; + } + public String host() { return hostInfo.host(); } @@ -80,31 +127,28 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final StreamsMetadata that = (StreamsMetadata) o; - if (!hostInfo.equals(that.hostInfo)) { - return false; - } - if (!stateStoreNames.equals(that.stateStoreNames)) { - return false; - } - return topicPartitions.equals(that.topicPartitions); + final StreamsMetadata that = (StreamsMetadata) o; + return Objects.equals(hostInfo, that.hostInfo) + && Objects.equals(stateStoreNames, that.stateStoreNames) + && Objects.equals(topicPartitions, that.topicPartitions) + && Objects.equals(standbyStateStoreNames, that.standbyStateStoreNames) + && Objects.equals(standbyTopicPartitions, that.standbyTopicPartitions); } @Override public int hashCode() { - int result = hostInfo.hashCode(); - result = 31 * result + stateStoreNames.hashCode(); - result = 31 * result + topicPartitions.hashCode(); - return result; + return Objects.hash(hostInfo, stateStoreNames, topicPartitions, standbyStateStoreNames, standbyTopicPartitions); } @Override public String toString() { - return "StreamsMetadata{" + + return "StreamsMetadata {" + "hostInfo=" + hostInfo + ", stateStoreNames=" + stateStoreNames + ", topicPartitions=" + topicPartitions + + ", standbyStateStoreNames=" + standbyStateStoreNames + + ", standbyTopicPartitions=" + standbyTopicPartitions + '}'; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 
53c5cc0210672..7c2b472b4fcc7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.HashSet; +import java.util.Set; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -46,12 +48,18 @@ public List stores(final String storeName, final QueryableStoreType qu if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } - if (!streamThread.isRunningAndNotRebalancing()) { + if (!streamThread.isRunning()) { throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + - streamThread.state() + ", not RUNNING"); + streamThread.state() + ", not RUNNING or REBALANCING"); } final List stores = new ArrayList<>(); - for (final Task streamTask : streamThread.tasks().values()) { + final Set tasks = new HashSet<>(streamThread.tasks().values()); + + if (streamThread.standbyTasks() != null) { + tasks.addAll(streamThread.standbyTasks().values()); + } + + for (final Task streamTask : tasks) { final StateStore store = streamTask.getStore(storeName); if (store != null && queryableStoreType.accepts(store)) { if (!store.isOpen()) { diff --git a/streams/src/main/resources/common/message/SubscriptionInfo.json b/streams/src/main/resources/common/message/SubscriptionInfo.json index ff18f86459399..09e95feeb58f1 100644 --- a/streams/src/main/resources/common/message/SubscriptionInfo.json +++ b/streams/src/main/resources/common/message/SubscriptionInfo.json @@ -15,7 +15,7 @@ { "name": "SubscriptionInfo", - "validVersions": "1-5", + "validVersions": "1-6", "fields": [ { "name": "version", diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index cd24685165a07..d34a09d921bfc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -17,9 +17,14 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; @@ -313,6 +318,10 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin Thread.sleep(50L); return null; }).anyTimes(); + + EasyMock.expect(thread.allStandbyTasks()).andStubReturn(Collections.emptyList()); + EasyMock.expect(thread.restoringTaskIds()).andStubReturn(Collections.emptySet()); + EasyMock.expect(thread.allStreamsTasks()).andStubReturn(Collections.emptyList()); } @Test @@ -659,15 +668,43 @@ public void shouldNotGetAllTasksWithStoreWhenNotRunning() { } @Test(expected = IllegalStateException.class) - public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() { + public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() { + final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time); + streams.queryMetadataForKey("store", "key", Serdes.String().serializer()); + } + + @Test + public void 
shouldGetQueryMetadataWithSerializerWhenRunningOrRebalancing() { final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time); - streams.metadataForKey("store", "key", Serdes.String().serializer()); + streams.start(); + assertEquals(KeyQueryMetadata.NOT_AVAILABLE, streams.queryMetadataForKey("store", "key", Serdes.String().serializer())); } @Test(expected = IllegalStateException.class) - public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() { + public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() { final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time); - streams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0); + streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0); + } + + @Test + public void shouldReturnEmptyLocalStorePartitionLags() { + // Mock all calls made to compute the offset lags, + final ListOffsetsResult result = EasyMock.mock(ListOffsetsResult.class); + final KafkaFutureImpl> allFuture = new KafkaFutureImpl<>(); + allFuture.complete(Collections.emptyMap()); + + EasyMock.expect(result.all()).andReturn(allFuture); + final MockAdminClient mockAdminClient = EasyMock.partialMockBuilder(MockAdminClient.class) + .addMockedMethod("listOffsets", Map.class).createMock(); + EasyMock.expect(mockAdminClient.listOffsets(anyObject())).andStubReturn(result); + final MockClientSupplier mockClientSupplier = EasyMock.partialMockBuilder(MockClientSupplier.class) + .addMockedMethod("getAdmin").createMock(); + EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient); + EasyMock.replay(result, mockAdminClient, mockClientSupplier); + + final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, mockClientSupplier, time); + streams.start(); + assertEquals(0, streams.allLocalStorePartitionLags().size()); } @Test diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
new file mode 100644
index 0000000000000..9afd6fcb32110
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreamsWrapper;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.LagInfo;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+/**
+ * Integration tests for {@link KafkaStreams#allLocalStorePartitionLags()}: checks the lag reported for a store
+ * on an active instance, on a standby instance that has just been assigned its partitions, and on an instance
+ * that restores the store after its local state was deleted.
+ */
+@Category({IntegrationTest.class})
+public class LagFetchIntegrationTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    private static final long CONSUMER_TIMEOUT_MS = 60000;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+    private final MockTime mockTime = CLUSTER.time;
+    private Properties streamsConfiguration;
+    private Properties consumerConfiguration;
+    private String inputTopicName;
+    private String outputTopicName;
+    private String stateStoreName;
+
+    @Rule
+    public TestName name = new TestName();
+
+    /** Derives per-test topic, store and application names and builds the base streams/consumer configs. */
+    @Before
+    public void before() {
+        inputTopicName = "input-topic-" + name.getMethodName();
+        outputTopicName = "output-topic-" + name.getMethodName();
+        stateStoreName = "lagfetch-test-store" + name.getMethodName();
+
+        streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + name.getMethodName());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+
+        consumerConfiguration = new Properties();
+        consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name.getMethodName() + "-consumer");
+        consumerConfiguration.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfiguration.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    /**
+     * Reports whether {@code streams} currently shows zero offset lag for partition 0 of {@code storeName}.
+     * Returns {@code false} instead of throwing while the store or the partition is not reported yet, so the
+     * method can be polled right after the instance was started.
+     */
+    private static boolean hasZeroLag(final KafkaStreams streams, final String storeName) {
+        final Map<Integer, LagInfo> partitionLags = streams.allLocalStorePartitionLags().get(storeName);
+        if (partitionLags == null) {
+            return false;
+        }
+        final LagInfo lagInfo = partitionLags.get(0);
+        return lagInfo != null && lagInfo.offsetLag() == 0;
+    }
+
+    @Test
+    public void shouldBeAbleToFetchValidLagsDuringRebalancing() throws Exception {
+        final CountDownLatch latchTillActiveIsRunning = new CountDownLatch(1);
+        final CountDownLatch latchTillStandbyIsRunning = new CountDownLatch(1);
+        final CountDownLatch latchTillStandbyHasPartitionsAssigned = new CountDownLatch(1);
+        final CyclicBarrier lagCheckBarrier = new CyclicBarrier(2);
+        final List<KafkaStreamsWrapper> streamsList = new ArrayList<>();
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            inputTopicName,
+            mkSet(new KeyValue<>("k1", 1L), new KeyValue<>("k2", 2L), new KeyValue<>("k3", 3L), new KeyValue<>("k4", 4L), new KeyValue<>("k5", 5L)),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                LongSerializer.class,
+                new Properties()),
+            mockTime);
+
+        // create the two streams instances
+        for (int i = 0; i < 2; i++) {
+            final Properties props = (Properties) streamsConfiguration.clone();
+            final File stateDir = folder.newFolder("state-" + i);
+            props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i);
+            props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i);
+            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+            props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getAbsolutePath());
+
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KTable<String, Long> t1 = builder.table(inputTopicName, Materialized.as(stateStoreName));
+            t1.toStream().to(outputTopicName);
+            final KafkaStreamsWrapper streams = new KafkaStreamsWrapper(builder.build(), props);
+            streamsList.add(streams);
+        }
+
+        final KafkaStreamsWrapper activeStreams = streamsList.get(0);
+        final KafkaStreamsWrapper standbyStreams = streamsList.get(1);
+        activeStreams.setStreamThreadStateListener((thread, newState, oldState) -> {
+            if (newState == StreamThread.State.RUNNING) {
+                latchTillActiveIsRunning.countDown();
+            }
+        });
+        standbyStreams.setStreamThreadStateListener((thread, newState, oldState) -> {
+            if (oldState == StreamThread.State.PARTITIONS_ASSIGNED && newState == StreamThread.State.RUNNING) {
+                latchTillStandbyHasPartitionsAssigned.countDown();
+                try {
+                    lagCheckBarrier.await(60, TimeUnit.SECONDS);
+                } catch (final Exception e) {
+                    throw new RuntimeException(e);
+                }
+            } else if (newState == StreamThread.State.RUNNING) {
+                latchTillStandbyIsRunning.countDown();
+            }
+        });
+
+        try {
+            // Before the active is started nothing is assigned, so no lag is reported.
+            Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = activeStreams.allLocalStorePartitionLags();
+            assertThat(offsetLagInfoMap.size(), equalTo(0));
+            // First start up the active.
+            activeStreams.start();
+            assertTrue("Active instance did not reach RUNNING within timeout",
+                latchTillActiveIsRunning.await(60, TimeUnit.SECONDS));
+
+            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
+                consumerConfiguration,
+                outputTopicName,
+                5,
+                CONSUMER_TIMEOUT_MS);
+            // Check the active reports proper lag values.
+            offsetLagInfoMap = activeStreams.allLocalStorePartitionLags();
+            assertThat(offsetLagInfoMap.size(), equalTo(1));
+            assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
+            assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
+            LagInfo lagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
+            assertThat(lagInfo.currentOffsetPosition(), equalTo(5L));
+            assertThat(lagInfo.endOffsetPosition(), equalTo(5L));
+            assertThat(lagInfo.offsetLag(), equalTo(0L));
+
+            // start up the standby & make it pause right after it has partition assigned
+            standbyStreams.start();
+            assertTrue("Standby instance did not get partitions assigned within timeout",
+                latchTillStandbyHasPartitionsAssigned.await(60, TimeUnit.SECONDS));
+            offsetLagInfoMap = standbyStreams.allLocalStorePartitionLags();
+            assertThat(offsetLagInfoMap.size(), equalTo(1));
+            assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
+            assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
+            lagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
+            assertThat(lagInfo.currentOffsetPosition(), equalTo(0L));
+            assertThat(lagInfo.endOffsetPosition(), equalTo(5L));
+            assertThat(lagInfo.offsetLag(), equalTo(5L));
+            // standby thread won't proceed to RUNNING before this barrier is crossed
+            lagCheckBarrier.await(60, TimeUnit.SECONDS);
+
+            // wait till the lag goes down to 0, on the standby
+            TestUtils.waitForCondition(() -> hasZeroLag(standbyStreams, stateStoreName),
+                "Standby should eventually catchup and have zero lag.");
+        } finally {
+            for (final KafkaStreams streams : streamsList) {
+                streams.close();
+            }
+        }
+    }
+
+    @Test
+    public void shouldBeAbleToFetchValidLagsDuringRestoration() throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            inputTopicName,
+            mkSet(new KeyValue<>("k1", 1L), new KeyValue<>("k2", 2L), new KeyValue<>("k3", 3L), new KeyValue<>("k4", 4L), new KeyValue<>("k5", 5L)),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                LongSerializer.class,
+                new Properties()),
+            mockTime);
+
+        // create the streams instance
+        final Properties props = (Properties) streamsConfiguration.clone();
+        final File stateDir = folder.newFolder("state");
+        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:0");
+        props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-0");
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getAbsolutePath());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> t1 = builder.table(inputTopicName, Materialized.as(stateStoreName));
+        t1.toStream().to(outputTopicName);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+
+        try {
+            // Before the instance is started nothing is assigned, so no lag is reported.
+            Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = streams.allLocalStorePartitionLags();
+            assertThat(offsetLagInfoMap.size(), equalTo(0));
+
+            // Get the instance to fully catch up and reach RUNNING state
+            startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(60));
+            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
+                consumerConfiguration,
+                outputTopicName,
+                5,
+                CONSUMER_TIMEOUT_MS);
+
+            // check for proper lag values.
+            offsetLagInfoMap = streams.allLocalStorePartitionLags();
+            assertThat(offsetLagInfoMap.size(), equalTo(1));
+            assertThat(offsetLagInfoMap.keySet(), equalTo(mkSet(stateStoreName)));
+            assertThat(offsetLagInfoMap.get(stateStoreName).size(), equalTo(1));
+            final LagInfo zeroLagInfo = offsetLagInfoMap.get(stateStoreName).get(0);
+            assertThat(zeroLagInfo.currentOffsetPosition(), equalTo(5L));
+            assertThat(zeroLagInfo.endOffsetPosition(), equalTo(5L));
+            assertThat(zeroLagInfo.offsetLag(), equalTo(0L));
+
+            // Kill instance, delete state to force restoration.
+            assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
+            IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+            // Files.walk keeps directory handles open until the returned stream is closed.
+            try (Stream<Path> statePaths = Files.walk(stateDir.toPath())) {
+                statePaths.sorted(Comparator.reverseOrder())
+                    .map(Path::toFile)
+                    .forEach(f -> assertTrue("Some state " + f + " could not be deleted", f.delete()));
+            }
+
+            final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
+            try {
+                // Track restoration progress with a restore listener. The maps are filled by the listener
+                // callbacks and read by this test thread afterwards, so they are synchronized.
+                final Map<String, Map<Integer, LagInfo>> restoreStartLagInfo =
+                    Collections.synchronizedMap(new HashMap<>());
+                final Map<String, Map<Integer, LagInfo>> restoreEndLagInfo =
+                    Collections.synchronizedMap(new HashMap<>());
+                restartedStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+                    @Override
+                    public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
+                        restoreStartLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
+                    }
+
+                    @Override
+                    public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
+                    }
+
+                    @Override
+                    public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
+                        restoreEndLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
+                    }
+                });
+
+                restartedStreams.start();
+                // wait till the lag goes down to 0 and the restore-end callback has recorded its snapshot
+                TestUtils.waitForCondition(() -> hasZeroLag(restartedStreams, stateStoreName),
+                    "Restored store should eventually catch up and have zero lag.");
+                TestUtils.waitForCondition(() -> restoreEndLagInfo.containsKey(stateStoreName),
+                    "Restore listener should eventually record the lag at the end of restoration.");
+                final LagInfo fullLagInfo = restoreStartLagInfo.get(stateStoreName).get(0);
+                assertThat(fullLagInfo.currentOffsetPosition(), equalTo(0L));
+                assertThat(fullLagInfo.endOffsetPosition(), equalTo(5L));
+                assertThat(fullLagInfo.offsetLag(), equalTo(5L));
+
+                assertThat(zeroLagInfo, equalTo(restoreEndLagInfo.get(stateStoreName).get(0)));
+            } finally {
+                // the original instance is already closed above; the restarted one must be closed here
+                restartedStreams.close();
+            }
+        } finally {
+            streams.close();
+        }
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index a53f57dd63aed..e2ef3f13f1531 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -49,6 +49,7 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; @@ -64,7 +65,7 @@ @Category(IntegrationTest.class) public class OptimizedKTableIntegrationTest { private static final int NUM_BROKERS = 1; - + private static int port = 0; private static final String INPUT_TOPIC_NAME = "input-topic"; private static final String TABLE_NAME = "source-table"; @@ -157,7 +158,9 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception { .store(TABLE_NAME, QueryableStoreTypes.keyValueStore()); final boolean kafkaStreams1WasFirstActive; - if (store1.get(key) != null) { + final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + + if ((keyQueryMetadata.getActiveHost().port() % 2) == 1) { kafkaStreams1WasFirstActive = true; } else { // Assert that data from the job was sent to the store @@ -254,6 +257,7 @@ private Properties streamsConfiguration() { final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 
String.valueOf(++port)); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 70f646f882bc4..bd6d08fe1cdb5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -16,13 +16,46 @@ */ package org.apache.kafka.streams.integration; +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofSeconds; +import static java.time.Instant.ofEpochMilli; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState; +import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileReader; import java.io.IOException; import java.io.PrintStream; import java.io.StringReader; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import 
java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; @@ -38,7 +71,9 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KafkaStreamsTest; +import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.LagInfo; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -68,44 +103,13 @@ import org.junit.After; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.time.Duration.ofMillis; -import static java.time.Duration.ofSeconds; -import static 
java.time.Instant.ofEpochMilli; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; -import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState; - @Category({IntegrationTest.class}) public class QueryableStateIntegrationTest { private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class); @@ -114,6 +118,9 @@ public class QueryableStateIntegrationTest { private static final int NUM_BROKERS = 1; + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final int STREAM_THREE_PARTITIONS = 4; @@ -256,29 +263,49 @@ private KafkaStreams createCountStream(final String inputTopic, return new KafkaStreams(builder.build(), streamsConfiguration); } + private void verifyOffsetLagFetch(final List streamsList, + final Set stores, + final List partitionsPerStreamsInstance) { + for (int i = 0; i < streamsList.size(); i++) { + final Map> localLags = streamsList.get(i).allLocalStorePartitionLags(); + final int expectedPartitions = partitionsPerStreamsInstance.get(i); + assertThat(localLags.values().stream().mapToInt(Map::size).sum(), equalTo(expectedPartitions)); + if (expectedPartitions > 0) { + assertThat(localLags.keySet(), equalTo(stores)); + } + } + } + + @SuppressWarnings("deprecation") private void verifyAllKVKeys(final List streamsList, final KafkaStreams streams, final 
KafkaStreamsTest.StateListenerStub stateListener, final Set keys, final String storeName, - final long timeout) throws Exception { + final long timeout, + final boolean pickInstanceByPort) throws Exception { retryOnExceptionWithTimeout(timeout, () -> { final List noMetadataKeys = new ArrayList<>(); final List nullStoreKeys = new ArrayList<>(); final List nullValueKeys = new ArrayList<>(); final Map exceptionalKeys = new TreeMap<>(); + final StringSerializer serializer = new StringSerializer(); for (final String key: keys) { try { - final StreamsMetadata metadata = streams - .metadataForKey(storeName, key, new StringSerializer()); - if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) { + final KeyQueryMetadata queryMetadata = streams.queryMetadataForKey(storeName, key, serializer); + final StreamsMetadata metadata = streams.metadataForKey(storeName, key, serializer); + if (queryMetadata == null || queryMetadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) { noMetadataKeys.add(key); continue; } + assertThat(metadata.hostInfo(), equalTo(queryMetadata.getActiveHost())); + if (!pickInstanceByPort) { + assertThat("Should have standbys to query from", queryMetadata.getStandbyHosts().size() > 0); + } - final int index = metadata.hostInfo().port(); - final KafkaStreams streamsWithKey = streamsList.get(index); + final int index = queryMetadata.getActiveHost().port(); + final KafkaStreams streamsWithKey = pickInstanceByPort ? 
streamsList.get(index) : streams; final ReadOnlyKeyValueStore store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore()); if (store == null) { @@ -288,7 +315,6 @@ private void verifyAllKVKeys(final List streamsList, if (store.get(key) == null) { nullValueKeys.add(key); - continue; } } catch (final InvalidStateStoreException e) { if (stateListener.mapStates.get(KafkaStreams.State.REBALANCING) < 1) { @@ -305,6 +331,7 @@ private void verifyAllKVKeys(final List streamsList, }); } + @SuppressWarnings("deprecation") private void verifyAllWindowedKeys(final List streamsList, final KafkaStreams streams, final KafkaStreamsTest.StateListenerStub stateListenerStub, @@ -312,24 +339,32 @@ private void verifyAllWindowedKeys(final List streamsList, final String storeName, final Long from, final Long to, - final long timeout) throws Exception { + final long timeout, + final boolean pickInstanceByPort) throws Exception { retryOnExceptionWithTimeout(timeout, () -> { final List noMetadataKeys = new ArrayList<>(); final List nullStoreKeys = new ArrayList<>(); final List nullValueKeys = new ArrayList<>(); final Map exceptionalKeys = new TreeMap<>(); + final StringSerializer serializer = new StringSerializer(); for (final String key: keys) { try { - final StreamsMetadata metadata = streams - .metadataForKey(storeName, key, new StringSerializer()); - if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) { + final KeyQueryMetadata queryMetadata = streams.queryMetadataForKey(storeName, key, serializer); + final StreamsMetadata metadata = streams.metadataForKey(storeName, key, serializer); + if (queryMetadata == null || queryMetadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) { noMetadataKeys.add(key); continue; } + assertThat(metadata.hostInfo(), equalTo(queryMetadata.getActiveHost())); + if (pickInstanceByPort) { + assertThat(queryMetadata.getStandbyHosts().size(), equalTo(0)); + } else { + assertThat("Should have standbys to query from", 
queryMetadata.getStandbyHosts().size() > 0); + } - final int index = metadata.hostInfo().port(); - final KafkaStreams streamsWithKey = streamsList.get(index); + final int index = queryMetadata.getActiveHost().port(); + final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams; final ReadOnlyWindowStore store = streamsWithKey.store(storeName, QueryableStoreTypes.windowStore()); if (store == null) { @@ -339,7 +374,6 @@ private void verifyAllWindowedKeys(final List streamsList, if (store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) == null) { nullValueKeys.add(key); - continue; } } catch (final InvalidStateStoreException e) { // there must have been at least one rebalance state @@ -398,7 +432,7 @@ private void assertNoKVKeyFailures(final String storeName, } @Test - public void queryOnRebalance() throws Exception { + public void shouldBeAbleToQueryDuringRebalance() throws Exception { final int numThreads = STREAM_TWO_PARTITIONS; final List streamsList = new ArrayList<>(numThreads); final List listeners = new ArrayList<>(numThreads); @@ -422,6 +456,9 @@ public void queryOnRebalance() throws Exception { } startApplicationAndWaitUntilRunning(streamsList, Duration.ofSeconds(60)); + final Set stores = mkSet(storeName + "-" + streamThree, windowStoreName + "-" + streamThree); + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(4, 4)); + try { waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1); @@ -432,7 +469,8 @@ public void queryOnRebalance() throws Exception { listeners.get(i), inputValuesKeys, storeName + "-" + streamThree, - DEFAULT_TIMEOUT_MS); + DEFAULT_TIMEOUT_MS, + true); verifyAllWindowedKeys( streamsList, streamsList.get(i), @@ -441,8 +479,10 @@ public void queryOnRebalance() throws Exception { windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE, - DEFAULT_TIMEOUT_MS); + DEFAULT_TIMEOUT_MS, + true); } + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(4, 4)); // kill N-1 threads for (int i = 1; i < 
streamsList.size(); i++) { @@ -452,10 +492,12 @@ public void queryOnRebalance() throws Exception { } waitForApplicationState(streamsList.subList(1, numThreads), State.NOT_RUNNING, Duration.ofSeconds(60)); + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(4, 0)); // It's not enough to assert that the first instance is RUNNING because it is possible // for the above checks to succeed while the instance is in a REBALANCING state. waitForApplicationState(streamsList.subList(0, 1), State.RUNNING, Duration.ofSeconds(60)); + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(4, 0)); // Even though the closed instance(s) are now in NOT_RUNNING there is no guarantee that // the running instance is aware of this, so we must run our follow up queries with @@ -468,7 +510,101 @@ public void queryOnRebalance() throws Exception { listeners.get(0), inputValuesKeys, storeName + "-" + streamThree, - DEFAULT_TIMEOUT_MS); + DEFAULT_TIMEOUT_MS, + true); + verifyAllWindowedKeys( + streamsList, + streamsList.get(0), + listeners.get(0), + inputValuesKeys, + windowStoreName + "-" + streamThree, + 0L, + WINDOW_SIZE, + DEFAULT_TIMEOUT_MS, + true); + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0)); + } finally { + for (final KafkaStreams streams : streamsList) { + streams.close(); + } + } + } + + @Test + public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception { + final int numThreads = STREAM_TWO_PARTITIONS; + final List streamsList = new ArrayList<>(numThreads); + final List listeners = new ArrayList<>(numThreads); + + final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1); + producerRunnable.run(); + + // create stream threads + final String storeName = "word-count-store"; + final String windowStoreName = "windowed-word-count-store"; + for (int i = 0; i < numThreads; i++) { + final Properties props = (Properties) streamsConfiguration.clone(); + final File stateDir = folder.newFolder("state-" + i); + 
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i); + props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getAbsolutePath()); + final KafkaStreams streams = + createCountStream(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, props); + final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub(); + streams.setStateListener(listener); + listeners.add(listener); + streamsList.add(streams); + } + startApplicationAndWaitUntilRunning(streamsList, Duration.ofSeconds(60)); + final Set stores = mkSet(storeName + "-" + streamThree, windowStoreName + "-" + streamThree); + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 8)); + + try { + waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1); + + // Ensure each thread can serve all keys by itself; i.e standby replication works. + for (int i = 0; i < streamsList.size(); i++) { + verifyAllKVKeys( + streamsList, + streamsList.get(i), + listeners.get(i), + inputValuesKeys, + storeName + "-" + streamThree, + DEFAULT_TIMEOUT_MS, + false); + verifyAllWindowedKeys( + streamsList, + streamsList.get(i), + listeners.get(i), + inputValuesKeys, + windowStoreName + "-" + streamThree, + 0L, + WINDOW_SIZE, + DEFAULT_TIMEOUT_MS, + false); + } + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 8)); + + // kill N-1 threads + for (int i = 1; i < streamsList.size(); i++) { + final Duration closeTimeout = Duration.ofSeconds(60); + assertThat(String.format("Streams instance %s did not close in %d ms", i, closeTimeout.toMillis()), + streamsList.get(i).close(closeTimeout)); + } + + waitForApplicationState(streamsList.subList(1, numThreads), State.NOT_RUNNING, Duration.ofSeconds(60)); + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0)); + + // Now, confirm that all the keys are still queryable on the remaining 
thread, regardless of the state + verifyAllKVKeys( + streamsList, + streamsList.get(0), + listeners.get(0), + inputValuesKeys, + storeName + "-" + streamThree, + DEFAULT_TIMEOUT_MS, + false); verifyAllWindowedKeys( streamsList, streamsList.get(0), @@ -477,7 +613,9 @@ public void queryOnRebalance() throws Exception { windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE, - DEFAULT_TIMEOUT_MS); + DEFAULT_TIMEOUT_MS, + false); + verifyOffsetLagFetch(streamsList, stores, Arrays.asList(8, 0)); } finally { for (final KafkaStreams streams : streamsList) { streams.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index e3f0a4121e351..8e6b6acf20a7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -343,7 +343,7 @@ private static void produceSynchronously(final String topic, final List !driver.state().isRunning(), DEFAULT_TIMEOUT, "Streams didn't shut down."); + waitForCondition(() -> !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down."); assertThat(driver.state(), is(KafkaStreams.State.ERROR)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index bdc7a65c3866b..6a5c26aa6be7a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -16,41 +16,39 @@ */ package org.apache.kafka.streams.processor.internals; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.Assert.assertEquals; +import 
static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StreamsMetadata; import org.junit.Before; import org.junit.Test; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - public class StreamsMetadataStateTest { private StreamsMetadataState metadataState; @@ -60,7 +58,8 @@ public class StreamsMetadataStateTest { 
private TopicPartition topic1P0; private TopicPartition topic2P0; private TopicPartition topic3P0; - private Map> hostToPartitions; + private Map> hostToActivePartitions; + private Map> hostToStandbyPartitions; private StreamsBuilder builder; private TopicPartition topic1P1; private TopicPartition topic2P1; @@ -68,32 +67,28 @@ public class StreamsMetadataStateTest { private Cluster cluster; private final String globalTable = "global-table"; private StreamPartitioner partitioner; + private Set storeNames; @Before public void before() { builder = new StreamsBuilder(); final KStream one = builder.stream("topic-one"); - one.groupByKey().count(Materialized.>as("table-one")); + one.groupByKey().count(Materialized.as("table-one")); final KStream two = builder.stream("topic-two"); - two.groupByKey().count(Materialized.>as("table-two")); + two.groupByKey().count(Materialized.as("table-two")); builder.stream("topic-three") .groupByKey() - .count(Materialized.>as("table-three")); + .count(Materialized.as("table-three")); - one.merge(two).groupByKey().count(Materialized.>as("merged-table")); + one.merge(two).groupByKey().count(Materialized.as("merged-table")); - builder.stream("topic-four").mapValues(new ValueMapper() { - @Override - public Object apply(final Object value) { - return value; - } - }); + builder.stream("topic-four").mapValues(value -> value); builder.globalTable("global-topic", Consumed.with(null, null), - Materialized.>as(globalTable)); + Materialized.as(globalTable)); TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("appId"); @@ -107,10 +102,14 @@ public Object apply(final Object value) { hostOne = new HostInfo("host-one", 8080); hostTwo = new HostInfo("host-two", 9090); hostThree = new HostInfo("host-three", 7070); - hostToPartitions = new HashMap<>(); - hostToPartitions.put(hostOne, Utils.mkSet(topic1P0, topic2P1, topic4P0)); - hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1)); - hostToPartitions.put(hostThree, 
Collections.singleton(topic3P0)); + hostToActivePartitions = new HashMap<>(); + hostToActivePartitions.put(hostOne, mkSet(topic1P0, topic2P1, topic4P0)); + hostToActivePartitions.put(hostTwo, mkSet(topic2P0, topic1P1)); + hostToActivePartitions.put(hostThree, Collections.singleton(topic3P0)); + hostToStandbyPartitions = new HashMap<>(); + hostToStandbyPartitions.put(hostThree, mkSet(topic1P0, topic2P1, topic4P0)); + hostToStandbyPartitions.put(hostOne, mkSet(topic2P0, topic1P1)); + hostToStandbyPartitions.put(hostTwo, Collections.singleton(topic3P0)); final List partitionInfos = Arrays.asList( new PartitionInfo("topic-one", 0, null, null, null), @@ -122,28 +121,35 @@ public Object apply(final Object value) { cluster = new Cluster(null, Collections.emptyList(), partitionInfos, Collections.emptySet(), Collections.emptySet()); metadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne); - metadataState.onChange(hostToPartitions, cluster); - partitioner = new StreamPartitioner() { - @Override - public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { - return 1; - } - }; + metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, cluster); + partitioner = (topic, key, value, numPartitions) -> 1; + storeNames = mkSet("table-one", "table-two", "merged-table", globalTable); } @Test - public void shouldNotThrowNPEWhenOnChangeNotCalled() { - new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne).getAllMetadataForStore("store"); + public void shouldNotThrowExceptionWhenOnChangeNotCalled() { + final Collection metadata = new StreamsMetadataState( + TopologyWrapper.getInternalTopologyBuilder(builder.build()), hostOne).getAllMetadataForStore("store"); + assertEquals(0, metadata.size()); } @Test public void shouldGetAllStreamInstances() { - final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet(globalTable, 
"table-one", "table-two", "merged-table"), - Utils.mkSet(topic1P0, topic2P1, topic4P0)); - final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"), - Utils.mkSet(topic2P0, topic1P1)); - final StreamsMetadata three = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"), - Collections.singleton(topic3P0)); + final StreamsMetadata one = new StreamsMetadata(hostOne, + mkSet(globalTable, "table-one", "table-two", "merged-table"), + mkSet(topic1P0, topic2P1, topic4P0), + mkSet("table-one", "table-two", "merged-table"), + mkSet(topic2P0, topic1P1)); + final StreamsMetadata two = new StreamsMetadata(hostTwo, + mkSet(globalTable, "table-two", "table-one", "merged-table"), + mkSet(topic2P0, topic1P1), + mkSet("table-three"), + mkSet(topic3P0)); + final StreamsMetadata three = new StreamsMetadata(hostThree, + mkSet(globalTable, "table-three"), + Collections.singleton(topic3P0), + mkSet("table-one", "table-two", "merged-table"), + mkSet(topic1P0, topic2P1, topic4P0)); final Collection actual = metadataState.getAllMetadata(); assertEquals(3, actual.size()); @@ -154,35 +160,41 @@ public void shouldGetAllStreamInstances() { @Test public void shouldGetAllStreamsInstancesWithNoStores() { - builder.stream("topic-five").filter(new Predicate() { - @Override - public boolean test(final Object key, final Object value) { - return true; - } - }).to("some-other-topic"); + builder.stream("topic-five").filter((key, value) -> true).to("some-other-topic"); final TopicPartition tp5 = new TopicPartition("topic-five", 1); final HostInfo hostFour = new HostInfo("host-four", 8080); - hostToPartitions.put(hostFour, Utils.mkSet(tp5)); + hostToActivePartitions.put(hostFour, mkSet(tp5)); - metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null)))); + metadataState.onChange(hostToActivePartitions, Collections.emptyMap(), + 
cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null)))); final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.singleton(globalTable), - Collections.singleton(tp5)); + Collections.singleton(tp5), Collections.emptySet(), Collections.emptySet()); final Collection actual = metadataState.getAllMetadata(); assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected)); } @Test public void shouldGetInstancesForStoreName() { - final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet(globalTable, "table-one", "table-two", "merged-table"), - Utils.mkSet(topic1P0, topic2P1, topic4P0)); - final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"), - Utils.mkSet(topic2P0, topic1P1)); + final StreamsMetadata one = new StreamsMetadata(hostOne, + mkSet(globalTable, "table-one", "table-two", "merged-table"), + mkSet(topic1P0, topic2P1, topic4P0), + mkSet("table-one", "table-two", "merged-table"), + mkSet(topic2P0, topic1P1)); + final StreamsMetadata two = new StreamsMetadata(hostTwo, + mkSet(globalTable, "table-two", "table-one", "merged-table"), + mkSet(topic2P0, topic1P1), + mkSet("table-three"), + mkSet(topic3P0)); final Collection actual = metadataState.getAllMetadataForStore("table-one"); - assertEquals(2, actual.size()); + final Map actualAsMap = actual.stream() + .collect(Collectors.toMap(StreamsMetadata::hostInfo, Function.identity())); + assertEquals(3, actual.size()); assertTrue("expected " + actual + " to contain " + one, actual.contains(one)); assertTrue("expected " + actual + " to contain " + two, actual.contains(two)); + assertTrue("expected " + hostThree + " to contain as standby", + actualAsMap.get(hostThree).standbyStateStoreNames().contains("table-one")); } @Test(expected = NullPointerException.class) @@ -199,64 +211,61 @@ public void 
shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesnt @Test public void shouldGetInstanceWithKey() { final TopicPartition tp4 = new TopicPartition("topic-three", 1); - hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4)); - - metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); + hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4)); - final StreamsMetadata expected = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"), - Collections.singleton(topic3P0)); + metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, + cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); - final StreamsMetadata actual = metadataState.getMetadataWithKey("table-three", + final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), 0); + final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three", "the-key", Serdes.String().serializer()); - assertEquals(expected, actual); } @Test public void shouldGetInstanceWithKeyAndCustomPartitioner() { final TopicPartition tp4 = new TopicPartition("topic-three", 1); - hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4)); + hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4)); - metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); + metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, + cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); - final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-three", "merged-table"), - Utils.mkSet(topic2P0, tp4)); + final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), 1); - final 
StreamsMetadata actual = metadataState.getMetadataWithKey("table-three", "the-key", partitioner); + final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three", + "the-key", + partitioner); assertEquals(expected, actual); + assertEquals(1, actual.getPartition()); } @Test public void shouldReturnNotAvailableWhenClusterIsEmpty() { - metadataState.onChange(Collections.>emptyMap(), Cluster.empty()); - final StreamsMetadata result = metadataState.getMetadataWithKey("table-one", "a", Serdes.String().serializer()); - assertEquals(StreamsMetadata.NOT_AVAILABLE, result); + metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), Cluster.empty()); + final KeyQueryMetadata result = metadataState.getKeyQueryMetadataForKey("table-one", "a", Serdes.String().serializer()); + assertEquals(KeyQueryMetadata.NOT_AVAILABLE, result); } @Test public void shouldGetInstanceWithKeyWithMergedStreams() { final TopicPartition topic2P2 = new TopicPartition("topic-two", 2); - hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2)); - metadataState.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null)))); + hostToActivePartitions.put(hostTwo, mkSet(topic2P0, topic1P1, topic2P2)); + hostToStandbyPartitions.put(hostOne, mkSet(topic2P0, topic1P1, topic2P2)); + metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, + cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null)))); - final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("global-table", "table-two", "table-one", "merged-table"), - Utils.mkSet(topic2P0, topic1P1, topic2P2)); + final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2); - final StreamsMetadata actual = metadataState.getMetadataWithKey("merged-table", "123", new StreamPartitioner() { - @Override - public Integer 
partition(final String topic, final String key, final Object value, final int numPartitions) { - return 2; - } - }); + final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("merged-table", "the-key", + (topic, key, value, numPartitions) -> 2); assertEquals(expected, actual); - } @Test public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() { - final StreamsMetadata actual = metadataState.getMetadataWithKey("not-a-store", + final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("not-a-store", "key", Serdes.String().serializer()); assertNull(actual); @@ -264,23 +273,23 @@ public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() { @Test(expected = NullPointerException.class) public void shouldThrowWhenKeyIsNull() { - metadataState.getMetadataWithKey("table-three", null, Serdes.String().serializer()); + metadataState.getKeyQueryMetadataForKey("table-three", null, Serdes.String().serializer()); } @Test(expected = NullPointerException.class) public void shouldThrowWhenSerializerIsNull() { - metadataState.getMetadataWithKey("table-three", "key", (Serializer) null); + metadataState.getKeyQueryMetadataForKey("table-three", "key", (Serializer) null); } @Test(expected = NullPointerException.class) public void shouldThrowIfStoreNameIsNull() { - metadataState.getMetadataWithKey(null, "key", Serdes.String().serializer()); + metadataState.getKeyQueryMetadataForKey(null, "key", Serdes.String().serializer()); } @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void shouldThrowIfStreamPartitionerIsNull() { - metadataState.getMetadataWithKey(null, "key", (StreamPartitioner) null); + metadataState.getKeyQueryMetadataForKey(null, "key", (StreamPartitioner) null); } @Test @@ -293,30 +302,40 @@ public void shouldHaveGlobalStoreInAllMetadata() { } @Test - public void shouldGetMyMetadataForGlobalStoreWithKey() { - final StreamsMetadata metadata = metadataState.getMetadataWithKey(globalTable, "key", 
Serdes.String().serializer()); - assertEquals(hostOne, metadata.hostInfo()); + public void shouldGetLocalMetadataWithRightActiveStandbyInfo() { + assertEquals(hostOne, metadataState.getLocalMetadata().hostInfo()); + assertEquals(hostToActivePartitions.get(hostOne), metadataState.getLocalMetadata().topicPartitions()); + assertEquals(hostToStandbyPartitions.get(hostOne), metadataState.getLocalMetadata().standbyTopicPartitions()); + assertEquals(storeNames, metadataState.getLocalMetadata().stateStoreNames()); + assertEquals(storeNames.stream().filter(s -> !s.equals(globalTable)).collect(Collectors.toSet()), + metadataState.getLocalMetadata().standbyStateStoreNames()); + } + + @Test + public void shouldGetQueryMetadataForGlobalStoreWithKey() { + final KeyQueryMetadata metadata = metadataState.getKeyQueryMetadataForKey(globalTable, "key", Serdes.String().serializer()); + assertEquals(hostOne, metadata.getActiveHost()); + assertTrue(metadata.getStandbyHosts().isEmpty()); } @Test public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() { final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), StreamsMetadataState.UNKNOWN_HOST); - streamsMetadataState.onChange(hostToPartitions, cluster); - assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer())); + streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, cluster); + assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", Serdes.String().serializer())); } @Test - public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() { - final StreamsMetadata metadata = metadataState.getMetadataWithKey(globalTable, "key", partitioner); - assertEquals(hostOne, metadata.hostInfo()); + public void shouldGetQueryMetadataForGlobalStoreWithKeyAndPartitioner() { + final KeyQueryMetadata metadata = metadataState.getKeyQueryMetadataForKey(globalTable, 
"key", partitioner); + assertEquals(hostOne, metadata.getActiveHost()); + assertTrue(metadata.getStandbyHosts().isEmpty()); } @Test public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() { final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), StreamsMetadataState.UNKNOWN_HOST); - streamsMetadataState.onChange(hostToPartitions, cluster); - assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner)); + streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, cluster); + assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner)); } - - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 8f313d5bece9f..ab566d196be78 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; @@ -25,6 +28,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -881,15 +885,20 @@ public void 
testAssignWithStandbyReplicas() { builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); final List topics = asList("topic1", "topic2"); + final Set allTopicPartitions = topics.stream() + .map(topic -> asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2))) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + final Set allTasks = mkSet(task0_0, task0_1, task0_2); final Set prevTasks00 = mkSet(task0_0); final Set prevTasks01 = mkSet(task0_1); final Set prevTasks02 = mkSet(task0_2); + final Set standbyTasks00 = mkSet(task0_0); final Set standbyTasks01 = mkSet(task0_1); final Set standbyTasks02 = mkSet(task0_2); - final Set standbyTasks00 = mkSet(task0_0); final UUID uuid1 = UUID.randomUUID(); final UUID uuid2 = UUID.randomUUID(); @@ -904,11 +913,11 @@ public void testAssignWithStandbyReplicas() { subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription( topics, - getInfo(uuid1, prevTasks00, standbyTasks01).encode())); + getInfo(uuid1, prevTasks00, standbyTasks01, "any:9096").encode())); subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription( topics, - getInfo(uuid1, prevTasks01, standbyTasks02).encode())); + getInfo(uuid1, prevTasks01, standbyTasks02, "any:9096").encode())); subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription( topics, @@ -939,12 +948,32 @@ public void testAssignWithStandbyReplicas() { allStandbyTasks.addAll(info20.standbyTasks().keySet()); // all task ids are in the active tasks and also in the standby tasks - assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, allActiveTasks); assertEquals(3, allStandbyTasks.size()); assertEquals(allTasks, allStandbyTasks); + + // Check host partition assignments + final Map> partitionsByHost = info10.partitionsByHost(); + assertEquals(2, partitionsByHost.size()); + assertEquals(allTopicPartitions, 
partitionsByHost.values().stream() + .flatMap(Collection::stream).collect(Collectors.toSet())); + + final Map> standbyPartitionsByHost = info10.standbyPartitionByHost(); + assertEquals(2, standbyPartitionsByHost.size()); + assertEquals(allTopicPartitions, standbyPartitionsByHost.values().stream() + .flatMap(Collection::stream).collect(Collectors.toSet())); + + for (final HostInfo hostInfo : partitionsByHost.keySet()) { + assertTrue(Collections.disjoint(partitionsByHost.get(hostInfo), standbyPartitionsByHost.get(hostInfo))); + } + + // All consumers got the same host info + assertEquals(partitionsByHost, info11.partitionsByHost()); + assertEquals(partitionsByHost, info20.partitionsByHost()); + assertEquals(standbyPartitionsByHost, info11.standbyPartitionByHost()); + assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost()); } @Test @@ -952,8 +981,8 @@ public void testOnAssignment() { createMockTaskManager(); final Map> hostState = Collections.singletonMap( new HostInfo("localhost", 9090), - mkSet(t3p0, t3p3)); - taskManager.setPartitionsByHostState(hostState); + Utils.mkSet(t3p0, t3p3)); + taskManager.setHostPartitionMappings(hostState, Collections.emptyMap()); EasyMock.expectLastCall(); final Map> activeTasks = new HashMap<>(); @@ -973,7 +1002,7 @@ public void testOnAssignment() { configurePartitionAssignor(emptyMap()); final List activeTaskList = asList(task0_0, task0_3); - final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, activeTaskList, standbyTasks, hostState, 0); + final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, activeTaskList, standbyTasks, hostState, Collections.emptyMap(), 0); final ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode()); partitionAssignor.onAssignment(assignment, null); @@ -1329,12 +1358,17 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { final Map assign = 
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); final ConsumerPartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData()); - final Set consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); - final Set consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090)); - final HashSet allAssignedPartitions = new HashSet<>(consumer1partitions); - allAssignedPartitions.addAll(consumer2Partitions); - assertThat(consumer1partitions, not(allPartitions)); - assertThat(consumer2Partitions, not(allPartitions)); + + final Set consumer1ActivePartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); + final Set consumer2ActivePartitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090)); + final Set consumer1StandbyPartitions = assignmentInfo.standbyPartitionByHost().get(new HostInfo("localhost", 8080)); + final Set consumer2StandbyPartitions = assignmentInfo.standbyPartitionByHost().get(new HostInfo("other", 9090)); + final HashSet allAssignedPartitions = new HashSet<>(consumer1ActivePartitions); + allAssignedPartitions.addAll(consumer2ActivePartitions); + assertThat(consumer1ActivePartitions, not(allPartitions)); + assertThat(consumer2ActivePartitions, not(allPartitions)); + assertThat(consumer1ActivePartitions, equalTo(consumer2StandbyPartitions)); + assertThat(consumer2ActivePartitions, equalTo(consumer1StandbyPartitions)); assertThat(allAssignedPartitions, equalTo(allPartitions)); } @@ -1517,13 +1551,28 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN final AssignmentInfo decode = AssignmentInfo.decode(assignment.get(CONSUMER_1).userData()); assertThat( decode, - equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, asList(task0_0, task0_2), emptyMap(), emptyMap(), 0))); + equalTo(new 
AssignmentInfo( + LATEST_SUPPORTED_VERSION, + Arrays.asList(task0_0, task0_2), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + 0 + ))); + // The new consumer's assignment should be empty until c1 has the chance to revoke its partitions/tasks assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList())); assertThat( AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()), - equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, emptyList(), emptyMap(), emptyMap(), 0))); + equalTo(new AssignmentInfo( + LATEST_SUPPORTED_VERSION, + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + 0 + ))); } @Test @@ -1573,12 +1622,12 @@ public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionPro new ArrayList<>(activeTasks), standbyTaskMap, emptyMap(), + emptyMap(), 0 ) ) ); - assertThat(assignment.get("future-consumer").partitions(), equalTo(Collections.singletonList(t1p2))); assertThat( AssignmentInfo.decode(assignment.get("future-consumer").userData()), @@ -1588,6 +1637,7 @@ public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionPro Collections.singletonList(task0_2), futureStandbyTaskMap, emptyMap(), + emptyMap(), 0) ) ); @@ -1625,13 +1675,13 @@ public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersion assertThat(assignment.get(CONSUMER_1).partitions(), equalTo(asList(t1p0, t1p2))); assertThat( AssignmentInfo.decode(assignment.get(CONSUMER_1).userData()), - equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, asList(task0_0, task0_2), emptyMap(), emptyMap(), 0))); + equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, asList(task0_0, task0_2), emptyMap(), emptyMap(), emptyMap(), 0))); assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(Collections.singletonList(t1p1))); assertThat( AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()), - equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, 
Collections.singletonList(task0_1), emptyMap(), emptyMap(), 0))); + equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, Collections.singletonList(task0_1), emptyMap(), emptyMap(), emptyMap(), 0))); } @Test @@ -1676,8 +1726,7 @@ private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMi } private static ConsumerPartitionAssignor.Assignment createAssignment(final Map> firstHostState) { - final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, emptyList(), emptyMap(), firstHostState, 0); - + final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, emptyList(), emptyMap(), firstHostState, emptyMap(), 0); return new ConsumerPartitionAssignor.Assignment(emptyList(), info.encode()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index 0d20454cd6f4d..9206969e799d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -24,11 +24,13 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN; import static org.junit.Assert.assertEquals; @@ -36,71 +38,94 @@ public class AssignmentInfoTest { private final List activeTasks = Arrays.asList( new TaskId(0, 0), - new TaskId(0, 0), - new 
TaskId(0, 1), new TaskId(1, 0)); - private final Map> standbyTasks = new HashMap>() { - { - put(new TaskId(1, 1), - Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1))); - put(new TaskId(2, 0), - Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); - } - }; - private final Map> globalAssignment = new HashMap>() { - { - put(new HostInfo("localhost", 80), - Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t3", 3))); - } - }; + new TaskId(0, 1), + new TaskId(1, 0), + new TaskId(1, 1)); + + private final Map> standbyTasks = mkMap( + mkEntry(new TaskId(1, 0), mkSet(new TopicPartition("t1", 0), new TopicPartition("t2", 0))), + mkEntry(new TaskId(1, 1), mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1))) + ); + + private final Map> activeAssignment = mkMap( + mkEntry(new HostInfo("localhost", 8088), + mkSet(new TopicPartition("t0", 0), + new TopicPartition("t1", 0), + new TopicPartition("t2", 0))), + mkEntry(new HostInfo("localhost", 8089), + mkSet(new TopicPartition("t0", 1), + new TopicPartition("t1", 1), + new TopicPartition("t2", 1))) + ); + + private final Map> standbyAssignment = mkMap( + mkEntry(new HostInfo("localhost", 8088), + Utils.mkSet(new TopicPartition("t1", 0), + new TopicPartition("t2", 0))), + mkEntry(new HostInfo("localhost", 8089), + Utils.mkSet(new TopicPartition("t1", 1), + new TopicPartition("t2", 1))) + ); @Test public void shouldUseLatestSupportedVersionByDefault() { - final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 0); + final AssignmentInfo info = new AssignmentInfo(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, activeAssignment, standbyAssignment, 0); assertEquals(LATEST_SUPPORTED_VERSION, info.version()); } @Test(expected = IllegalArgumentException.class) public void shouldThrowForUnknownVersion1() { - new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment, 0); + new AssignmentInfo(0, activeTasks, 
standbyTasks, activeAssignment, Collections.emptyMap(), 0); } @Test(expected = IllegalArgumentException.class) public void shouldThrowForUnknownVersion2() { - new AssignmentInfo(LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment, 0); + new AssignmentInfo(LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, activeAssignment, Collections.emptyMap(), 0); } @Test public void shouldEncodeAndDecodeVersion1() { - final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment, 0); - final AssignmentInfo expectedInfo = new AssignmentInfo(1, UNKNOWN, activeTasks, standbyTasks, Collections.>emptyMap(), 0); + final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, activeAssignment, standbyAssignment, 0); + final AssignmentInfo expectedInfo = new AssignmentInfo(1, UNKNOWN, activeTasks, standbyTasks, Collections.emptyMap(), Collections.emptyMap(), 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @Test public void shouldEncodeAndDecodeVersion2() { - final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment, 0); - final AssignmentInfo expectedInfo = new AssignmentInfo(2, UNKNOWN, activeTasks, standbyTasks, globalAssignment, 0); + final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, activeAssignment, standbyAssignment, 0); + final AssignmentInfo expectedInfo = new AssignmentInfo(2, UNKNOWN, activeTasks, standbyTasks, activeAssignment, Collections.emptyMap(), 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @Test public void shouldEncodeAndDecodeVersion3() { - final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment, 0); - final AssignmentInfo expectedInfo = new AssignmentInfo(3, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 0); + final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, activeAssignment, 
standbyAssignment, 0); + final AssignmentInfo expectedInfo = new AssignmentInfo(3, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, + activeAssignment, Collections.emptyMap(), 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @Test public void shouldEncodeAndDecodeVersion4() { - final AssignmentInfo info = new AssignmentInfo(4, activeTasks, standbyTasks, globalAssignment, 2); - final AssignmentInfo expectedInfo = new AssignmentInfo(4, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2); + final AssignmentInfo info = new AssignmentInfo(4, activeTasks, standbyTasks, activeAssignment, standbyAssignment, 2); + final AssignmentInfo expectedInfo = new AssignmentInfo(4, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, + activeAssignment, Collections.emptyMap(), 2); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @Test public void shouldEncodeAndDecodeVersion5() { - final AssignmentInfo info = new AssignmentInfo(5, activeTasks, standbyTasks, globalAssignment, 2); - final AssignmentInfo expectedInfo = new AssignmentInfo(5, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2); + final AssignmentInfo info = new AssignmentInfo(5, activeTasks, standbyTasks, activeAssignment, standbyAssignment, 2); + final AssignmentInfo expectedInfo = new AssignmentInfo(5, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, + activeAssignment, Collections.emptyMap(), 2); + assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); + } + + @Test + public void shouldEncodeAndDecodeVersion6() { + final AssignmentInfo info = new AssignmentInfo(6, activeTasks, standbyTasks, activeAssignment, standbyAssignment, 2); + final AssignmentInfo expectedInfo = new AssignmentInfo(6, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, + activeAssignment, standbyAssignment, 2); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @@ -108,8 +133,10 @@ public void shouldEncodeAndDecodeVersion5() { 
public void shouldEncodeAndDecodeSmallerCommonlySupportedVersion() { final int usedVersion = LATEST_SUPPORTED_VERSION - 1; final int commonlySupportedVersion = LATEST_SUPPORTED_VERSION - 1; - final AssignmentInfo info = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 2); - final AssignmentInfo expectedInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 2); + final AssignmentInfo info = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, + activeAssignment, standbyAssignment, 2); + final AssignmentInfo expectedInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, + activeAssignment, Collections.emptyMap(), 2); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java index fbfa78de63046..dd12cb859aa2d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java @@ -32,7 +32,7 @@ public class LegacySubscriptionInfoSerde { private static final Logger log = LoggerFactory.getLogger(LegacySubscriptionInfoSerde.class); - public static final int LATEST_SUPPORTED_VERSION = 5; + public static final int LATEST_SUPPORTED_VERSION = 6; static final int UNKNOWN = -1; private final int usedVersion; @@ -95,7 +95,7 @@ public String userEndPoint() { * @throws TaskAssignmentException if method fails to encode the data */ public ByteBuffer encode() { - if (usedVersion == 3 || usedVersion == 4 || usedVersion == 5) { + if (usedVersion == 3 || usedVersion == 4 || usedVersion == 5 || 
usedVersion == 6) { final byte[] endPointBytes = prepareUserEndPoint(this.userEndPoint); final ByteBuffer buf = ByteBuffer.allocate( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java index 227446c2dd42e..218ecbbb798b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java @@ -233,8 +233,8 @@ public void generatedVersion3And4ShouldBeDecodableByLegacyLogic() { } @Test - public void generatedVersion3To5ShouldDecodeLegacyFormat() { - for (int version = 3; version <= 5; version++) { + public void generatedVersion3To6ShouldDecodeLegacyFormat() { + for (int version = 3; version <= 6; version++) { final LegacySubscriptionInfoSerde info = new LegacySubscriptionInfoSerde( version, LATEST_SUPPORTED_VERSION, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index ce816195fb9db..19a903ce46b03 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -321,7 +321,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, } private void mockThread(final boolean initialized) { - EasyMock.expect(threadMock.isRunningAndNotRebalancing()).andReturn(initialized); + EasyMock.expect(threadMock.isRunning()).andReturn(initialized); EasyMock.expect(threadMock.tasks()).andStubReturn(tasks); EasyMock.replay(threadMock); } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 360f11e93e6d2..a1adb638c203f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -225,7 +225,7 @@ public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, final TaskManager taskManager = taskManger(); taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); - taskManager.setPartitionsByHostState(partitionsByHost); + taskManager.setHostPartitionMappings(partitionsByHost, info.standbyPartitionByHost()); taskManager.setPartitionsToTaskId(partitionsToTaskId); taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); taskManager.updateSubscriptionsFromAssignment(partitions);