Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
bf78884
[KAFKA-6144]: Allow serving interactive queries from in-sync Standbys
vinothchandar Nov 21, 2019
81392b1
Adding KeyQueryMetadata support for reads during rebalancing
brary Nov 27, 2019
4a5f4e9
Resolving code review comments
brary Dec 3, 2019
d63ad22
Adding changes related to fetching offset lag on query
brary Dec 7, 2019
58d0f90
Reverting offset lag function to as discussed in KIP
brary Dec 7, 2019
a5f4806
Adding support to fetch checkpointed offsets and calculate lag
brary Dec 8, 2019
9bdbad0
Minor checkstyle fixes and returning the planned offsetLag map
brary Dec 8, 2019
1268e75
Minor cleanups
vinothchandar Dec 11, 2019
4a3268b
Re-implement KafkaStreams#allLocalOffsetLags() by fetching endOffsets…
vinothchandar Dec 17, 2019
9b9a7e8
Fix issue with reading standbyHostPartition maps for version <= 5
vinothchandar Dec 17, 2019
41c6dfa
Adding tests for standby streams metadata
vinothchandar Dec 24, 2019
5cebfa9
Fixing shouldApplyUpdatesToStandbyStore test case
brary Dec 25, 2019
b8e9ea3
Addressing code review comments
brary Dec 28, 2019
d776cf1
Fixing the checkstyle errors
brary Dec 28, 2019
fe7b70d
Changing few other maps to concurrent maps for safer iteration
vinothchandar Dec 30, 2019
e62c064
Adding tests for KafkaStreams#allLocalOffsetLags()
vinothchandar Jan 7, 2020
ed11a04
Introduce a LagInfo class that holds both raw offset position, as wel…
vinothchandar Jan 7, 2020
1bc5a64
Cleanup code bases on CR comments
vinothchandar Jan 8, 2020
e38fa9a
Bringing KafkaStreams#metadataForKey() back into QueryableStateIntegr…
vinothchandar Jan 9, 2020
1cfa5c5
Adding LagFetchIntegrationTest to test the lag values obtained during…
vinothchandar Jan 11, 2020
98879d7
Small change to enable LagInfo#equals() for test coverage
vinothchandar Jan 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,17 @@ public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<

@Override
public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options) {
throw new UnsupportedOperationException("Not implement yet");
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
throw new UnsupportedOperationException("Not implement yet");
throw new UnsupportedOperationException("Not implemented yet");
}

// Stub overload: this test double (presumably a mock Admin client — TODO confirm enclosing
// class) does not support listing offsets without explicit options yet; callers must use
// the real client or the variant that takes ListOffsetsOptions.
@Override
public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
    throw new UnsupportedOperationException("Not implemented yet");
}

@Override
Expand Down
155 changes: 138 additions & 17 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
*/
package org.apache.kafka.streams;

