Merged
85 commits
81ce457
KAFKA-15276: Implement partition assignment reconciliation
kirktrue Oct 25, 2023
8560860
Reverted previous change to decouple the network thread from the appl…
kirktrue Oct 25, 2023
5d3efaa
Updates to clean up code a bit
kirktrue Oct 25, 2023
5f17c9d
Restoring AbstractCoordinator.createMeter() to protected final
kirktrue Oct 25, 2023
d08e180
Update ApplicationEventProcessor.java
kirktrue Oct 25, 2023
0bfd395
Updates to fix threading issues with tests
kirktrue Oct 26, 2023
ad19b36
Fixed minor whitespace issue
kirktrue Oct 26, 2023
ec01569
Merge remote-tracking branch 'origin/trunk' into KAFKA-15276-partitio…
kirktrue Nov 6, 2023
a210acf
Reverted code related to the core reconciliation
kirktrue Nov 7, 2023
48dfc46
Removed more unnecessary changes to align with trunk
kirktrue Nov 7, 2023
ffe3093
Reverting more unnecessary changes
kirktrue Nov 7, 2023
f7b79e3
Making sure the InternalApplicationEventHandler closed its superclass
kirktrue Nov 7, 2023
9ae3616
Minor documentation changes in EventHandler.add
kirktrue Nov 7, 2023
9e721a5
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Nov 20, 2023
a2a4d73
Updates to try to get back to passing tests
kirktrue Nov 20, 2023
926041e
More updates to get to a clean slate
kirktrue Nov 20, 2023
e857104
Clean up #1256
kirktrue Nov 20, 2023
9d372d7
Yet more clean up
kirktrue Nov 20, 2023
c044b57
More clean up
kirktrue Nov 20, 2023
e828701
More formatting reverting
kirktrue Nov 20, 2023
ac4b4e9
More clean up
kirktrue Nov 20, 2023
77e8d8f
More formatting clean up
kirktrue Nov 20, 2023
c2e7489
Updates for a first pass at end-to-end logic. Still needs tests creat…
kirktrue Nov 22, 2023
6428ba6
Changed ConsumerRebalanceListenerCallbackName to ConsumerRebalanceLis…
kirktrue Nov 28, 2023
06d4897
Updated AsyncKafkaConsumer.unsubscribe() to block
kirktrue Nov 28, 2023
d03c51a
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Nov 28, 2023
993c040
Updates to implement blocking for the consumer to join the group, if …
kirktrue Nov 29, 2023
18bc3aa
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Nov 29, 2023
e449bc2
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Nov 29, 2023
5bdedaa
Update MembershipManagerImpl to print out partitions w/o topic IDs
kirktrue Nov 30, 2023
86adb9a
Updated ConsumerNetworkThread to block for new events (should be spun…
kirktrue Nov 30, 2023
110c7eb
Downgraded logging in MembershipManagerImpl for registering futures t…
kirktrue Nov 30, 2023
3019a9c
Added processBackgroundEvents to AsyncKafkaConsumer
kirktrue Nov 30, 2023
0bb10ee
PlaintextConsumerTest - enabling testMultiConsumerSessionTimeoutOnClo…
kirktrue Nov 30, 2023
925624b
Updates to make disabled tests TODOs
kirktrue Nov 30, 2023
18adb34
Added documentation
kirktrue Nov 30, 2023
7e2cc61
Reverting unrelated changes
kirktrue Nov 30, 2023
22e92c0
More clean up
kirktrue Dec 1, 2023
6238781
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 2, 2023
0f38095
Minor clean up
kirktrue Dec 2, 2023
3babe24
More clean up to remove unnecessary diffs
kirktrue Dec 2, 2023
8ef0f2e
More clean up
kirktrue Dec 2, 2023
b2be1e5
More clean up
kirktrue Dec 2, 2023
f3b092c
Removed memberIdForLogging() as memberId() is exposed and used for lo…
kirktrue Dec 2, 2023
134bfdb
Reverted the generalization of the EventHandler as it didn't gain much
kirktrue Dec 2, 2023
edbffc4
Reverted some unnecessary diffs for BackgroundEventHandler
kirktrue Dec 2, 2023
69f8073
Removed more unnecessary changes
kirktrue Dec 2, 2023
7183d91
More clean up
kirktrue Dec 2, 2023
b740e50
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 4, 2023
9645a24
Updates based on feedback
kirktrue Dec 4, 2023
45cf7f5
Clean up
kirktrue Dec 4, 2023
730ee86
Changed a period to a semicolon in a log message
kirktrue Dec 5, 2023
7e00f6d
Minor clean up and comments
kirktrue Dec 5, 2023
8e04846
Added unit tests for ConsumerRebalanceListener callbacks since the in…
kirktrue Dec 5, 2023
a6251e3
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 5, 2023
2096fcd
Fixed issue with BackgroundEventHandlerTest that no longer compiled; …
kirktrue Dec 5, 2023
61b95df
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 6, 2023
3f70743
Minor tweak to debug logging for ConsumerRebalanceListener callback m…
kirktrue Dec 6, 2023
f1c7502
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 7, 2023
51e3bb1
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 7, 2023
e114d94
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 7, 2023
5009ba8
Moved the client telemetry stuff out of the membership manager and to…
kirktrue Dec 7, 2023
477b5d6
Minor tweaks to the event processor APIs
kirktrue Dec 8, 2023
829af6e
Updates to only pause for AsyncKafkaConsumer.processBackgroundEvents(…
kirktrue Dec 9, 2023
c4a97b7
Updates to tests for rebalance callbacks
kirktrue Dec 9, 2023
a3dd9e4
Added more test cases to MembershipManagerImplTest for failing callbacks
kirktrue Dec 9, 2023
1c7e0ad
Updates to make creating sets of TopicPartitions and TopicIdPartition…
kirktrue Dec 9, 2023
482fa55
Added missing mock method to fix testListenerCallbacksBasic
kirktrue Dec 9, 2023
f9f3a46
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 11, 2023
251fdc9
Added MembershipManagerImplTest testListenerCallbacksThrowsErrorOnPar…
kirktrue Dec 11, 2023
64bf9de
Updates to revert unncessesary changes
kirktrue Dec 11, 2023
47bdc7c
More clean up to revert and streamline PR
kirktrue Dec 11, 2023
51ae424
More clean up
kirktrue Dec 11, 2023
f554148
Minor formatting change
kirktrue Dec 11, 2023
bc4e2aa
Added a parameterized unit test for callbacks to AsyncKafkaConsumerTest
kirktrue Dec 11, 2023
5c130eb
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 12, 2023
e57d39d
Re-enabled MembershipManagerImplTest.testListenerCallbacksThrowsError…
kirktrue Dec 12, 2023
c52b795
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 13, 2023
9211f64
Update set of PlaintextConsumerTest's disabled/enabled integration te…
kirktrue Dec 13, 2023
f2de384
Enable another previously disable test in PlaintextConsumerTest
kirktrue Dec 13, 2023
de78235
Enabled testCoordinatorFailover for consumer group protocol now that …
kirktrue Dec 14, 2023
a184aa3
TODO-ified comments for PlaintextConsumerTest for consumer interceptors
kirktrue Dec 14, 2023
04f9875
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 14, 2023
fff4440
Fix compilation error in MembershipManagerImplTest post-sync of fork
kirktrue Dec 14, 2023
91953e1
Merge branch 'trunk' into KAFKA-15276-partition-reconciliation
kirktrue Dec 14, 2023
3 changes: 3 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@
<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>

+<suppress checks="MethodLength"
+files="ConsumerTestBuilder.java"/>

<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ void initializeResources() {
* </ol>
*/
void runOnce() {
-// If there are errors processing any events, the error will be thrown immediately. This will have
-// the effect of closing the background thread.
+// Process the events, if any, that were produced by the application thread. It is possible that processing
+// an event generates an error. In such cases, the processor will log an exception, but we do not want those
+// errors to be propagated to the caller.
applicationEventProcessor.process();

final long currentTimeMs = time.milliseconds();
Expand Down
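
A minimal sketch of the error isolation that the updated `runOnce()` comment describes: each queued event is processed in turn, and a failure is recorded rather than rethrown, so one bad event does not stop the loop. `EventProcessorSketch`, its `Runnable`-based events, and the `loggedErrors` list are hypothetical stand-ins, not the actual `ApplicationEventProcessor` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class EventProcessorSketch {

    // Hypothetical stand-in for the queue of events produced by the application thread.
    private final Queue<Runnable> events = new ArrayDeque<>();

    // Stand-in for a logger: failures are recorded here instead of being rethrown.
    final List<String> loggedErrors = new ArrayList<>();

    void add(Runnable event) {
        events.add(event);
    }

    // Drains the queue and returns how many events completed without an error.
    int process() {
        int succeeded = 0;
        Runnable event;
        while ((event = events.poll()) != null) {
            try {
                event.run();
                succeeded++;
            } catch (RuntimeException e) {
                // Record the failure and move on so the caller's loop keeps running.
                loggedErrors.add(e.getMessage());
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        EventProcessorSketch processor = new EventProcessorSketch();
        processor.add(() -> { });
        processor.add(() -> { throw new IllegalStateException("bad event"); });
        processor.add(() -> { });
        System.out.println(processor.process() + " succeeded, errors: " + processor.loggedErrors);
    }
}
```

The trade-off in this design is that the caller never sees the failure directly, so anything the application needs to know has to travel by another route, such as an error event.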
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

/**
* This class just provides a static name for the methods in the {@link ConsumerRebalanceListener} interface
* for a bit more compile time assurance.
*/
public enum ConsumerRebalanceListenerMethodName {

ON_PARTITIONS_REVOKED("onPartitionsRevoked"),
ON_PARTITIONS_ASSIGNED("onPartitionsAssigned"),
ON_PARTITIONS_LOST("onPartitionsLost");

private final String fullyQualifiedMethodName;

ConsumerRebalanceListenerMethodName(String methodName) {
this.fullyQualifiedMethodName = String.format("%s.%s", ConsumerRebalanceListener.class.getSimpleName(), methodName);
}

/**
* Provides the fully-qualified method name, e.g. {@code ConsumerRebalanceListener.onPartitionsRevoked}. This
* is used for log messages.
*
* @return Fully-qualified method name
*/
public String fullyQualifiedMethodName() {
return fullyQualifiedMethodName;
}
}
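
As a hedged illustration of what the enum above produces, the sketch below rebuilds it against a stand-in interface so that it compiles without the Kafka client on the classpath. The nested `ConsumerRebalanceListener` interface and the `MethodNameSketch` and `MethodName` names are hypothetical. Note that the resulting string is qualified by the simple class name only, not by the package.

```java
import java.util.Collection;

final class MethodNameSketch {

    // Hypothetical stand-in for org.apache.kafka.clients.consumer.ConsumerRebalanceListener.
    interface ConsumerRebalanceListener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsAssigned(Collection<String> partitions);
        void onPartitionsLost(Collection<String> partitions);
    }

    // Mirrors the shape of ConsumerRebalanceListenerMethodName from the diff.
    enum MethodName {
        ON_PARTITIONS_REVOKED("onPartitionsRevoked"),
        ON_PARTITIONS_ASSIGNED("onPartitionsAssigned"),
        ON_PARTITIONS_LOST("onPartitionsLost");

        private final String fullyQualifiedMethodName;

        MethodName(String methodName) {
            // getSimpleName() yields "ConsumerRebalanceListener", with no package prefix.
            this.fullyQualifiedMethodName =
                String.format("%s.%s", ConsumerRebalanceListener.class.getSimpleName(), methodName);
        }

        String fullyQualifiedMethodName() {
            return fullyQualifiedMethodName;
        }
    }

    public static void main(String[] args) {
        for (MethodName name : MethodName.values())
            System.out.println("Invoking " + name.fullyQualifiedMethodName());
    }
}
```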
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
Expand All @@ -51,8 +50,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public final class ConsumerUtils {
Expand Down Expand Up @@ -206,22 +205,39 @@ public static void refreshCommittedOffsets(final Map<TopicPartition, OffsetAndMe
}
}

-public static <T> T getResult(CompletableFuture<T> future, Timer timer) {
+public static <T> T getResult(Future<T> future, Timer timer) {
try {
return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
-Throwable t = e.getCause();
-
-if (t instanceof WakeupException)
-throw new WakeupException();
-else if (t instanceof KafkaException)
-throw (KafkaException) t;
-else
-throw new KafkaException(t);
+throw maybeWrapAsKafkaException(e.getCause());
} catch (InterruptedException e) {
throw new InterruptException(e);
} catch (java.util.concurrent.TimeoutException e) {
throw new TimeoutException(e);
}
}
+
+public static <T> T getResult(Future<T> future) {
+try {
+return future.get();
+} catch (ExecutionException e) {
+throw maybeWrapAsKafkaException(e.getCause());
+} catch (InterruptedException e) {
+throw new InterruptException(e);
+}
+}
+
+public static KafkaException maybeWrapAsKafkaException(Throwable t) {
+if (t instanceof KafkaException)
+return (KafkaException) t;
+else
+return new KafkaException(t);
+}
+
+public static KafkaException maybeWrapAsKafkaException(Throwable t, String message) {
+if (t instanceof KafkaException)
+return (KafkaException) t;
+else
+return new KafkaException(message, t);
+}
}
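
The unwrap-then-wrap pattern that `getResult` and `maybeWrapAsKafkaException` follow can be sketched without the Kafka classes. Here `SketchException` is a hypothetical stand-in for `KafkaException`, a plain millisecond timeout stands in for `Timer`, and re-interrupting the thread is this sketch's own choice. One likely consequence of the refactor, since `WakeupException` extends `KafkaException`, is that a wakeup now passes through as the original instance instead of being replaced by a new one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class GetResultSketch {

    // Hypothetical stand-in for org.apache.kafka.common.KafkaException.
    static class SketchException extends RuntimeException {
        SketchException(String message) { super(message); }
        SketchException(Throwable cause) { super(cause); }
    }

    // Pass through exceptions that already have the expected type; wrap everything else.
    static SketchException maybeWrap(Throwable t) {
        if (t instanceof SketchException)
            return (SketchException) t;
        else
            return new SketchException(t);
    }

    // Waits for the future and converts checked failures into the unchecked sketch type.
    static <T> T getResult(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The real failure is the cause; the ExecutionException is only an envelope.
            throw maybeWrap(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // assumption: preserve the interrupt flag
            throw new SketchException(e);
        } catch (TimeoutException e) {
            throw new SketchException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            getResult(failed, 100);
        } catch (SketchException e) {
            System.out.println("wrapped cause: " + e.getCause());
        }
    }
}
```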
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
Expand Down Expand Up @@ -206,6 +207,14 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
}

+/**
+* Returns the {@link MembershipManager} that this request manager is using to track the state of the group.
+* This is provided so that the {@link ApplicationEventProcessor} can access the state for querying or updating.
+*/
+public MembershipManager membershipManager() {
+return membershipManager;
+}
Comment thread
dajac marked this conversation as resolved.

/**
* Returns the delay for which the application thread can safely wait before it should be responsive
* to results from the request managers. For example, the subscription state can change when heartbeats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.consumer.internals;

+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;

Expand Down Expand Up @@ -145,6 +147,17 @@ public interface MembershipManager {
*/
void onSubscriptionUpdated();

+/**
+* Signals that a {@link ConsumerRebalanceListener} callback has completed. This is invoked when the
+* application thread has completed the callback and has submitted a
+* {@link ConsumerRebalanceListenerCallbackCompletedEvent} to the network I/O thread. At this point, we
+* notify the state machine that it's complete so that it can move to the next appropriate step of the
+* rebalance process.
+*
+* @param event Event with details about the callback that was executed
+*/
+void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event);

/**
* Transition to the {@link MemberState#JOINING} state to attempt joining a group.
*/
Expand Down
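
The Javadoc above says that completing a callback lets the state machine "move to the next appropriate step of the rebalance process". The diff does not show that chain, so the following is only a sketch of how such steps might be sequenced with `CompletableFuture`. `RebalanceChainSketch`, its two futures, and the `steps` list are hypothetical, and the revoke-then-assign ordering is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class RebalanceChainSketch {

    // Records which steps ran, in order, so the flow can be inspected.
    final List<String> steps = new ArrayList<>();

    // Futures that a (hypothetical) callback-completed notification would complete.
    final CompletableFuture<Void> revokedCallback = new CompletableFuture<>();
    final CompletableFuture<Void> assignedCallback = new CompletableFuture<>();

    // Waits for the revoke callback, then the assign callback, then records the outcome.
    CompletableFuture<Void> reconcile() {
        return revokedCallback
            .thenCompose(ignored -> {
                steps.add("revoked");
                return assignedCallback;
            })
            .whenComplete((ignored, error) -> steps.add(error == null ? "assigned" : "failed"));
    }

    public static void main(String[] args) {
        RebalanceChainSketch sketch = new RebalanceChainSketch();
        CompletableFuture<Void> result = sketch.reconcile();
        sketch.revokedCallback.complete(null);   // first callback reported as finished
        sketch.assignedCallback.complete(null);  // second callback reported as finished
        System.out.println(sketch.steps + ", done=" + result.isDone());
    }
}
```

If the first future completes exceptionally, the `thenCompose` stage is skipped and only the final `whenComplete` stage runs, which is how a failed callback would short-circuit the remaining steps in this sketch.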
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.Utils.TopicIdPartitionComparator;
import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.KafkaException;
Expand Down Expand Up @@ -48,6 +53,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;

/**
* Group manager for a single consumer that has a group id defined in the config
* {@link ConsumerConfig#GROUP_ID_CONFIG}, to use the Kafka-based offset management capability,
Expand Down Expand Up @@ -245,14 +254,22 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
*/
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;

+/**
+* Serves as the conduit by which we can report events to the application thread. This is needed as we send
+* {@link ConsumerRebalanceListenerCallbackNeededEvent callbacks} and, if needed,
+* {@link ErrorBackgroundEvent errors} to the application thread.
+*/
+private final BackgroundEventHandler backgroundEventHandler;
+
public MembershipManagerImpl(String groupId,
Optional<String> groupInstanceId,
Optional<String> serverAssignor,
SubscriptionState subscriptions,
CommitRequestManager commitRequestManager,
ConsumerMetadata metadata,
LogContext logContext,
-Optional<ClientTelemetryReporter> clientTelemetryReporter) {
+Optional<ClientTelemetryReporter> clientTelemetryReporter,
+BackgroundEventHandler backgroundEventHandler) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.serverAssignor = serverAssignor;
Expand All @@ -266,6 +283,7 @@ public MembershipManagerImpl(String groupId,
this.currentAssignment = new HashMap<>();
this.log = logContext.logger(MembershipManagerImpl.class);
this.clientTelemetryReporter = clientTelemetryReporter;
+this.backgroundEventHandler = backgroundEventHandler;
}

/**
Expand Down Expand Up @@ -1089,7 +1107,7 @@ private CompletableFuture<Void> invokeOnPartitionsRevokedCallback(Set<TopicParti
// current behaviour.
Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
if (!partitionsRevoked.isEmpty() && listener.isPresent()) {
-throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+return enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_REVOKED, partitionsRevoked);
} else {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -1100,7 +1118,7 @@ private CompletableFuture<Void> invokeOnPartitionsAssignedCallback(Set<TopicPart
// the current behaviour.
Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
if (listener.isPresent()) {
-throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+return enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_ASSIGNED, partitionsAssigned);
} else {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -1111,12 +1129,61 @@ private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartitio
// behaviour.
Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
if (!partitionsLost.isEmpty() && listener.isPresent()) {
-throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+return enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_LOST, partitionsLost);
} else {
return CompletableFuture.completedFuture(null);
}
}

+/**
+* Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to trigger the execution of the
+* appropriate {@link ConsumerRebalanceListener} {@link ConsumerRebalanceListenerMethodName method} on the
+* application thread.
+*
+* <p/>
+*
+* Because the reconciliation process (run in the background thread) will be blocked by the application thread
+* until it completes this, we need to provide a {@link CompletableFuture} by which to remember where we left off.
+*
+* @param methodName Callback method that needs to be executed on the application thread
+* @param partitions Partitions to supply to the callback method
+* @return Future that will be chained within the rest of the reconciliation logic
+*/
+private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName,
+Set<TopicPartition> partitions) {
+SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+sortedPartitions.addAll(partitions);
+CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+backgroundEventHandler.add(event);
+log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName());
+return event.future();
+}
+
+@Override
+public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event) {
+ConsumerRebalanceListenerMethodName methodName = event.methodName();
+Optional<KafkaException> error = event.error();
+CompletableFuture<Void> future = event.future();
+
+if (error.isPresent()) {
+Exception e = error.get();
+log.warn(
+"The {} method completed with an error ({}); signaling to continue to the next phase of rebalance",
+methodName.fullyQualifiedMethodName(),
+e.getMessage()
+);
+
+future.completeExceptionally(e);
+} else {
+log.debug(
+"The {} method completed successfully; signaling to continue to the next phase of rebalance",
+methodName.fullyQualifiedMethodName()
+);
+
+future.complete(null);
+}
+}

/**
* Log partitions being revoked that were already paused, since the pause flag will be
* effectively lost.
Expand Down
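
The handoff that `enqueueConsumerRebalanceListenerCallback` and `consumerRebalanceListenerCallbackCompleted` implement can be sketched with plain `java.util.concurrent` types. All names here (`CallbackHandoffSketch`, `CallbackNeeded`, `processBackgroundEvents`) are hypothetical, and partitions are plain strings. The sketch also simplifies one step: in the PR the application thread reports back by submitting a `ConsumerRebalanceListenerCallbackCompletedEvent` to the network thread, whereas here the application side completes the future directly.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

final class CallbackHandoffSketch {

    // Hypothetical stand-in for ConsumerRebalanceListenerCallbackNeededEvent.
    static final class CallbackNeeded {
        final String methodName;
        final List<String> partitions;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        CallbackNeeded(String methodName, List<String> partitions) {
            this.methodName = methodName;
            this.partitions = partitions;
        }
    }

    // Stand-in for the queue behind the background event handler.
    private final BlockingQueue<CallbackNeeded> backgroundEvents = new LinkedBlockingQueue<>();

    // Background-thread side: enqueue the request and hand back a future to chain on.
    CompletableFuture<Void> enqueue(String methodName, List<String> partitions) {
        CallbackNeeded event = new CallbackNeeded(methodName, partitions);
        backgroundEvents.add(event);
        return event.future;
    }

    // Application-thread side: run the user callback for each pending event and report the outcome.
    void processBackgroundEvents(Consumer<List<String>> userCallback) {
        CallbackNeeded event;
        while ((event = backgroundEvents.poll()) != null) {
            try {
                userCallback.accept(event.partitions);
                event.future.complete(null);
            } catch (RuntimeException e) {
                event.future.completeExceptionally(e);
            }
        }
    }

    public static void main(String[] args) {
        CallbackHandoffSketch sketch = new CallbackHandoffSketch();
        CompletableFuture<Void> revoked =
            sketch.enqueue("onPartitionsRevoked", Collections.singletonList("topic-0"));
        revoked.whenComplete((ignored, error) ->
            System.out.println(error == null ? "continue reconciliation" : "callback failed: " + error));
        sketch.processBackgroundEvents(partitions -> System.out.println("revoked " + partitions));
    }
}
```

Returning the future from `enqueue` is what lets the background side resume where it left off without blocking its own thread while the user callback runs.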