diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d84d45abded52..44fdf634ab0e9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -127,6 +127,9 @@
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 56d65a112174c..d34316e142cc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -42,7 +42,11 @@
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
@@ -94,9 +98,10 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -149,11 +154,18 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
* {@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread
*
*/
- public class BackgroundEventProcessor extends EventProcessor {
+ private class BackgroundEventProcessor extends EventProcessor {
+
+ private final ApplicationEventHandler applicationEventHandler;
+ private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
public BackgroundEventProcessor(final LogContext logContext,
- final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+ final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+ final ApplicationEventHandler applicationEventHandler,
+ final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker) {
super(logContext, backgroundEventQueue);
+ this.applicationEventHandler = applicationEventHandler;
+ this.rebalanceListenerInvoker = rebalanceListenerInvoker;
}
/**
@@ -163,13 +175,25 @@ public BackgroundEventProcessor(final LogContext logContext,
* error, continue to process the remaining events, and then throw the first error that occurred.
*/
@Override
- public void process() {
- AtomicReference<KafkaException> firstError = new AtomicReference<>();
- process((event, error) -> firstError.compareAndSet(null, error));
+ public boolean process() {
+ AtomicReference<KafkaException> firstError = new AtomicReference<>();
+
+ ProcessHandler processHandler = (event, error) -> {
+ if (error.isPresent()) {
+ KafkaException e = error.get();
+
+ if (!firstError.compareAndSet(null, e)) {
+ log.warn("An error occurred when processing the event: {}", e.getMessage(), e);
+ }
+ }
+ };
- if (firstError.get() != null) {
+ boolean hadEvents = process(processHandler);
+
+ if (firstError.get() != null)
throw firstError.get();
- }
+
+ return hadEvents;
}
@Override
@@ -178,20 +202,21 @@ public void process(final BackgroundEvent event) {
case ERROR:
process((ErrorBackgroundEvent) event);
break;
+
case GROUP_METADATA_UPDATE:
process((GroupMetadataUpdateEvent) event);
break;
+
+ case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:
+ process((ConsumerRebalanceListenerCallbackNeededEvent) event);
+ break;
+
default:
throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
}
}
- @Override
- protected Class getEventClass() {
- return BackgroundEvent.class;
- }
-
private void process(final ErrorBackgroundEvent event) {
throw event.error();
}
@@ -207,6 +232,16 @@ private void process(final GroupMetadataUpdateEvent event) {
));
}
}
+
+ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
+ ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ rebalanceListenerInvoker,
+ event.methodName(),
+ event.partitions(),
+ event.future()
+ );
+ applicationEventHandler.add(invokedEvent);
+ }
}
private final ApplicationEventHandler applicationEventHandler;
@@ -294,6 +329,10 @@ private void process(final GroupMetadataUpdateEvent event) {
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue applicationEventQueue = new LinkedBlockingQueue<>();
+ final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(
+ logContext,
+ backgroundEventQueue
+ );
// This FetchBuffer is shared between the application and network threads.
this.fetchBuffer = new FetchBuffer(logContext);
@@ -307,7 +346,7 @@ private void process(final GroupMetadataUpdateEvent event) {
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
final Supplier requestManagersSupplier = RequestManagers.supplier(time,
logContext,
- backgroundEventQueue,
+ backgroundEventHandler,
metadata,
subscriptions,
fetchBuffer,
@@ -327,7 +366,23 @@ private void process(final GroupMetadataUpdateEvent event) {
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier);
- this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
+ ConsumerCoordinatorMetrics coordinatorMetrics = new ConsumerCoordinatorMetrics(
+ subscriptions,
+ metrics,
+ CONSUMER_METRIC_GROUP_PREFIX
+ );
+ ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
+ logContext,
+ subscriptions,
+ time,
+ coordinatorMetrics
+ );
+ this.backgroundEventProcessor = new BackgroundEventProcessor(
+ logContext,
+ backgroundEventQueue,
+ applicationEventHandler,
+ rebalanceListenerInvoker
+ );
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
@@ -374,6 +429,7 @@ private void process(final GroupMetadataUpdateEvent event) {
Time time,
ApplicationEventHandler applicationEventHandler,
BlockingQueue backgroundEventQueue,
+ ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
Metrics metrics,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
@@ -389,7 +445,12 @@ private void process(final GroupMetadataUpdateEvent event) {
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
this.interceptors = Objects.requireNonNull(interceptors);
this.time = time;
- this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
+ this.backgroundEventProcessor = new BackgroundEventProcessor(
+ logContext,
+ backgroundEventQueue,
+ applicationEventHandler,
+ rebalanceListenerInvoker
+ );
this.metrics = metrics;
this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty());
this.metadata = metadata;
@@ -402,7 +463,6 @@ private void process(final GroupMetadataUpdateEvent event) {
this.clientTelemetryReporter = Optional.empty();
}
- // Visible for testing
AsyncKafkaConsumer(LogContext logContext,
Time time,
ConsumerConfig config,
@@ -443,11 +503,25 @@ private void process(final GroupMetadataUpdateEvent event) {
GroupRebalanceConfig.ProtocolType.CONSUMER
);
+ ConsumerCoordinatorMetrics coordinatorMetrics = new ConsumerCoordinatorMetrics(
+ subscriptions,
+ metrics,
+ CONSUMER_METRIC_GROUP_PREFIX
+ );
this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);
BlockingQueue applicationEventQueue = new LinkedBlockingQueue<>();
BlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>();
- this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
+ BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(
+ logContext,
+ backgroundEventQueue
+ );
+ ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
+ logContext,
+ subscriptions,
+ time,
+ coordinatorMetrics
+ );
ApiVersions apiVersions = new ApiVersions();
Supplier networkClientDelegateSupplier = () -> new NetworkClientDelegate(
time,
@@ -458,7 +532,7 @@ private void process(final GroupMetadataUpdateEvent event) {
Supplier requestManagersSupplier = RequestManagers.supplier(
time,
logContext,
- backgroundEventQueue,
+ backgroundEventHandler,
metadata,
subscriptions,
fetchBuffer,
@@ -481,6 +555,12 @@ private void process(final GroupMetadataUpdateEvent event) {
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier);
+ this.backgroundEventProcessor = new BackgroundEventProcessor(
+ logContext,
+ backgroundEventQueue,
+ applicationEventHandler,
+ rebalanceListenerInvoker
+ );
}
private Optional initializeGroupMetadata(final ConsumerConfig config,
@@ -1232,11 +1312,14 @@ public void unsubscribe() {
if (groupMetadata.isPresent()) {
UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent();
applicationEventHandler.add(unsubscribeApplicationEvent);
+ log.info("Unsubscribing all topics or patterns and assigned partitions");
+ Timer timer = time.timer(Long.MAX_VALUE);
+
try {
- unsubscribeApplicationEvent.future().get();
+ processBackgroundEvents(backgroundEventProcessor, unsubscribeApplicationEvent.future(), timer);
log.info("Unsubscribed all topics or patterns and assigned partitions");
- } catch (InterruptedException | ExecutionException e) {
- log.error("Failed while waiting for the unsubscribe event to complete", e);
+ } catch (TimeoutException e) {
+ log.error("Failed while waiting for the unsubscribe event to complete");
}
}
subscriptions.unsubscribe();
@@ -1303,8 +1386,8 @@ private Fetch pollForFetches(Timer timer) {
*
*
*
- * This method will {@link ApplicationEventHandler#wakeupNetworkThread() wake up} the {@link ConsumerNetworkThread} before
- * retuning. This is done as an optimization so that the next round of data can be pre-fetched.
+ * This method will {@link ConsumerNetworkThread#wakeup() wake up the network thread} before returning. This is
+ * done as an optimization so that the next round of data can be pre-fetched.
*/
private Fetch collectFetch() {
final Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
@@ -1535,6 +1618,115 @@ private void subscribeInternal(Collection topics, Optional
+ * Process background events, if any
+ * Briefly wait for {@link CompletableApplicationEvent an event} to complete
+ *
+ *
+ *
+ *
+ * Each iteration gives the application thread an opportunity to process background events, which may be
+ * necessary to complete the overall processing.
+ *
+ *
+ *
+ * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an
+ * {@link UnsubscribeApplicationEvent} on the application event queue. That event will eventually trigger the
+ * rebalancing logic in the background thread. Critically, as part of this rebalancing work, the
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked. However,
+ * this callback must be executed on the application thread. To achieve this, the background thread enqueues a
+ * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is
+ * periodically queried by the application thread to see if there's work to be done. When the application thread
+ * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and then a
+ * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the
+ * background event queue. Moments later, the background thread will see that event, process it, and continue
+ * execution of the rebalancing logic. The rebalancing logic cannot complete until the
+ * {@link ConsumerRebalanceListener} callback is performed.
+ *
+ * @param eventProcessor Event processor that contains the queue of events to process
+ * @param future Event that contains a {@link CompletableFuture}; it is on this future that the
+ * application thread will wait for completion
+ * @param timer Overall timer that bounds how long to wait for the event to complete
+ * @return {@code true} if the event completed within the timeout, {@code false} otherwise
+ */
+ // Visible for testing
+ <T> T processBackgroundEvents(EventProcessor<?> eventProcessor,
+ Future<T> future,
+ Timer timer) {
+ log.trace("Will wait up to {} ms for future {} to complete", timer.remainingMs(), future);
+
+ do {
+ boolean hadEvents = eventProcessor.process();
+
+ try {
+ if (future.isDone()) {
+ // If the event is done (either successfully or otherwise), go ahead and attempt to return
+ // without waiting. We use the ConsumerUtils.getResult() method here to handle the conversion
+ // of the exception types.
+ T result = ConsumerUtils.getResult(future);
+ log.trace("Future {} completed successfully", future);
+ return result;
+ } else if (!hadEvents) {
+ // If the above processing yielded no events, then let's sit tight for a bit to allow the
+ // background thread to either a) finish the task, or b) populate the background event
+ // queue with things to process in our next loop.
+ Timer pollInterval = time.timer(100L);
+ log.trace("Waiting {} ms for future {} to complete", pollInterval.remainingMs(), future);
+ T result = ConsumerUtils.getResult(future, pollInterval);
+ log.trace("Future {} completed successfully", future);
+ return result;
+ }
+ } catch (TimeoutException e) {
+ // Ignore this as we will retry the event until the timeout expires.
+ } finally {
+ timer.update();
+ }
+ } while (timer.notExpired());
+
+ log.trace("Future {} did not complete within timeout", future);
+ throw new TimeoutException("Operation timed out before completion");
+ }
+
+ static ConsumerRebalanceListenerCallbackCompletedEvent invokeRebalanceCallbacks(ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
+ ConsumerRebalanceListenerMethodName methodName,
+ SortedSet<TopicPartition> partitions,
+ CompletableFuture<Void> future) {
+ final Exception e;
+
+ switch (methodName) {
+ case ON_PARTITIONS_REVOKED:
+ e = rebalanceListenerInvoker.invokePartitionsRevoked(partitions);
+ break;
+
+ case ON_PARTITIONS_ASSIGNED:
+ e = rebalanceListenerInvoker.invokePartitionsAssigned(partitions);
+ break;
+
+ case ON_PARTITIONS_LOST:
+ e = rebalanceListenerInvoker.invokePartitionsLost(partitions);
+ break;
+
+ default:
+ throw new IllegalArgumentException("The method " + methodName.fullyQualifiedMethodName() + " to invoke was not expected");
+ }
+
+ final Optional<KafkaException> error;
+
+ if (e != null)
+ error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(e, "User rebalance callback throws an error"));
+ else
+ error = Optional.empty();
+
+ return new ConsumerRebalanceListenerCallbackCompletedEvent(methodName, future, error);
+ }
+
@Override
public String clientId() {
return clientId;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 07bb9811b5b6f..cacc3e398aa9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -131,8 +131,9 @@ void initializeResources() {
*
*/
void runOnce() {
- // If there are errors processing any events, the error will be thrown immediately. This will have
- // the effect of closing the background thread.
+ // Process the events—if any—that were produced by the application thread. It is possible that
+ // processing an event generates an error. In such cases, the processor will log an exception, but
+ // we do not want those errors to be propagated to the caller.
applicationEventProcessor.process();
final long currentTimeMs = time.milliseconds();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerMethodName.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerMethodName.java
new file mode 100644
index 0000000000000..151c0e03af1d6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerMethodName.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+/**
+ * This class just provides a static name for the methods in the {@link ConsumerRebalanceListener} interface
+ * for a bit more compile time assurance.
+ */
+public enum ConsumerRebalanceListenerMethodName {
+
+ ON_PARTITIONS_REVOKED("onPartitionsRevoked"),
+ ON_PARTITIONS_ASSIGNED("onPartitionsAssigned"),
+ ON_PARTITIONS_LOST("onPartitionsLost");
+
+ private final String fullyQualifiedMethodName;
+
+ ConsumerRebalanceListenerMethodName(String methodName) {
+ this.fullyQualifiedMethodName = String.format("%s.%s", ConsumerRebalanceListener.class.getSimpleName(), methodName);
+ }
+
+ /**
+ * Provides the fully-qualified method name, e.g. {@code ConsumerRebalanceListener.onPartitionsRevoked}. This
+ * is used for log messages.
+ *
+ * @return Fully-qualified method name
+ */
+ public String fullyQualifiedMethodName() {
+ return fullyQualifiedMethodName;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index d599d41a245ee..e84f70f7d9bed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -31,7 +31,6 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -51,8 +50,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public final class ConsumerUtils {
@@ -206,22 +205,39 @@ public static void refreshCommittedOffsets(final Map T getResult(CompletableFuture future, Timer timer) {
+ public static <T> T getResult(Future<T> future, Timer timer) {
try {
return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
- Throwable t = e.getCause();
-
- if (t instanceof WakeupException)
- throw new WakeupException();
- else if (t instanceof KafkaException)
- throw (KafkaException) t;
- else
- throw new KafkaException(t);
+ throw maybeWrapAsKafkaException(e.getCause());
} catch (InterruptedException e) {
throw new InterruptException(e);
} catch (java.util.concurrent.TimeoutException e) {
throw new TimeoutException(e);
}
}
+
+ public static <T> T getResult(Future<T> future) {
+ try {
+ return future.get();
+ } catch (ExecutionException e) {
+ throw maybeWrapAsKafkaException(e.getCause());
+ } catch (InterruptedException e) {
+ throw new InterruptException(e);
+ }
+ }
+
+ public static KafkaException maybeWrapAsKafkaException(Throwable t) {
+ if (t instanceof KafkaException)
+ return (KafkaException) t;
+ else
+ return new KafkaException(t);
+ }
+
+ public static KafkaException maybeWrapAsKafkaException(Throwable t, String message) {
+ if (t instanceof KafkaException)
+ return (KafkaException) t;
+ else
+ return new KafkaException(message, t);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 0fb0346d5f416..e8883905e27af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
@@ -206,6 +207,14 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
}
+ /**
+ * Returns the {@link MembershipManager} that this request manager is using to track the state of the group.
+ * This is provided so that the {@link ApplicationEventProcessor} can access the state for querying or updating.
+ */
+ public MembershipManager membershipManager() {
+ return membershipManager;
+ }
+
/**
* Returns the delay for which the application thread can safely wait before it should be responsive
* to results from the request managers. For example, the subscription state can change when heartbeats
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index 78127ecb206f9..aec25985093a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -145,6 +147,17 @@ public interface MembershipManager {
*/
void onSubscriptionUpdated();
+ /**
+ * Signals that a {@link ConsumerRebalanceListener} callback has completed. This is invoked when the
+ * application thread has completed the callback and has submitted a
+ * {@link ConsumerRebalanceListenerCallbackCompletedEvent} to the network I/O thread. At this point, we
+ * notify the state machine that it's complete so that it can move to the next appropriate step of the
+ * rebalance process.
+ *
+ * @param event Event with details about the callback that was executed
+ */
+ void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event);
+
/**
* Transition to the {@link MemberState#JOINING} state to attempt joining a group.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 3c749f7dc2ef2..1cdd26977d00a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -21,6 +21,11 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.Utils.TopicIdPartitionComparator;
import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.KafkaException;
@@ -48,6 +53,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
+
/**
* Group manager for a single consumer that has a group id defined in the config
* {@link ConsumerConfig#GROUP_ID_CONFIG}, to use the Kafka-based offset management capability,
@@ -245,6 +254,13 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
*/
private final Optional clientTelemetryReporter;
+ /**
+ * Serves as the conduit by which we can report events to the application thread. This is needed as we send
+ * {@link ConsumerRebalanceListenerCallbackNeededEvent callbacks} and, if needed,
+ * {@link ErrorBackgroundEvent errors} to the application thread.
+ */
+ private final BackgroundEventHandler backgroundEventHandler;
+
public MembershipManagerImpl(String groupId,
Optional groupInstanceId,
Optional serverAssignor,
@@ -252,7 +268,8 @@ public MembershipManagerImpl(String groupId,
CommitRequestManager commitRequestManager,
ConsumerMetadata metadata,
LogContext logContext,
- Optional clientTelemetryReporter) {
+ Optional clientTelemetryReporter,
+ BackgroundEventHandler backgroundEventHandler) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.serverAssignor = serverAssignor;
@@ -266,6 +283,7 @@ public MembershipManagerImpl(String groupId,
this.currentAssignment = new HashMap<>();
this.log = logContext.logger(MembershipManagerImpl.class);
this.clientTelemetryReporter = clientTelemetryReporter;
+ this.backgroundEventHandler = backgroundEventHandler;
}
/**
@@ -1089,7 +1107,7 @@ private CompletableFuture invokeOnPartitionsRevokedCallback(Set listener = subscriptions.rebalanceListener();
if (!partitionsRevoked.isEmpty() && listener.isPresent()) {
- throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+ return enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_REVOKED, partitionsRevoked);
} else {
return CompletableFuture.completedFuture(null);
}
@@ -1100,7 +1118,7 @@ private CompletableFuture invokeOnPartitionsAssignedCallback(Set listener = subscriptions.rebalanceListener();
if (listener.isPresent()) {
- throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+ return enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_ASSIGNED, partitionsAssigned);
} else {
return CompletableFuture.completedFuture(null);
}
@@ -1111,12 +1129,61 @@ private CompletableFuture invokeOnPartitionsLostCallback(Set listener = subscriptions.rebalanceListener();
if (!partitionsLost.isEmpty() && listener.isPresent()) {
- throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+ return enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_LOST, partitionsLost);
} else {
return CompletableFuture.completedFuture(null);
}
}
+ /**
+ * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to trigger the execution of the
+ * appropriate {@link ConsumerRebalanceListener} {@link ConsumerRebalanceListenerMethodName method} on the
+ * application thread.
+ *
+ *
+ *
+ * Because the reconciliation process (run in the background thread) will be blocked by the application thread
+ * until it completes this, we need to provide a {@link CompletableFuture} by which to remember where we left off.
+ *
+ * @param methodName Callback method that needs to be executed on the application thread
+ * @param partitions Partitions to supply to the callback method
+ * @return Future that will be chained within the rest of the reconciliation logic
+ */
+ private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName,
+ Set<TopicPartition> partitions) {
+ SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ sortedPartitions.addAll(partitions);
+ CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+ backgroundEventHandler.add(event);
+ log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName());
+ return event.future();
+ }
+
+ @Override
+ public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event) {
+ ConsumerRebalanceListenerMethodName methodName = event.methodName();
+ Optional error = event.error();
+ CompletableFuture<Void> future = event.future();
+
+ if (error.isPresent()) {
+ Exception e = error.get();
+ log.warn(
+ "The {} method completed with an error ({}); signaling to continue to the next phase of rebalance",
+ methodName.fullyQualifiedMethodName(),
+ e.getMessage()
+ );
+
+ future.completeExceptionally(e);
+ } else {
+ log.debug(
+ "The {} method completed successfully; signaling to continue to the next phase of rebalance",
+ methodName.fullyQualifiedMethodName()
+ );
+
+ future.complete(null);
+ }
+ }
+
/**
* Log partitions being revoked that were already paused, since the pause flag will be
* effectively lost.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 3dce32708734d..935402b884d81 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -19,7 +19,6 @@
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
@@ -32,7 +31,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
@@ -50,7 +48,6 @@ public class RequestManagers implements Closeable {
public final Optional coordinatorRequestManager;
public final Optional commitRequestManager;
public final Optional heartbeatRequestManager;
- public final Optional membershipManager;
public final OffsetsRequestManager offsetsRequestManager;
public final TopicMetadataRequestManager topicMetadataRequestManager;
public final FetchRequestManager fetchRequestManager;
@@ -63,8 +60,7 @@ public RequestManagers(LogContext logContext,
FetchRequestManager fetchRequestManager,
Optional coordinatorRequestManager,
Optional commitRequestManager,
- Optional heartbeatRequestManager,
- Optional membershipManager) {
+ Optional heartbeatRequestManager) {
this.log = logContext.logger(RequestManagers.class);
this.offsetsRequestManager = requireNonNull(offsetsRequestManager, "OffsetsRequestManager cannot be null");
this.coordinatorRequestManager = coordinatorRequestManager;
@@ -72,7 +68,6 @@ public RequestManagers(LogContext logContext,
this.topicMetadataRequestManager = topicMetadataRequestManager;
this.fetchRequestManager = fetchRequestManager;
this.heartbeatRequestManager = heartbeatRequestManager;
- this.membershipManager = membershipManager;
List> list = new ArrayList<>();
list.add(coordinatorRequestManager);
@@ -112,7 +107,7 @@ public void close() {
*/
public static Supplier supplier(final Time time,
final LogContext logContext,
- final BlockingQueue backgroundEventQueue,
+ final BackgroundEventHandler backgroundEventHandler,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchBuffer fetchBuffer,
@@ -126,7 +121,6 @@ public static Supplier supplier(final Time time,
@Override
protected RequestManagers create() {
final NetworkClientDelegate networkClientDelegate = networkClientDelegateSupplier.get();
- final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(logContext, backgroundEventQueue);
final FetchConfig fetchConfig = new FetchConfig(config);
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
@@ -182,7 +176,8 @@ protected RequestManagers create() {
commit,
metadata,
logContext,
- clientTelemetryReporter);
+ clientTelemetryReporter,
+ backgroundEventHandler);
heartbeatRequestManager = new HeartbeatRequestManager(
logContext,
time,
@@ -200,8 +195,7 @@ protected RequestManagers create() {
fetch,
Optional.ofNullable(coordinator),
Optional.ofNullable(commit),
- Optional.ofNullable(heartbeatRequestManager),
- Optional.ofNullable(membershipManager)
+ Optional.ofNullable(heartbeatRequestManager)
);
}
};
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index e6ff3b2fe5e81..bc3c4f8d4a4a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -26,7 +26,7 @@ public abstract class ApplicationEvent {
public enum Type {
COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE,
- UNSUBSCRIBE
+ UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED
}
private final Type type;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index f564a80d47f7f..7535edf5970b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -62,15 +62,15 @@ public ApplicationEventHandler(final LogContext logContext,
}
/**
- * Add an {@link ApplicationEvent} to the handler and then internally invoke {@link #wakeupNetworkThread}
+ * Add an {@link ApplicationEvent} to the handler and then internally invoke {@link #wakeupNetworkThread()}
* to alert the network I/O thread that it has something to process.
*
* @param event An {@link ApplicationEvent} created by the application thread
*/
public void add(final ApplicationEvent event) {
Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
- log.trace("Enqueued event: {}", event);
applicationEventQueue.add(event);
+ log.trace("Enqueued event: {}", event);
wakeupNetworkThread();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 35f5370893015..d7aade6e31a10 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -61,9 +61,8 @@ public ApplicationEventProcessor(final LogContext logContext,
* an event generates an error. In such cases, the processor will log an exception, but we do not want those
* errors to be propagated to the caller.
*/
- @Override
- public void process() {
- process((event, error) -> { });
+ public boolean process() {
+ return process((event, error) -> error.ifPresent(e -> log.warn("Error processing event {}", e.getMessage(), e)));
}
@Override
@@ -113,16 +112,15 @@ public void process(ApplicationEvent event) {
process((UnsubscribeApplicationEvent) event);
return;
+ case CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED:
+ process((ConsumerRebalanceListenerCallbackCompletedEvent) event);
+ return;
+
default:
log.warn("Application event type " + event.type() + " was not expected");
}
}
- @Override
- protected Class getEventClass() {
- return ApplicationEvent.class;
- }
-
private void process(final PollApplicationEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
return;
@@ -180,12 +178,12 @@ private void process(final ListOffsetsApplicationEvent event) {
* consumer join the group if it is not part of it yet, or send the updated subscription if
* it is already a member.
*/
- private void process(final SubscriptionChangeApplicationEvent event) {
- if (!requestManagers.membershipManager.isPresent()) {
- throw new RuntimeException("Group membership manager not present when processing a " +
- "subscribe event");
+ private void process(final SubscriptionChangeApplicationEvent ignored) {
+ if (!requestManagers.heartbeatRequestManager.isPresent()) {
+ log.warn("Group membership manager not present when processing a subscribe event");
+ return;
}
- MembershipManager membershipManager = requestManagers.membershipManager.get();
+ MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager();
membershipManager.onSubscriptionUpdated();
}
@@ -198,11 +196,12 @@ private void process(final SubscriptionChangeApplicationEvent event) {
* the group is sent out.
*/
private void process(final UnsubscribeApplicationEvent event) {
- if (!requestManagers.membershipManager.isPresent()) {
- throw new RuntimeException("Group membership manager not present when processing an " +
- "unsubscribe event");
+ if (!requestManagers.heartbeatRequestManager.isPresent()) {
+ KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event");
+ event.future().completeExceptionally(error);
+ return;
}
- MembershipManager membershipManager = requestManagers.membershipManager.get();
+ MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager();
CompletableFuture result = membershipManager.leaveGroup();
event.chain(result);
}
@@ -231,6 +230,18 @@ private void process(final TopicMetadataApplicationEvent event) {
event.chain(future);
}
+ private void process(final ConsumerRebalanceListenerCallbackCompletedEvent event) {
+ if (!requestManagers.heartbeatRequestManager.isPresent()) {
+ log.warn(
+ "An internal error occurred; the group membership manager was not present, so the notification of the {} callback execution could not be sent",
+ event.methodName()
+ );
+ return;
+ }
+ MembershipManager manager = requestManagers.heartbeatRequestManager.get().membershipManager();
+ manager.consumerRebalanceListenerCallbackCompleted(event);
+ }
+
/**
* Creates a {@link Supplier} for deferred creation during invocation by
* {@link ConsumerNetworkThread}.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 0e44fe032fe60..e5d522201ef0a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -26,11 +26,10 @@
public abstract class BackgroundEvent {
public enum Type {
- ERROR,
- GROUP_METADATA_UPDATE
+ ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, GROUP_METADATA_UPDATE
}
- protected final Type type;
+ private final Type type;
public BackgroundEvent(Type type) {
this.type = Objects.requireNonNull(type);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
index cafa426d033aa..103493d25314f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
@@ -46,7 +46,7 @@ public BackgroundEventHandler(final LogContext logContext, final Queue
+ */
+public abstract class CompletableBackgroundEvent extends BackgroundEvent implements CompletableEvent {
+
+ private final CompletableFuture future;
+
+ protected CompletableBackgroundEvent(Type type) {
+ super(type);
+ this.future = new CompletableFuture<>();
+ }
+
+ public CompletableFuture future() {
+ return future;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+
+ CompletableBackgroundEvent<?> that = (CompletableBackgroundEvent<?>) o;
+
+ return future.equals(that.future);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + future.hashCode();
+ return result;
+ }
+
+ @Override
+ protected String toStringBase() {
+ return super.toStringBase() + ", future=" + future;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ toStringBase() +
+ '}';
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
new file mode 100644
index 0000000000000..b260c6154ea5f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
+import org.apache.kafka.common.KafkaException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Event that signifies that the application thread has executed the {@link ConsumerRebalanceListener} callback. If
+ * the callback execution threw an error, it is included in the event should any event listener want to know.
+ */
+public class ConsumerRebalanceListenerCallbackCompletedEvent extends ApplicationEvent {
+
+ private final ConsumerRebalanceListenerMethodName methodName;
+ private final CompletableFuture future;
+ private final Optional error;
+
+ public ConsumerRebalanceListenerCallbackCompletedEvent(ConsumerRebalanceListenerMethodName methodName,
+ CompletableFuture future,
+ Optional error) {
+ super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED);
+ this.methodName = Objects.requireNonNull(methodName);
+ this.future = Objects.requireNonNull(future);
+ this.error = Objects.requireNonNull(error);
+ }
+
+ public ConsumerRebalanceListenerMethodName methodName() {
+ return methodName;
+ }
+
+ public CompletableFuture future() {
+ return future;
+ }
+
+ public Optional error() {
+ return error;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+
+ ConsumerRebalanceListenerCallbackCompletedEvent that = (ConsumerRebalanceListenerCallbackCompletedEvent) o;
+
+ return methodName == that.methodName &&
+ future.equals(that.future) &&
+ error.equals(that.error);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(methodName, future, error);
+ }
+
+ @Override
+ protected String toStringBase() {
+ return super.toStringBase() +
+ ", methodName=" + methodName +
+ ", future=" + future +
+ ", error=" + error;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ toStringBase() +
+ '}';
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
new file mode 100644
index 0000000000000..7b17c034abdbd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Event that signifies that the network I/O thread wants to invoke one of the callback methods on the
+ * {@link ConsumerRebalanceListener}. This event will be processed by the application thread when the next
+ * {@link Consumer#poll(Duration)} call is performed by the user. When processed, the application thread should
+ * invoke the appropriate callback method (based on {@link #methodName()}) with the given partitions.
+ */
+public class ConsumerRebalanceListenerCallbackNeededEvent extends CompletableBackgroundEvent {
+
+ private final ConsumerRebalanceListenerMethodName methodName;
+ private final SortedSet partitions;
+
+ public ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName methodName,
+ SortedSet partitions) {
+ super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED);
+ this.methodName = Objects.requireNonNull(methodName);
+ this.partitions = Collections.unmodifiableSortedSet(partitions);
+ }
+
+ public ConsumerRebalanceListenerMethodName methodName() {
+ return methodName;
+ }
+
+ public SortedSet partitions() {
+ return partitions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+
+ ConsumerRebalanceListenerCallbackNeededEvent that = (ConsumerRebalanceListenerCallbackNeededEvent) o;
+
+ return methodName == that.methodName && partitions.equals(that.partitions);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + methodName.hashCode();
+ result = 31 * result + partitions.hashCode();
+ return result;
+ }
+
+ @Override
+ protected String toStringBase() {
+ return super.toStringBase() +
+ ", methodName=" + methodName +
+ ", partitions=" + partitions;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ toStringBase() +
+ '}';
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java
index 06e7ec28dd7e3..79a987e8a7aa9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.LogContext;
@@ -25,6 +26,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
/**
@@ -46,56 +48,53 @@ protected EventProcessor(final LogContext logContext, final BlockingQueue eve
this.closer = new IdempotentCloser();
}
- public abstract void process();
+ public abstract boolean process();
- public abstract void process(T event);
+ protected abstract void process(T event);
@Override
public void close() {
closer.close(this::closeInternal, () -> log.warn("The event processor was already closed"));
}
- protected abstract Class getEventClass();
+ protected interface ProcessHandler {
- protected interface ProcessErrorHandler {
-
- void onError(T event, KafkaException error);
+ void onProcess(T event, Optional error);
}
/**
* Drains all available events from the queue, and then processes them in order. If any errors are thrown while
- * processing the individual events, these are submitted to the given {@link ProcessErrorHandler}.
+ * processing the individual events, these are submitted to the given {@link ProcessHandler}.
*/
- protected void process(ProcessErrorHandler processErrorHandler) {
- String eventClassName = getEventClass().getSimpleName();
- closer.assertOpen(() -> String.format("The processor was previously closed, so no further %s processing can occur", eventClassName));
+ protected boolean process(ProcessHandler processHandler) {
+ closer.assertOpen("The processor was previously closed, so no further processing can occur");
List events = drain();
+ if (events.isEmpty()) {
+ log.trace("No events to process");
+ return false;
+ }
+
try {
- log.debug("Starting processing of {} {}(s)", events.size(), eventClassName);
+ log.trace("Starting processing of {} event{}", events.size(), events.size() == 1 ? "" : "s");
for (T event : events) {
try {
- Objects.requireNonNull(event, () -> String.format("Attempted to process a null %s", eventClassName));
- log.debug("Consuming {}: {}", eventClassName, event);
+ Objects.requireNonNull(event, "Attempted to process a null event");
+ log.trace("Processing event: {}", event);
process(event);
+ processHandler.onProcess(event, Optional.empty());
} catch (Throwable t) {
- log.warn("An error occurred when processing the {}: {}", eventClassName, t.getMessage(), t);
-
- KafkaException error;
-
- if (t instanceof KafkaException)
- error = (KafkaException) t;
- else
- error = new KafkaException(t);
-
- processErrorHandler.onError(event, error);
+ KafkaException error = ConsumerUtils.maybeWrapAsKafkaException(t);
+ processHandler.onProcess(event, Optional.of(error));
}
}
} finally {
- log.debug("Completed processing of {} {}(s)", events.size(), eventClassName);
+ log.trace("Completed processing");
}
+
+ return true;
}
/**
@@ -103,8 +102,7 @@ protected void process(ProcessErrorHandler processErrorHandler) {
* this case, we need to throw an exception to notify the user the consumer is closed.
*/
private void closeInternal() {
- String eventClassName = getEventClass().getSimpleName();
- log.trace("Closing event processor for {}", eventClassName);
+ log.trace("Closing event processor");
List incompleteEvents = drain();
if (incompleteEvents.isEmpty())
@@ -123,7 +121,7 @@ private void closeInternal() {
f.completeExceptionally(exception);
});
- log.debug("Discarding {} {}s because the consumer is closing", incompleteEvents.size(), eventClassName);
+ log.debug("Discarding {} events because the consumer is closing", incompleteEvents.size());
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 6fd1050066984..cad3bc58a41cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
@@ -31,7 +32,10 @@
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
@@ -58,6 +62,8 @@
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -66,6 +72,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
@@ -86,18 +93,24 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
+import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -127,6 +140,7 @@ public class AsyncKafkaConsumerTest {
private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
private ApplicationEventHandler applicationEventHandler;
private SubscriptionState subscriptions;
+ private BlockingQueue backgroundEventQueue;
@BeforeEach
public void setup() {
@@ -140,6 +154,7 @@ private void setup(Optional groupInfo, boo
consumer = testBuilder.consumer;
fetchCollector = testBuilder.fetchCollector;
subscriptions = testBuilder.subscriptions;
+ backgroundEventQueue = testBuilder.backgroundEventQueue;
}
@AfterEach
@@ -856,6 +871,78 @@ public void testGroupMetadataUpdateSingleCall() {
}
}
+ /**
+ * Tests that the consumer correctly invokes the callbacks for {@link ConsumerRebalanceListener} that was
+ * specified. We don't go through the full effort to emulate heartbeats and correct group management here. We're
+ * simply exercising that the background {@link EventProcessor} does the correct thing when
+ * {@link AsyncKafkaConsumer#poll(Duration)} is called.
+ *
+ * Note that we test {@link ConsumerRebalanceListener} that throws errors in its different callbacks. Failed
+ * callback execution does not immediately raise errors. Instead, those errors are forwarded to the
+ * application event thread for the {@link MembershipManagerImpl} to handle.
+ */
+ @ParameterizedTest
+ @MethodSource("listenerCallbacksInvokeSource")
+ public void testListenerCallbacksInvoke(List methodNames,
+ Optional revokedError,
+ Optional assignedError,
+ Optional lostError,
+ int expectedRevokedCount,
+ int expectedAssignedCount,
+ int expectedLostCount) {
+ CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener(
+ revokedError,
+ assignedError,
+ lostError
+ );
+ consumer.subscribe(Collections.singletonList("topic"), consumerRebalanceListener);
+ SortedSet partitions = Collections.emptySortedSet();
+
+ for (ConsumerRebalanceListenerMethodName methodName : methodNames) {
+ CompletableBackgroundEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, partitions);
+ backgroundEventQueue.add(e);
+
+ // This will trigger the background event queue to process our background event message.
+ consumer.poll(Duration.ZERO);
+ }
+
+ assertEquals(expectedRevokedCount, consumerRebalanceListener.revokedCount());
+ assertEquals(expectedAssignedCount, consumerRebalanceListener.assignedCount());
+ assertEquals(expectedLostCount, consumerRebalanceListener.lostCount());
+ }
+
+ private static Stream listenerCallbacksInvokeSource() {
+ Optional empty = Optional.empty();
+ Optional error = Optional.of(new RuntimeException("Intentional error"));
+
+ return Stream.of(
+ // Tests if we don't have an event, the listener doesn't get called.
+ Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0),
+
+ // Tests if we get an event for a revocation, that we invoke our listener.
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0),
+
+ // Tests if we get an event for an assignment, that we invoke our listener.
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0),
+
+ // Tests if we get an event for partitions lost, that we invoke our listener.
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1),
+
+ // Tests that we invoke our listener even if it encounters an exception.
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0),
+
+ // Tests that we invoke our listener even if it encounters an exception.
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0),
+
+ // Tests that we invoke our listener even if it encounters an exception.
+ Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1),
+
+ // Tests if we get separate events for revocation and then assignment--AND our revocation throws an error--
+ // we still invoke the listeners correctly without throwing the error at the user.
+ Arguments.of(Arrays.asList(ON_PARTITIONS_REVOKED, ON_PARTITIONS_ASSIGNED), error, empty, empty, 1, 1, 0)
+ );
+ }
+
@Test
public void testBackgroundError() {
final String groupId = "consumerGroupA";
@@ -1084,6 +1171,86 @@ public void testLongPollWaitIsLimited() {
assertEquals(singleton(tp), consumer.assignment());
}
+ /**
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+ * handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
+ */
+ @Test
+ public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
+ Time time = new MockTime();
+ Timer timer = time.timer(1000);
+ CompletableFuture<?> future = mock(CompletableFuture.class);
+ CountDownLatch latch = new CountDownLatch(3);
+
+ // Mock our call to Future.get(timeout) so that it mimics a delay of 200 milliseconds. Keep in mind that
+ // the incremental timeout inside processBackgroundEvents is 100 milliseconds for each pass. Our first two passes
+ // will exceed the incremental timeout, but the third will return.
+ doAnswer(invocation -> {
+ latch.countDown();
+
+ if (latch.getCount() > 0) {
+ long timeout = invocation.getArgument(0, Long.class);
+ timer.sleep(timeout);
+ throw new java.util.concurrent.TimeoutException("Intentional timeout");
+ }
+
+ future.complete(null);
+ return null;
+ }).when(future).get(any(Long.class), any(TimeUnit.class));
+
+ try (EventProcessor<?> processor = mock(EventProcessor.class)) {
+ consumer.processBackgroundEvents(processor, future, timer);
+
+ // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
+ assertEquals(800, timer.remainingMs());
+ }
+ }
+
+ /**
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+ * handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
+ */
+ @Test
+ public void testProcessBackgroundEventsWithoutDelay() {
+ Time time = new MockTime();
+ Timer timer = time.timer(1000);
+
+ // Create a future that is already completed.
+ CompletableFuture<?> future = CompletableFuture.completedFuture(null);
+
+ try (EventProcessor<?> processor = mock(EventProcessor.class)) {
+ consumer.processBackgroundEvents(processor, future, timer);
+
+ // Because we didn't need to perform a timed get, we should still have every last millisecond
+ // of our initial timeout.
+ assertEquals(1000, timer.remainingMs());
+ }
+ }
+
+ /**
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(EventProcessor, Future, Timer) processBackgroundEvents}
+ * handles the case where the {@link Future} does not complete within the timeout.
+ */
+ @Test
+ public void testProcessBackgroundEventsTimesOut() throws Exception {
+ Time time = new MockTime();
+ Timer timer = time.timer(1000);
+ CompletableFuture<?> future = mock(CompletableFuture.class);
+
+ doAnswer(invocation -> {
+ long timeout = invocation.getArgument(0, Long.class);
+ timer.sleep(timeout);
+ throw new java.util.concurrent.TimeoutException("Intentional timeout");
+ }).when(future).get(any(Long.class), any(TimeUnit.class));
+
+ try (EventProcessor<?> processor = mock(EventProcessor.class)) {
+ assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(processor, future, timer));
+
+ // Because we forced our mocked future to continuously time out, we should have no time remaining.
+ assertEquals(0, timer.remainingMs());
+ }
+ }
+
private void assertNoPendingWakeup(final WakeupTrigger wakeupTrigger) {
assertNull(wakeupTrigger.getPendingTask());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index ba0410c40b60c..716100613b5dc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -51,6 +51,7 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
@@ -96,6 +97,7 @@ public class ConsumerTestBuilder implements Closeable {
final RequestManagers requestManagers;
public final ApplicationEventProcessor applicationEventProcessor;
public final BackgroundEventHandler backgroundEventHandler;
+ public final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
final MockClient client;
final Optional<GroupInformation> groupInfo;
@@ -206,7 +208,8 @@ public ConsumerTestBuilder(Optional groupInfo, boolean enableA
commit,
metadata,
logContext,
- Optional.empty()
+ Optional.empty(),
+ backgroundEventHandler
)
);
HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(
@@ -261,14 +264,24 @@ public ConsumerTestBuilder(Optional groupInfo, boolean enableA
fetchRequestManager,
coordinatorRequestManager,
commitRequestManager,
- heartbeatRequestManager,
- membershipManager);
+ heartbeatRequestManager);
this.applicationEventProcessor = spy(new ApplicationEventProcessor(
logContext,
applicationEventQueue,
requestManagers,
metadata)
);
+ ConsumerCoordinatorMetrics consumerCoordinatorMetrics = new ConsumerCoordinatorMetrics(
+ subscriptions,
+ metrics,
+ CONSUMER_METRIC_GROUP_PREFIX
+ );
+ this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
+ logContext,
+ subscriptions,
+ time,
+ consumerCoordinatorMetrics
+ );
}
@Override
@@ -354,6 +367,7 @@ public AsyncKafkaConsumerTestBuilder(Optional groupInfo, boole
time,
applicationEventHandler,
backgroundEventQueue,
+ rebalanceListenerInvoker,
metrics,
subscriptions,
metadata,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CounterConsumerRebalanceListener.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CounterConsumerRebalanceListener.java
new file mode 100644
index 0000000000000..645920faf4611
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CounterConsumerRebalanceListener.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CounterConsumerRebalanceListener implements ConsumerRebalanceListener {
+
+ private final AtomicInteger revokedCounter = new AtomicInteger();
+ private final AtomicInteger assignedCounter = new AtomicInteger();
+ private final AtomicInteger lostCounter = new AtomicInteger();
+
+ private final Optional<RuntimeException> revokedError;
+ private final Optional<RuntimeException> assignedError;
+ private final Optional<RuntimeException> lostError;
+
+ public CounterConsumerRebalanceListener() {
+ this(Optional.empty(), Optional.empty(), Optional.empty());
+ }
+
+ public CounterConsumerRebalanceListener(Optional<RuntimeException> revokedError,
+ Optional<RuntimeException> assignedError,
+ Optional<RuntimeException> lostError) {
+ this.revokedError = revokedError;
+ this.assignedError = assignedError;
+ this.lostError = lostError;
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ try {
+ if (revokedError.isPresent())
+ throw revokedError.get();
+ } finally {
+ revokedCounter.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ try {
+ if (assignedError.isPresent())
+ throw assignedError.get();
+ } finally {
+ assignedCounter.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions) {
+ try {
+ if (lostError.isPresent())
+ throw lostError.get();
+ } finally {
+ lostCounter.incrementAndGet();
+ }
+ }
+
+ public int revokedCount() {
+ return revokedCounter.get();
+ }
+
+ public int assignedCount() {
+ return assignedCounter.get();
+ }
+
+ public int lostCount() {
+ return lostCounter.get();
+ }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index c09409cbc0efe..88058d1d54132 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -17,18 +17,26 @@
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -39,13 +47,17 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -66,19 +78,24 @@ public class MembershipManagerImplTest {
private static final String MEMBER_ID = "test-member-1";
private static final int MEMBER_EPOCH = 1;
+ private final LogContext logContext = new LogContext();
private SubscriptionState subscriptionState;
private ConsumerMetadata metadata;
private CommitRequestManager commitRequestManager;
private ConsumerTestBuilder testBuilder;
+ private BlockingQueue<BackgroundEvent> backgroundEventQueue;
+ private BackgroundEventHandler backgroundEventHandler;
@BeforeEach
public void setup() {
testBuilder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation());
metadata = testBuilder.metadata;
subscriptionState = testBuilder.subscriptions;
- commitRequestManager = testBuilder.commitRequestManager.get();
+ commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
+ backgroundEventQueue = testBuilder.backgroundEventQueue;
+ backgroundEventHandler = testBuilder.backgroundEventHandler;
}
@AfterEach
@@ -91,7 +108,7 @@ public void tearDown() {
private MembershipManagerImpl createMembershipManagerJoiningGroup() {
MembershipManagerImpl manager = spy(new MembershipManagerImpl(
GROUP_ID, Optional.empty(), Optional.empty(), subscriptionState, commitRequestManager,
- metadata, testBuilder.logContext, Optional.empty()));
+ metadata, logContext, Optional.empty(), backgroundEventHandler));
manager.transitionToJoining();
return manager;
}
@@ -99,8 +116,8 @@ private MembershipManagerImpl createMembershipManagerJoiningGroup() {
private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupInstanceId) {
MembershipManagerImpl manager = spy(new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.empty(),
- subscriptionState, commitRequestManager, metadata, testBuilder.logContext,
- Optional.empty()));
+ subscriptionState, commitRequestManager, metadata, logContext,
+ Optional.empty(), backgroundEventHandler));
manager.transitionToJoining();
return manager;
}
@@ -109,7 +126,8 @@ private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupIn
String serverAssignor) {
MembershipManagerImpl manager = new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(serverAssignor),
- subscriptionState, commitRequestManager, metadata, testBuilder.logContext, Optional.empty());
+ subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
+ backgroundEventHandler);
manager.transitionToJoining();
return manager;
}
@@ -134,7 +152,7 @@ public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin()
// First join should register to get metadata updates
MembershipManagerImpl manager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), Optional.empty(), subscriptionState, commitRequestManager,
- metadata, testBuilder.logContext, Optional.empty());
+ metadata, logContext, Optional.empty(), backgroundEventHandler);
manager.transitionToJoining();
verify(metadata).addClusterUpdateListener(manager);
clearInvocations(metadata);
@@ -212,7 +230,7 @@ public void testTransitionToFatal() {
public void testTransitionToFailedWhenTryingToJoin() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), Optional.empty(), subscriptionState, commitRequestManager, metadata,
- testBuilder.logContext, Optional.empty());
+ logContext, Optional.empty(), backgroundEventHandler);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining();
@@ -283,7 +301,7 @@ public void testNewAssignmentIgnoredWhenStateIsPrepareLeaving() {
// the group, ignoring the new assignment received.
Uuid topicId = Uuid.randomUuid();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, "topic1",
- Collections.emptyList(), true);
+ Collections.emptyList());
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());
assertTrue(membershipManager.assignmentReadyToReconcile().isEmpty());
@@ -354,7 +372,7 @@ public void testDelayedReconciliationResultDiscardedIfMemberNotInReconcilingStat
String topic1 = "topic1";
List<TopicIdPartition> owned = Collections.singletonList(
new TopicIdPartition(topicId1, new TopicPartition(topic1, 0)));
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId1, topic1, owned, true);
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId1, topic1, owned);
// Reconciliation that does not complete stuck on revocation commit.
CompletableFuture commitResult = mockEmptyAssignmentAndRevocationStuckOnCommit(membershipManager);
@@ -387,15 +405,13 @@ public void testDelayedReconciliationResultDiscardedIfMemberRejoins() {
String topic1 = "topic1";
List<TopicIdPartition> owned = Collections.singletonList(new TopicIdPartition(topicId1,
new TopicPartition(topic1, 0)));
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId1, topic1, owned, true);
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId1, topic1, owned);
// Reconciliation that does not complete stuck on revocation commit.
CompletableFuture<Void> commitResult =
mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, topicId1, topic1,
Arrays.asList(1, 2), true);
- Set<TopicIdPartition> assignment1 = new HashSet<>();
- assignment1.add(new TopicIdPartition(topicId1, new TopicPartition(topic1, 1)));
- assignment1.add(new TopicIdPartition(topicId1, new TopicPartition(topic1, 2)));
+ Set<TopicIdPartition> assignment1 = topicIdPartitionsSet(topicId1, topic1, 1, 2);
assertEquals(assignment1, membershipManager.assignmentReadyToReconcile());
// Get fenced and rejoin while still reconciling. Get new assignment to reconcile after
@@ -407,11 +423,10 @@ public void testDelayedReconciliationResultDiscardedIfMemberRejoins() {
// yet because there is another on in progress, but should keep the new assignment ready
// to be reconciled next.
Uuid topicId3 = Uuid.randomUuid();
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId3, "topic3", owned, true);
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId3, "topic3", owned);
receiveAssignmentAfterRejoin(topicId3, Collections.singletonList(5), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
- Set<TopicIdPartition> assignmentAfterRejoin = Collections.singleton(
- new TopicIdPartition(topicId3, new TopicPartition("topic3", 5)));
+ Set<TopicIdPartition> assignmentAfterRejoin = topicIdPartitionsSet(topicId3, "topic3", 5);
assertEquals(assignmentAfterRejoin, membershipManager.assignmentReadyToReconcile());
// Reconciliation completes when the member has already re-joined the group. Should not
@@ -439,7 +454,7 @@ public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
Uuid topicId1 = Uuid.randomUuid();
String topic1 = "topic1";
MembershipManagerImpl membershipManager =
- mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, Arrays.asList(0));
+ mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, Collections.singletonList(0));
membershipManager.onHeartbeatRequestSent();
assertEquals(MemberState.STABLE, membershipManager.state());
clearInvocations(membershipManager, subscriptionState);
@@ -474,9 +489,7 @@ public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
// Pending assignment that was discovered in metadata should be ready to reconcile in the
// next reconciliation loop.
- Set<TopicIdPartition> topic2Assignment = new HashSet<>(Arrays.asList(
- new TopicIdPartition(topicId2, new TopicPartition(topic2, 1)),
- new TopicIdPartition(topicId2, new TopicPartition(topic2, 2))));
+ Set<TopicIdPartition> topic2Assignment = topicIdPartitionsSet(topicId2, topic2, 1, 2);
assertEquals(topic2Assignment, membershipManager.assignmentReadyToReconcile());
}
@@ -500,8 +513,7 @@ public void testLeaveGroupWhenMemberOwnsAssignment() {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic1";
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
- Collections.emptyList(), true);
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
@@ -510,7 +522,7 @@ public void testLeaveGroupWhenMemberOwnsAssignment() {
new TopicIdPartition(topicId, new TopicPartition(topicName, 1)));
verifyReconciliationTriggeredAndCompleted(membershipManager, assignedPartitions);
- assertTrue(membershipManager.currentAssignment().size() == 1);
+ assertEquals(1, membershipManager.currentAssignment().size());
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
}
@@ -655,7 +667,7 @@ public void testUpdateStateFailsOnResponsesWithErrors() {
// Updating state with a heartbeat response containing errors cannot be performed and
// should fail.
ConsumerGroupHeartbeatResponse unknownMemberResponse =
- createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
+ createConsumerGroupHeartbeatResponseWithError();
assertThrows(IllegalArgumentException.class,
() -> membershipManager.onHeartbeatResponseReceived(unknownMemberResponse.data()));
}
@@ -671,13 +683,13 @@ public void testUpdateStateFailsOnResponsesWithErrors() {
public void testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false);
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+ assertFalse(membershipManager.topicsWaitingForMetadata().isEmpty());
// When the ack is sent nothing should change. Member still has nothing to reconcile,
// only topics waiting for metadata.
membershipManager.onHeartbeatRequestSent();
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+ assertFalse(membershipManager.topicsWaitingForMetadata().isEmpty());
// New target assignment received while there is another one waiting to be resolved
// and reconciled. This assignment does not include the previous one that is waiting
@@ -708,13 +720,13 @@ public void testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false);
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+ assertFalse(membershipManager.topicsWaitingForMetadata().isEmpty());
// When the ack is sent nothing should change. Member still has nothing to reconcile,
// only topics waiting for metadata.
membershipManager.onHeartbeatRequestSent();
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+ assertFalse(membershipManager.topicsWaitingForMetadata().isEmpty());
// New target assignment received while there is another one waiting to be resolved
// and reconciled. This assignment does not include the previous one that is waiting
@@ -732,7 +744,7 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
public void testNewAssignmentNotInMetadataReplacesPreviousOneWaitingOnMetadata() {
MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false);
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+ assertFalse(membershipManager.topicsWaitingForMetadata().isEmpty());
// New target assignment (not found in metadata) received while there is another one
// waiting to be resolved and reconciled. This assignment does not include the previous
@@ -742,7 +754,7 @@ public void testNewAssignmentNotInMetadataReplacesPreviousOneWaitingOnMetadata()
when(metadata.topicNames()).thenReturn(Collections.emptyMap());
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+ assertFalse(membershipManager.topicsWaitingForMetadata().isEmpty());
assertEquals(topicId, membershipManager.topicsWaitingForMetadata().iterator().next());
}
@@ -758,7 +770,7 @@ public void testUnresolvedTargetAssignmentIsReconciledWhenMetadataReceived() {
Uuid topicId = Uuid.randomUuid();
receiveAssignment(topicId, Collections.singletonList(1), membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+ assertFalse(membershipManager.topicsWaitingForMetadata().isEmpty());
// Metadata update received, including the missing topic name.
String topicName = "topic1";
@@ -789,7 +801,7 @@ public void testMemberKeepsUnresolvedAssignmentWaitingForMetadataUntilResolved()
.setTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topic1)
- .setPartitions(Arrays.asList(0)),
+ .setPartitions(Collections.singletonList(0)),
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topic2)
.setPartitions(Arrays.asList(1, 3))
@@ -822,15 +834,11 @@ public void testReconcileNewPartitionsAssignedWhenNoPartitionOwned() {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic1";
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
- Collections.emptyList(), true);
-
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
- List<TopicIdPartition> assignedPartitions = Arrays.asList(
- new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
- new TopicIdPartition(topicId, new TopicPartition(topicName, 1)));
+ List<TopicIdPartition> assignedPartitions = topicIdPartitions(topicId, topicName, 0, 1);
verifyReconciliationTriggeredAndCompleted(membershipManager, assignedPartitions);
}
@@ -841,15 +849,14 @@ public void testReconcileNewPartitionsAssignedWhenOtherPartitionsOwned() {
TopicIdPartition ownedPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, 0));
MembershipManagerImpl membershipManager = createMemberInStableState();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
- Collections.singletonList(ownedPartition), true);
-
+ Collections.singletonList(ownedPartition));
// New assignment received, adding partitions 1 and 2 to the previously owned partition 0.
receiveAssignment(topicId, Arrays.asList(0, 1, 2), membershipManager);
- List<TopicIdPartition> assignedPartitions = Arrays.asList(ownedPartition,
- new TopicIdPartition(topicId, new TopicPartition("topic1", 1)),
- new TopicIdPartition(topicId, new TopicPartition("topic1", 2)));
+ List<TopicIdPartition> assignedPartitions = new ArrayList<>();
+ assignedPartitions.add(ownedPartition);
+ assignedPartitions.addAll(topicIdPartitions(topicId, topicName, 1, 2));
verifyReconciliationTriggeredAndCompleted(membershipManager, assignedPartitions);
}
@@ -861,10 +868,8 @@ public void testReconciliationSkippedWhenSameAssignmentReceived() {
String topicName = "topic1";
// Receive assignment different from what the member owns - should reconcile
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList(), true);
- List expectedAssignmentReconciled = Arrays.asList(
- new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
- new TopicIdPartition(topicId, new TopicPartition(topicName, 1)));
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
+ List<TopicIdPartition> expectedAssignmentReconciled = topicIdPartitions(topicId, topicName, 0, 1);
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationTriggeredAndCompleted(membershipManager, expectedAssignmentReconciled);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -874,7 +879,7 @@ public void testReconciliationSkippedWhenSameAssignmentReceived() {
assertEquals(MemberState.STABLE, membershipManager.state());
// Receive same assignment again - should not trigger reconciliation
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, expectedAssignmentReconciled, true);
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, expectedAssignmentReconciled);
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
// Verify new reconciliation was not triggered
verify(membershipManager, never()).markReconciliationInProgress();
@@ -887,7 +892,7 @@ public void testReconciliationSkippedWhenSameAssignmentReceived() {
@Test
public void testReconcilePartitionsRevokedNoAutoCommitNoCallbacks() {
MembershipManagerImpl membershipManager = createMemberInStableState();
- mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1", 0);
+ mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
mockRevocationNoCallbacks(false);
@@ -899,7 +904,7 @@ public void testReconcilePartitionsRevokedNoAutoCommitNoCallbacks() {
@Test
public void testReconcilePartitionsRevokedWithSuccessfulAutoCommitNoCallbacks() {
MembershipManagerImpl membershipManager = createMemberInStableState();
- mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1", 0);
+ mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
CompletableFuture<Void> commitResult = mockRevocationNoCallbacks(true);
@@ -920,7 +925,7 @@ public void testReconcilePartitionsRevokedWithSuccessfulAutoCommitNoCallbacks()
@Test
public void testReconcilePartitionsRevokedWithFailedAutoCommitCompletesRevocationAnyway() {
MembershipManagerImpl membershipManager = createMemberInStableState();
- mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1", 0);
+ mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
CompletableFuture<Void> commitResult = mockRevocationNoCallbacks(true);
@@ -946,18 +951,15 @@ public void testReconcileNewPartitionsAssignedAndRevoked() {
new TopicPartition(topicName, 0));
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
- Collections.singletonList(ownedPartition), true);
+ Collections.singletonList(ownedPartition));
mockRevocationNoCallbacks(false);
-
// New assignment received, revoking partition 0, and assigning new partitions 1 and 2.
receiveAssignment(topicId, Arrays.asList(1, 2), membershipManager);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
- Map<Uuid, SortedSet<Integer>> assignedPartitions = Collections.singletonMap(topicId,
- new TreeSet<>(Arrays.asList(1, 2)));
- assertEquals(assignedPartitions, membershipManager.currentAssignment());
+ assertEquals(topicIdPartitionsMap(topicId, 1, 2), membershipManager.currentAssignment());
assertFalse(membershipManager.reconciliationInProgress());
verify(subscriptionState).assignFromSubscribed(anyCollection());
@@ -986,9 +988,7 @@ public void testMetadataUpdatesReconcilesUnresolvedAssignments() {
// When metadata is updated, the member should re-trigger reconciliation
membershipManager.onUpdate(null);
- List<TopicIdPartition> expectedAssignmentReconciled = Arrays.asList(
- new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
- new TopicIdPartition(topicId, new TopicPartition(topicName, 1)));
+ List<TopicIdPartition> expectedAssignmentReconciled = topicIdPartitions(topicId, topicName, 0, 1);
verifyReconciliationTriggeredAndCompleted(membershipManager, expectedAssignmentReconciled);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
assertTrue(membershipManager.topicsWaitingForMetadata().isEmpty());
@@ -1027,8 +1027,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
String topicName = "topic1";
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
- Collections.emptyList(), true);
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
// Member received assignment to reconcile;
@@ -1058,8 +1057,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
// Revocation should complete without requesting any metadata update given that the topic
// received in target assignment should exist in local topic name cache.
verify(metadata, never()).requestUpdate(anyBoolean());
- List<TopicIdPartition> remainingAssignment = Collections.singletonList(
- new TopicIdPartition(topicId, new TopicPartition(topicName, 1)));
+ List<TopicIdPartition> remainingAssignment = topicIdPartitions(topicId, topicName, 1);
testRevocationCompleted(membershipManager, remainingAssignment);
}
@@ -1073,6 +1071,315 @@ public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
verify(membershipManager, never()).transitionToJoining();
}
+ @Test
+ public void testListenerCallbacksBasic() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener();
+ ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+
+ when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+ when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+ when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
+
+ // Step 2: put the state machine into the appropriate... state
+ when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
+ receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCount());
+ assertEquals(0, listener.assignedCount());
+ assertEquals(0, listener.lostCount());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ topicPartitions(topicName, 0, 1)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 4: Send ack and make sure we're done and our listener was called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertEquals(topicIdPartitionsMap(topicId, 0, 1), membershipManager.currentAssignment());
+
+ assertEquals(0, listener.revokedCount());
+ assertEquals(1, listener.assignedCount());
+ assertEquals(0, listener.lostCount());
+
+ // Step 5: receive an empty assignment, which means we should call revoke
+ when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName, 0, 1));
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 6: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions(topicName, 0, 1)
+ );
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 7: assign partitions should still be called, even though it's empty
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 8: Send ack and make sure we're done and our listener was called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ assertEquals(1, listener.revokedCount());
+ assertEquals(2, listener.assignedCount());
+ assertEquals(0, listener.lostCount());
+ }
+
+ @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+ // Step 1: set up mocks
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition(membershipManager, topicId, topicName);
+ CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(
+ Optional.of(new IllegalArgumentException("Intentional onPartitionsRevoked() error")),
+ Optional.empty(),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+
+ when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(topicIdPartitionsMap(topicId, 0), membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCount());
+ assertEquals(0, listener.assignedCount());
+ assertEquals(0, listener.lostCount());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions(topicName, 0)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 4: Send ack and make sure we're done and our listener was called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCount());
+ assertEquals(0, listener.assignedCount());
+ assertEquals(0, listener.lostCount());
+ }
+
+ @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+ mockOwnedPartition(membershipManager, topicId, topicName);
+ CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.of(new IllegalArgumentException("Intentional onPartitionsAssigned() error")),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+
+ when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(topicIdPartitionsMap(topicId, 0), membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCount());
+ assertEquals(0, listener.assignedCount());
+ assertEquals(0, listener.lostCount());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 4: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 5: Send ack and make sure we're done and our listener was called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCount());
+ assertEquals(1, listener.assignedCount());
+ assertEquals(0, listener.lostCount());
+ }
+
+ @Test
+ public void testOnPartitionsLostNoError() {
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+ mockOwnedPartition(membershipManager, topicId, topicName);
+ testOnPartitionsLost(Optional.empty());
+ }
+
+ @Test
+ public void testOnPartitionsLostError() {
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+ mockOwnedPartition(membershipManager, topicId, topicName);
+ testOnPartitionsLost(Optional.of(new KafkaException("Intentional error for test")));
+ }
+
+ private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.empty(),
+ lostError
+ );
+ ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
+
+ when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ membershipManager.transitionToFenced();
+ assertEquals(MemberState.FENCED, membershipManager.state());
+ assertEquals(Collections.emptyMap(), membershipManager.currentAssignment());
+ assertEquals(0, listener.revokedCount());
+ assertEquals(0, listener.assignedCount());
+ assertEquals(0, listener.lostCount());
+
+ // Step 3: invoke the callback
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+ topicPartitions("topic1", 0)
+ );
+
+ // Step 4: Receive ack and make sure we're done and our listener was called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.JOINING, membershipManager.state());
+
+ assertEquals(0, listener.revokedCount());
+ assertEquals(0, listener.assignedCount());
+ assertEquals(1, listener.lostCount());
+ }
+
+ private ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker() {
+ ConsumerCoordinatorMetrics coordinatorMetrics = new ConsumerCoordinatorMetrics(
+ subscriptionState,
+ new Metrics(),
+ "test-");
+ return new ConsumerRebalanceListenerInvoker(
+ new LogContext(),
+ subscriptionState,
+ new MockTime(1),
+ coordinatorMetrics
+ );
+ }
+
+ private SortedSet<TopicPartition> topicPartitions(String topicName, int... partitions) {
+ SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new Utils.TopicPartitionComparator());
+
+ for (int partition : partitions)
+ topicPartitions.add(new TopicPartition(topicName, partition));
+
+ return topicPartitions;
+ }
+
+ private SortedSet<TopicIdPartition> topicIdPartitionsSet(Uuid topicId, String topicName, int... partitions) {
+ SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new Utils.TopicIdPartitionComparator());
+
+ for (int partition : partitions)
+ topicIdPartitions.add(new TopicIdPartition(topicId, new TopicPartition(topicName, partition)));
+
+ return topicIdPartitions;
+ }
+
+ private List<TopicIdPartition> topicIdPartitions(Uuid topicId, String topicName, int... partitions) {
+ return new ArrayList<>(topicIdPartitionsSet(topicId, topicName, partitions));
+ }
+
+ private Map<Uuid, SortedSet<Integer>> topicIdPartitionsMap(Uuid topicId, int... partitions) {
+ SortedSet<Integer> topicIdPartitions = new TreeSet<>();
+
+ for (int partition : partitions)
+ topicIdPartitions.add(partition);
+
+ return Collections.singletonMap(topicId, topicIdPartitions);
+ }
+
+ private void performCallback(MembershipManagerImpl membershipManager,
+ ConsumerRebalanceListenerInvoker invoker,
+ ConsumerRebalanceListenerMethodName expectedMethodName,
+ SortedSet<TopicPartition> expectedPartitions) {
+ // We expect only our enqueued event in the background queue.
+ assertEquals(1, backgroundEventQueue.size());
+ assertNotNull(backgroundEventQueue.peek());
+ assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, backgroundEventQueue.peek());
+ ConsumerRebalanceListenerCallbackNeededEvent neededEvent = (ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ assertEquals(expectedMethodName, neededEvent.methodName());
+ assertEquals(expectedPartitions, neededEvent.partitions());
+
+ ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks(
+ invoker,
+ neededEvent.methodName(),
+ neededEvent.partitions(),
+ neededEvent.future()
+ );
+ membershipManager.consumerRebalanceListenerCallbackCompleted(invokedEvent);
+ }
+
private void testFenceIsNoOp(MembershipManagerImpl membershipManager,
MockRebalanceListener rebalanceListener) {
assertNotEquals(0, membershipManager.memberEpoch());
@@ -1100,7 +1407,7 @@ private MembershipManagerImpl mockMemberSuccessfullyReceivesAndAcksAssignment(
Uuid topicId, String topicName, List<Integer> partitions) {
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
- Collections.emptyList(), true);
+ Collections.emptyList());
receiveAssignment(topicId, partitions, membershipManager);
@@ -1164,7 +1471,7 @@ private void verifyReconciliationTriggeredAndCompleted(MembershipManagerImpl mem
}
private List<TopicPartition> buildTopicPartitions(List<TopicIdPartition> topicIdPartitions) {
- return topicIdPartitions.stream().map(topicIdPartition -> topicIdPartition.topicPartition()).collect(Collectors.toList());
+ return topicIdPartitions.stream().map(TopicIdPartition::topicPartition).collect(Collectors.toList());
}
private void mockAckSent(MembershipManagerImpl membershipManager) {
@@ -1229,13 +1536,10 @@ private Map<Uuid, SortedSet<Integer>> assignmentByTopicId(List<TopicIdPartition>
private void mockOwnedPartitionAndAssignmentReceived(MembershipManagerImpl membershipManager,
Uuid topicId,
String topicName,
- List<TopicIdPartition> previouslyOwned,
- boolean mockMetadata) {
+ List<TopicIdPartition> previouslyOwned) {
when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(previouslyOwned));
membershipManager.updateCurrentAssignment(new HashSet<>(previouslyOwned));
- if (mockMetadata) {
- when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
- }
+ when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty());
}
@@ -1246,8 +1550,8 @@ private Set<TopicPartition> getTopicPartitions(List<TopicIdPartition> topicIdPar
.collect(Collectors.toSet());
}
- private void mockOwnedPartition(MembershipManagerImpl membershipManager,
- Uuid topicId, String topic, int partition) {
+ private void mockOwnedPartition(MembershipManagerImpl membershipManager, Uuid topicId, String topic) {
+ int partition = 0;
TopicPartition previouslyOwned = new TopicPartition(topic, partition);
membershipManager.updateCurrentAssignment(
Collections.singleton(new TopicIdPartition(topicId, new TopicPartition(topic, partition))));
@@ -1409,9 +1713,9 @@ private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithB
.setAssignment(assignment));
}
- private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError(Errors error) {
+ private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError() {
return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
- .setErrorCode(error.code())
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setMemberId(MEMBER_ID)
.setMemberEpoch(5));
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index eb2be928c42b3..b89b674d3cd1a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -78,9 +78,8 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
assertNotEquals(0, BaseConsumerTest.updateConsumerCount.get())
}
- // ConsumerRebalanceListener temporarily not supported for consumer group protocol
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCoordinatorFailover(quorum: String, groupProtocol: String): Unit = {
val listener = new TestConsumerReassignmentListener()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c0a249b4754d8..839f6d67fb16f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -169,7 +169,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
startingTimestamp = startingTimestamp)
}
- // ConsumerRebalanceListener temporarily not supported for the consumer group protocol
+ // TODO: enable this test for the consumer group protocol when KAFKA-16008 has been fixed.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testMaxPollIntervalMs(quorum: String, groupProtocol: String): Unit = {
@@ -196,7 +196,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(1, listener.callsToRevoked)
}
- // ConsumerRebalanceListener temporarily not supported for the consumer group protocol
+ // TODO: enable this test for the consumer group protocol when KAFKA-16009 has been fixed.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testMaxPollIntervalMsDelayInRevocation(quorum: String, groupProtocol: String): Unit = {
@@ -238,9 +238,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertTrue(commitCompleted)
}
- // ConsumerRebalanceListener temporarily not supported for consumer group protocol
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollIntervalMsDelayInAssignment(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString)
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
@@ -264,9 +263,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
ensureNoRebalance(consumer, listener)
}
- // Consumer group protocol temporarily does not commit offsets on consumer close
+ // TODO: enable this test for the consumer group protocol when support for committing offsets on close is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAutoCommitOnClose(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer = createConsumer()
@@ -289,7 +288,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
}
- // Consumer group protocol temporarily does not commit offsets on consumer close
+ // TODO: enable this test for the consumer group protocol when support for committing offsets on close is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testAutoCommitOnCloseAfterWakeup(quorum: String, groupProtocol: String): Unit = {
@@ -351,7 +350,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
* metadata refresh the consumer becomes subscribed to this new topic and all partitions
* of that topic are assigned to it.
*/
- // Pattern subscriptions temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testPatternSubscription(quorum: String, groupProtocol: String): Unit = {
@@ -410,7 +409,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
* The metadata refresh interval is intentionally increased to a large enough value to guarantee
* that it is the subscription call that triggers a metadata refresh, and not the timeout.
*/
- // Pattern subscriptions temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testSubsequentPatternSubscription(quorum: String, groupProtocol: String): Unit = {
@@ -463,7 +462,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
* When consumer unsubscribes from all its subscriptions, it is expected that its
* assignments are cleared right away.
*/
- // Pattern subscriptions temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when support for pattern subscriptions is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testPatternUnsubscription(quorum: String, groupProtocol: String): Unit = {
@@ -548,9 +547,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
awaitAssignment(consumer, expandedAssignment)
}
- // Consumer group protocol temporarily does not properly handle assignment change
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testShrinkingTopicSubscriptions(quorum: String, groupProtocol: String): Unit = {
val otherTopic = "other"
createTopic(otherTopic, 2, brokerCount)
@@ -1266,21 +1264,21 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
- // ConsumerRebalanceListener temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when KAFKA-16010 has been fixed.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testMultiConsumerSessionTimeoutOnStopPolling(quorum: String, groupProtocol: String): Unit = {
runMultiConsumerSessionTimeoutTest(false)
}
- // ConsumerRebalanceListener temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when KAFKA-16011 has been fixed.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testMultiConsumerSessionTimeoutOnClose(quorum: String, groupProtocol: String): Unit = {
runMultiConsumerSessionTimeoutTest(true)
}
- // Consumer interceptors temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when consumer interceptors are supported
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testInterceptors(quorum: String, groupProtocol: String): Unit = {
@@ -1341,7 +1339,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
MockProducerInterceptor.resetCounters()
}
- // Consumer interceptors temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when consumer interceptors are supported
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testAutoCommitIntercept(quorum: String, groupProtocol: String): Unit = {
@@ -1393,7 +1391,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
MockConsumerInterceptor.resetCounters()
}
- // Consumer interceptors temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when consumer interceptors are supported
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testInterceptorsWithWrongKeyValue(quorum: String, groupProtocol: String): Unit = {
@@ -1489,9 +1487,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(2, topics.get(topic3).size)
}
- // ConsumerRebalanceListener temporarily not supported for consumer group protocol
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testUnsubscribeTopic(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
@@ -1557,7 +1554,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
- // ConsumerRebalanceListener temporarily not supported for consumer group protocol
+ // TODO: enable this test for the consumer group protocol when auto-commit support is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testAutoCommitOnRebalance(quorum: String, groupProtocol: String): Unit = {
@@ -1598,9 +1595,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
- // ConsumerRebalanceListener temporarily not supported for consumer group protocol
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPerPartitionLeadMetricsCleanUpWithSubscribe(quorum: String, groupProtocol: String): Unit = {
val numMessages = 1000
val topic2 = "topic2"
@@ -1639,9 +1635,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)))
}
- // ConsumerRebalanceListener temporarily not supported for consumer group protocol
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPerPartitionLagMetricsCleanUpWithSubscribe(quorum: String, groupProtocol: String): Unit = {
val numMessages = 1000
val topic2 = "topic2"
@@ -2100,7 +2095,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = consumer1Config))
}
- // Static membership temporarily not supported in consumer group protocol
+ // TODO: enable this test for the consumer group protocol when static membership is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(quorum:String, groupProtocol: String): Unit = {