import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -46,14 +51,17 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
Expand Down Expand Up @@ -130,6 +138,7 @@
public class KafkaStreams implements AutoCloseable {

private static final String JMX_PREFIX = "kafka.streams";
private static final long UNKNOWN_POSITION = -1;

// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
Expand Down Expand Up @@ -212,7 +221,7 @@ public enum State {
this.validTransitions.addAll(Arrays.asList(validTransitions));
}

public boolean isRunning() {
public boolean isRunningOrRebalancing() {
return equals(RUNNING) || equals(REBALANCING);
}

Expand Down Expand Up @@ -296,17 +305,18 @@ public State state() {
return state;
}

private boolean isRunning() {
private boolean isRunningOrRebalancing() {
synchronized (stateLock) {
return state.isRunning();
return state.isRunningOrRebalancing();
}
}

private void validateIsRunning() {
if (!isRunning()) {
/**
 * Guard for APIs (metadata lookups, store access) that may be served while the instance
 * is either RUNNING or REBALANCING; any other state is rejected.
 *
 * @throws IllegalStateException if the instance is in neither RUNNING nor REBALANCING state
 */
private void validateIsRunningOrRebalancing() {
    if (!isRunningOrRebalancing()) {
        throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
    }
}

/**
* Listen to {@link State} change events.
*/
Expand Down Expand Up @@ -1003,7 +1013,7 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument
* @throws StreamsException if cleanup failed
*/
public void cleanUp() {
if (isRunning()) {
if (isRunningOrRebalancing()) {
throw new IllegalStateException("Cannot clean up while running.");
}
stateDirectory.clean();
Expand All @@ -1019,7 +1029,7 @@ public void cleanUp() {
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
*/
public Collection<StreamsMetadata> allMetadata() {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata();
}

Expand All @@ -1039,7 +1049,7 @@ public Collection<StreamsMetadata> allMetadata() {
* this application
*/
public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName);
}

Expand Down Expand Up @@ -1073,13 +1083,15 @@ public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
* @param key the key to find metadata for
* @param keySerializer serializer for the key
* @param <K> key type
* @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provide {@code storeName} and
* @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provided {@code storeName} and
* {@code key} of this application or {@link StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing
* @deprecated Use {@link #queryMetadataForKey(String, Object, Serializer)} instead.
*/
@Deprecated
public <K> StreamsMetadata metadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer) {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
}

Expand All @@ -1104,16 +1116,125 @@ public <K> StreamsMetadata metadataForKey(final String storeName,
* @param key the key to find metadata for
* @param partitioner the partitioner to be use to locate the host for the key
* @param <K> key type
* @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provide {@code storeName} and
* @return {@link StreamsMetadata} for the {@code KafkaStreams} instance with the provided {@code storeName} and
* {@code key} of this application or {@link StreamsMetadata#NOT_AVAILABLE} if Kafka Streams is (re-)initializing
* @deprecated Use {@link #queryMetadataForKey(String, Object, StreamPartitioner)} instead.
*/
@Deprecated
public <K> StreamsMetadata metadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
}

/**
 * Finds the metadata containing the active hosts and standby hosts where the key being queried would reside.
 *
 * <p>Note: this is a point-in-time view; it may change as rebalances happen.
 *
 * @param storeName the {@code storeName} to find metadata for
 * @param key the key to find metadata for
 * @param keySerializer serializer for the key
 * @param <K> key type
 * @return {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store
 */
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                final K key,
                                                final Serializer<K> keySerializer) {
    validateIsRunningOrRebalancing();
    return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, keySerializer);
}

/**
 * Finds the metadata containing the active hosts and standby hosts where the key being queried would reside,
 * using the supplied partitioner to locate the key's partition.
 *
 * <p>Note: this is a point-in-time view; it may change as rebalances happen.
 *
 * @param storeName the {@code storeName} to find metadata for
 * @param key the key to find metadata for
 * @param partitioner the partitioner to be use to locate the host for the key
 * @param <K> key type
 * @return {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store,
 *         using the supplied partitioner
 */
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
                                                final K key,
                                                final StreamPartitioner<? super K, ?> partitioner) {
    validateIsRunningOrRebalancing();
    return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, partitioner);
}

/**
* Returns {@link LagInfo}, for all store partitions (active or standby) local to this Streams instance. Note that the
* values returned are just estimates and meant to be used for making soft decisions on whether the data in the store
* partition is fresh enough for querying.
*
* @return map of store names to another map of partition to {@link LagInfo}s
*/
public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {
Comment thread
vinothchandar marked this conversation as resolved.
final Map<String, Map<Integer, LagInfo>> localStorePartitionLags = new HashMap<>();
final Map<TopicPartition, Long> standbyChangelogPositions = new HashMap<>();
final Map<TopicPartition, Long> activeChangelogPositions = new HashMap<>();

// Obtain the current positions, of all the active-restoring and standby tasks
for (final StreamThread streamThread : this.threads) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to do some synchronization while going through all of these threads and their metadata?

Within StreamThread and TaskManager, it doesn't seem like some of these fields are threadsafe. Might make sense to use add locking and/or return an immutable copy of some of these maps.

I'm not too familiar with the code just yet, so please tell me if I'm missing something.

Copy link
Copy Markdown
Contributor

@brary brary Dec 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The information which we are fetching from StreamThreads is basically the state of each task i.e. whether an active/standby is in running/restoring/created state. And these state maps are concurrent hashmaps so I think should be fine. Let me know if I am missing any non thread safe use of these threads.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made another pass.. but I believe all the maps fetched are concurrent hash maps, which should support such access.

  • streamThread.allStandbyTasks() ultimately calls AssignedTasks#allTasks, that populates a new list from two concurrent maps
  • standbyTask.checkpointedOffsets() gets a new unmodifiable map as well, will make couple maps concurrent as well in ProcessorStateManager, for sake of avoiding future issues if some code were to call remove() on them
  • streamThread.restoringTaskIds(), streamThread.allStreamsTasks() all grab values from concurrent maps.
  • activeTask.restoredOffsets() actually does have one corner case, around suspending using ChangelogReader#remove. Will make the corresponding map concurrent as well.

if you see any other gap or a better way than fishing like this let us know :)

for (final StandbyTask standbyTask : streamThread.allStandbyTasks()) {
final Map<TopicPartition, Long> checkpointedOffsets = standbyTask.checkpointedOffsets();
standbyTask.changelogPartitions().forEach(topicPartition ->
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just something to bear in mind as you make more changes: it'd be better to just stick to regular for loops instead of using the forEach method on collections. On Streams it's more natural, but for regular collections, the for loop is just fine.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont have strong opinions on this. but like to understand the reasoning behind the suggestion? is it because a lot of the code follows this style?

standbyChangelogPositions.put(topicPartition, checkpointedOffsets.getOrDefault(topicPartition, UNKNOWN_POSITION)));
}

final Set<TaskId> restoringTaskIds = streamThread.restoringTaskIds();
for (final StreamTask activeTask : streamThread.allStreamsTasks()) {
final boolean isRestoring = restoringTaskIds.contains(activeTask.id());
final Map<TopicPartition, Long> restoredOffsets = activeTask.restoredOffsets();
activeTask.changelogPartitions().forEach(topicPartition -> {
Comment thread
vinothchandar marked this conversation as resolved.
if (isRestoring) {
activeChangelogPositions.put(topicPartition, restoredOffsets.getOrDefault(topicPartition, UNKNOWN_POSITION));
} else {
activeChangelogPositions.put(topicPartition, UNKNOWN_POSITION);
}
});
}
}

log.info("Current changelog positions, for active: " + activeChangelogPositions + " standby:" + standbyChangelogPositions);
final Map<TopicPartition, OffsetSpec> offsetSpecMap = Stream.concat(
activeChangelogPositions.keySet().stream(), standbyChangelogPositions.keySet().stream())
.collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = new HashMap<>();
try {
allEndOffsets.putAll(adminClient.listOffsets(offsetSpecMap).all().get());
} catch (final Exception e) {
throw new StreamsException("Unable to obtain end offsets from kafka", e);
}
log.info("Current end offsets :" + allEndOffsets);
allEndOffsets.forEach((topicPartition, offsetsResultInfo) -> {
final String storeName = streamsMetadataState.getStoreForChangelogTopic(topicPartition.topic());
final long offsetPosition;
if (activeChangelogPositions.containsKey(topicPartition)) {
// if unknown, assume it's positioned at the tail of changelog partition
offsetPosition = activeChangelogPositions.get(topicPartition) == UNKNOWN_POSITION ?
offsetsResultInfo.offset() : activeChangelogPositions.get(topicPartition);
} else if (standbyChangelogPositions.containsKey(topicPartition)) {
// if unknown, assume it's positioned at the head of changelog partition
offsetPosition = standbyChangelogPositions.get(topicPartition) == UNKNOWN_POSITION ?
0 : standbyChangelogPositions.get(topicPartition);
} else {
throw new IllegalStateException("Topic Partition " + topicPartition + " should be either active or standby");
}

final LagInfo lagInfo = new LagInfo(offsetPosition, offsetsResultInfo.offset());
final Map<Integer, LagInfo> partitionToOffsetLag = localStorePartitionLags.getOrDefault(storeName, new HashMap<>());
if (!partitionToOffsetLag.containsKey(topicPartition.partition())) {
partitionToOffsetLag.put(topicPartition.partition(), lagInfo);
} else {
throw new IllegalStateException("Encountered the same store partition" + storeName + ","
+ topicPartition.partition() + " more than once");
}
localStorePartitionLags.put(storeName, partitionToOffsetLag);
});

return Collections.unmodifiableMap(localStorePartitionLags);
}

/**
* Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
* type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
Expand All @@ -1127,7 +1248,7 @@ public <K> StreamsMetadata metadataForKey(final String storeName,
* {@code queryableStoreType} doesn't exist
*/
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
validateIsRunning();
validateIsRunningOrRebalancing();
return queryableStoreProvider.getStore(storeName, queryableStoreType);
}

Expand All @@ -1137,7 +1258,7 @@ public <T> T store(final String storeName, final QueryableStoreType<T> queryable
* @return the set of {@link ThreadMetadata}.
*/
public Set<ThreadMetadata> localThreadsMetadata() {
validateIsRunning();
validateIsRunningOrRebalancing();
final Set<ThreadMetadata> threadMetadata = new HashSet<>();
for (final StreamThread thread : threads) {
threadMetadata.add(thread.threadMetadata());
Expand Down
104 changes: 104 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;

import org.apache.kafka.streams.state.HostInfo;

import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
* Represents all the metadata related to a key, where a particular key resides in a {@link KafkaStreams} application.
* It contains the active {@link HostInfo} and a set of standby {@link HostInfo}s, denoting the instances where the key resides.
* It also contains the partition number where the key belongs, which could be useful when used in conjunction with other APIs.
* e.g: Relating with lags for that store partition using {@link KafkaStreams#allLocalStorePartitionLags()}
* NOTE: This is a point in time view. It may change as rebalances happen.
*/
public class KeyQueryMetadata {
/**
* Sentinel to indicate that the KeyQueryMetadata is currently unavailable. This can occur during rebalance
* operations.
*/
public static final KeyQueryMetadata NOT_AVAILABLE = new KeyQueryMetadata(new HostInfo("unavailable", -1),
Collections.emptySet(),
-1);

private final HostInfo activeHost;

private final Set<HostInfo> standbyHosts;

private final int partition;

public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHosts, final int partition) {
this.activeHost = activeHost;
this.standbyHosts = standbyHosts;
this.partition = partition;
}

/**
* Get the Active streams instance for given key
*
* @return active instance's {@link HostInfo}
*/
public HostInfo getActiveHost() {
return activeHost;
}

/**
* Get the Streams instances that host the key as standbys
*
* @return set of standby {@link HostInfo} or a empty set, if no standbys are configured
*/
public Set<HostInfo> getStandbyHosts() {
Comment thread
vinothchandar marked this conversation as resolved.
return standbyHosts;
}

/**
* Get the Store partition corresponding to the key.
*
* @return store partition number
*/
public int getPartition() {
return partition;
}

@Override
public boolean equals(final Object obj) {
if (!(obj instanceof KeyQueryMetadata)) {
return false;
}
final KeyQueryMetadata keyQueryMetadata = (KeyQueryMetadata) obj;
return Objects.equals(keyQueryMetadata.activeHost, activeHost)
&& Objects.equals(keyQueryMetadata.standbyHosts, standbyHosts)
&& Objects.equals(keyQueryMetadata.partition, partition);
}

@Override
public String toString() {
return "KeyQueryMetadata {" +
"activeHost=" + activeHost +
", standbyHosts=" + standbyHosts +
", partition=" + partition +
'}';
}

@Override
public int hashCode() {
return Objects.hash(activeHost, standbyHosts, partition);
}
}
Loading