> offsets, Exception exception);
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 76bfe7e91a149..7ec147e9e3ced 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -198,10 +198,7 @@ public class ConsumerConfig extends AbstractConfig {
* fetch.max.wait.ms
*/
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
- private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " +
- "answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " +
- "fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " +
- "time, please refer to 'remote.fetch.max.wait.ms' broker config";
+ private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;
/** metadata.max.age.ms */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index 8884c0393d608..2f43b603fc8ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -152,7 +152,7 @@ public interface ConsumerRebalanceListener {
* {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
* {@link #onPartitionsAssigned(Collection)} callback. During exceptional scenarios, partitions may be migrated
* without the old owner being notified (i.e. their {@link #onPartitionsRevoked(Collection)} callback not triggered),
- * and later when the old owner consumer realized this event, the {@link #onPartitionsLost(Collection)} callback
+ * and later when the old owner consumer realized this event, the {@link #onPartitionsLost(Collection)} (Collection)} callback
* will be triggered by the consumer then.
*
* It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
deleted file mode 100644
index 2fc721a84ff7f..0000000000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ /dev/null
@@ -1,700 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegate;
-import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegateCreator;
-import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.AuthorizationException;
-import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
-
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.kafka.common.utils.Utils.propsToMap;
-
-/**
- * A client that consumes records from a Kafka cluster using a share group.
- *
- * This is an early access feature introduced by KIP-932. It is not suitable for production use until it is
- * fully implemented and released.
- *
- *
- * <h3>Cross-Version Compatibility</h3>
- * This client can communicate with brokers that are version 4.0.0 or newer. You will receive an
- * {@link org.apache.kafka.common.errors.UnsupportedVersionException} when invoking an API that is not
- * available on the running broker version.
- *
- *
- * Kafka uses the concept of share groups to allow a pool of consumers to cooperate on the work of
- * consuming and processing records. All consumer instances sharing the same {@code group.id} will be part of
- * the same share group.
- *
- * Each consumer in a group can dynamically set the list of topics it wants to subscribe to using the
- * {@link #subscribe(Collection)} method. Kafka will deliver each message in the subscribed topics to one
- * consumer in the share group. Unlike consumer groups, share groups balance the partitions between all
- * members of the share group permitting multiple consumers to consume from the same partitions. This gives
- * more flexible sharing of records than a consumer group, at the expense of record ordering.
- *
- * Membership in a share group is maintained dynamically: if a consumer fails, the partitions assigned to
- * it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group,
- * the partition assignment is re-evaluated and partitions can be moved from existing consumers to the new one.
- * This is known as rebalancing the group and is discussed in more detail below.
- * Group rebalancing is also used when new partitions are added to one of the subscribed topics. The group will
- * automatically detect the new partitions through periodic metadata refreshes and assign them to the members of the group.
- *
- * Conceptually, you can think of a share group as a single logical subscriber made up of multiple consumers.
- * In fact, in other messaging systems, a share group is roughly equivalent to a durable shared subscription.
- * You can have multiple share groups and consumer groups independently consuming from the same topics.
- *
- *
- * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(Duration)} is
- * invoked. This method is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
- * will stay in the group and continue to receive records from the partitions it was assigned. Under the covers,
- * the consumer sends periodic heartbeats to the broker. If the consumer crashes or is unable to send heartbeats for
- * the duration of the share group's session time-out, then the consumer will be considered dead and its partitions
- * will be reassigned.
- *
- * It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats
- * in the background, but no progress is being made. To prevent the consumer from holding onto its partitions
- * indefinitely in this case, we provide a liveness detection mechanism using the {@code max.poll.interval.ms} setting.
- * If you don't call poll at least as frequently as this, the client will proactively leave the share group.
- * So to stay in the group, you must continue to call poll.
- *
- *
- * <h3>Record Delivery and Acknowledgement</h3>
- * When a consumer in a share-group fetches records using {@link #poll(Duration)}, it receives available records from any
- * of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a
- * time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default,
- * the lock duration is 30 seconds, but it can also be controlled using the group {@code group.share.record.lock.duration.ms}
- * configuration parameter. The idea is that the lock is automatically released once the lock duration has elapsed, and
- * then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in
- * the following ways:
- *
- * - The consumer can acknowledge successful processing of the record
- * - The consumer can release the record, which makes the record available for another delivery attempt
- * - The consumer can reject the record, which indicates that the record is unprocessable and does not make
- * the record available for another delivery attempt
- * - The consumer can do nothing, in which case the lock is automatically released when the lock duration has elapsed
- *
- * The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit
- * is reached, fetching records will temporarily yield no further records until the number of acquired records reduces,
- * as naturally happens when the locks time out. This limit is controlled by the broker configuration property
- * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically
- * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
- *
- * The consumer can choose to use implicit or explicit acknowledgement of the records it processes.
- *
- * If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch,
- * it is using explicit acknowledgement. In this case:
- *
- * - The application calls {@link #commitSync()} or {@link #commitAsync()} which commits the acknowledgements to Kafka.
- * If any records in the batch were not acknowledged, they remain acquired and will be presented to the application
- * in response to a future poll.
- * - The application calls {@link #poll(Duration)} without committing first, which commits the acknowledgements to
- * Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement.
- * If any records in the batch were not acknowledged, they remain acquired and will be presented to the application
- * in response to a future poll.
- * - The application calls {@link #close()} which attempts to commit any pending acknowledgements and
- * releases any remaining acquired records.
- *
- * If the application does not call {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch,
- * it is using implicit acknowledgement. In this case:
- *
- * - The application calls {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all of
- * the delivered records as processed successfully and commits the acknowledgements to Kafka.
- * - The application calls {@link #poll(Duration)} without committing, which also implicitly acknowledges all of
- * the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is
- * thrown by a failure to commit the acknowledgements.
- * - The application calls {@link #close()} which releases any acquired records without acknowledgement.
- *
- *
- * The consumer guarantees that the records returned in the {@code ConsumerRecords} object for a specific topic-partition
- * are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records
- * in a batch are performed atomically. This makes error handling significantly more straightforward because there can be
- * one error code per partition.
- *
- *
- * <h3>Usage Examples</h3>
- * The share consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
- * demonstrate how to use them.
- *
- * Acknowledging a batch of records (implicit acknowledgement)
- * This example demonstrates implicit acknowledgement using {@link #poll(Duration)} to acknowledge the records which
- * were delivered in the previous poll. All the records delivered are implicitly marked as successfully consumed and
- * acknowledged synchronously with Kafka as the consumer fetches more records.
- *
- * Properties props = new Properties();
- * props.setProperty("bootstrap.servers", "localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo"));
- * while (true) {
- * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- * for (ConsumerRecord<String, String> record : records) {
- * System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- * doProcessing(record);
- * }
- * }
- *
- *
- * Alternatively, you can use {@link #commitSync()} or {@link #commitAsync()} to commit the acknowledgements, but this is
- * slightly less efficient because there is an additional request sent to Kafka.
- *
- * Properties props = new Properties();
- * props.setProperty("bootstrap.servers", "localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo"));
- * while (true) {
- * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- * for (ConsumerRecord<String, String> record : records) {
- * System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- * doProcessing(record);
- * }
- * consumer.commitSync();
- * }
- *
- *
- * Per-record acknowledgement (explicit acknowledgement)
- * This example demonstrates using different acknowledgement types depending on the outcome of processing the records.
- *
- * Properties props = new Properties();
- * props.setProperty("bootstrap.servers", "localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo"));
- * while (true) {
- * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- * for (ConsumerRecord<String, String> record : records) {
- * try {
- * doProcessing(record);
- * consumer.acknowledge(record, AcknowledgeType.ACCEPT);
- * } catch (Exception e) {
- * consumer.acknowledge(record, AcknowledgeType.REJECT);
- * }
- * }
- * consumer.commitSync();
- * }
- *
- *
- * Each record processed is separately acknowledged using a call to {@link #acknowledge(ConsumerRecord, AcknowledgeType)}.
- * The {@link AcknowledgeType} argument indicates whether the record was processed successfully or not. In this case,
- * the bad records are rejected meaning that they’re not eligible for further delivery attempts. For a permanent error
- * such as a semantic error, this is appropriate. For a transient error which might not affect a subsequent processing
- * attempt, {@link AcknowledgeType#RELEASE} is more appropriate because the record remains eligible for further delivery attempts.
- *
- * The calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} are simply updating local information in the consumer.
- * It is only once {@link #commitSync()} is called that the acknowledgements are committed by sending the new state
- * information to Kafka.
- *
- *
- * Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)
- * This example demonstrates ending processing of a batch of records on the first error.
- *
- * Properties props = new Properties();
- * props.setProperty("bootstrap.servers", "localhost:9092");
- * props.setProperty("group.id", "test");
- * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
- * consumer.subscribe(Arrays.asList("foo"));
- * while (true) {
- * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- * for (ConsumerRecord<String, String> record : records) {
- * try {
- * doProcessing(record);
- * consumer.acknowledge(record, AcknowledgeType.ACCEPT);
- * } catch (Exception e) {
- * consumer.acknowledge(record, AcknowledgeType.REJECT);
- * break;
- * }
- * }
- * consumer.commitSync();
- * }
- *
- * There are the following cases in this example:
- *
- * - The batch contains no records, in which case the application just polls again. The call to {@link #commitSync()}
- * just does nothing because the batch was empty.
- * - All of the records in the batch are processed successfully. The calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)}
- * specifying {@code AcknowledgeType.ACCEPT} mark all records in the batch as successfully processed.
- * - One of the records encounters an exception. The call to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} specifying
- * {@code AcknowledgeType.REJECT} rejects that record. Earlier records in the batch have already been marked as successfully
- * processed. The call to {@link #commitSync()} commits the acknowledgements, but the records after the failed record
- * remain acquired as part of the same delivery attempt and will be presented to the application in response to another poll.
- *
- *
- * Reading Transactional Records
- * The way that share groups handle transactional records is controlled by the {@code group.share.isolation.level}
- * configuration property. In a share group, the isolation level applies to the entire share group, not just individual
- * consumers.
- *
- * In read_uncommitted isolation level, the share group consumes all non-transactional and transactional
- * records. The consumption is bounded by the high-water mark.
- *
- * In read_committed isolation level (not yet supported), the share group only consumes non-transactional
- * records and committed transactional records. The set of records which are eligible to become in-flight records are
- * non-transactional records and committed transactional records only. The consumption is bounded by the last stable
- * offset, so an open transaction blocks the progress of the share group with read_committed isolation level.
- *
- *
- * The consumer is NOT thread-safe. It is the responsibility of the user to ensure that multithreaded access
- * is properly synchronized. Unsynchronized access will result in {@link java.util.ConcurrentModificationException}.
- *
- * The only exception to this rule is {@link #wakeup()} which can safely be used from an external thread to
- * interrupt an active operation. In this case, a {@link org.apache.kafka.common.errors.WakeupException} will be
- * thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread.
- * The following snippet shows the typical pattern:
- *
- *
- * public class KafkaShareConsumerRunner implements Runnable {
- * private final AtomicBoolean closed = new AtomicBoolean(false);
- * private final KafkaShareConsumer consumer;
- *
- * public KafkaShareConsumerRunner(KafkaShareConsumer consumer) {
- * this.consumer = consumer;
- * }
- *
- * {@literal}@Override
- * public void run() {
- * try {
- * consumer.subscribe(Arrays.asList("topic"));
- * while (!closed.get()) {
- * ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
- * // Handle new records
- * }
- * } catch (WakeupException e) {
- * // Ignore exception if closing
- * if (!closed.get()) throw e;
- * } finally {
- * consumer.close();
- * }
- * }
- *
- * // Shutdown hook which can be called from a separate thread
- * public void shutdown() {
- * closed.set(true);
- * consumer.wakeup();
- * }
- * }
- *
- *
- * Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
- *
- * closed.set(true);
- * consumer.wakeup();
- *
- *
- *
- * Note that while it is possible to use thread interrupts instead of {@link #wakeup()} to abort a blocking operation
- * (in which case, {@link InterruptException} will be raised), we discourage their use since they may cause a clean
- * shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using {@link #wakeup()}
- * is impossible, such as when a consumer thread is managed by code that is unaware of the Kafka client.
- *
- * We have intentionally avoided implementing a particular threading model for processing. Various options for
- * multithreaded processing are possible, of which the most straightforward is to dedicate a thread to each consumer.
- */
-@InterfaceStability.Evolving
-public class KafkaShareConsumer implements ShareConsumer {
-
- private final static ShareConsumerDelegateCreator CREATOR = new ShareConsumerDelegateCreator();
-
- private final ShareConsumerDelegate delegate;
-
- /**
- * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
- * are documented here. Values can be
- * either strings or objects of the appropriate type (for example a numeric configuration would accept either the
- * string "42" or the integer 42).
- *
- * Valid configuration strings are documented at {@link ConsumerConfig}.
- *
- * Note: after creating a {@code KafkaShareConsumer} you must always {@link #close()} it to avoid resource leaks.
- *
- * @param configs The consumer configs
- */
- public KafkaShareConsumer(Map configs) {
- this(configs, null, null);
- }
-
- /**
- * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
- *
- * Valid configuration strings are documented at {@link ConsumerConfig}.
- *
- * Note: after creating a {@code KafkaShareConsumer} you must always {@link #close()} it to avoid resource leaks.
- *
- * @param properties The consumer configuration properties
- */
- public KafkaShareConsumer(Properties properties) {
- this(properties, null, null);
- }
-
- /**
- * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration, and a
- * key and a value {@link Deserializer}.
- *
- * Valid configuration strings are documented at {@link ConsumerConfig}.
- *
- * Note: after creating a {@code KafkaShareConsumer} you must always {@link #close()} it to avoid resource leaks.
- *
- * @param properties The consumer configuration properties
- * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
- * won't be called in the consumer when the deserializer is passed in directly.
- * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
- * won't be called in the consumer when the deserializer is passed in directly.
- */
- public KafkaShareConsumer(Properties properties,
- Deserializer keyDeserializer,
- Deserializer valueDeserializer) {
- this(propsToMap(properties), keyDeserializer, valueDeserializer);
- }
-
- /**
- * A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}.
- *
- * Valid configuration strings are documented at {@link ConsumerConfig}.
- *
- * Note: after creating a {@code KafkaShareConsumer} you must always {@link #close()} it to avoid resource leaks.
- *
- * @param configs The consumer configs
- * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
- * won't be called in the consumer when the deserializer is passed in directly.
- * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
- * won't be called in the consumer when the deserializer is passed in directly.
- */
- public KafkaShareConsumer(Map configs,
- Deserializer keyDeserializer,
- Deserializer valueDeserializer) {
- this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
- keyDeserializer, valueDeserializer);
- }
-
- public KafkaShareConsumer(ConsumerConfig config,
- Deserializer keyDeserializer,
- Deserializer valueDeserializer) {
- delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
- }
-
- KafkaShareConsumer(final LogContext logContext,
- final String clientId,
- final String groupId,
- final ConsumerConfig config,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final Time time,
- final KafkaClient client,
- final SubscriptionState subscriptions,
- final ConsumerMetadata metadata) {
- delegate = CREATOR.create(
- logContext, clientId, groupId, config, keyDeserializer, valueDeserializer,
- time, client, subscriptions, metadata);
- }
-
- /**
- * Get the current subscription. Will return the same topics used in the most recent call to
- * {@link #subscribe(Collection)}, or an empty set if no such call has been made.
- *
- * @return The set of topics currently subscribed to
- */
- @Override
- public Set subscription() {
- return delegate.subscription();
- }
-
- /**
- * Subscribe to the given list of topics to get dynamically assigned partitions.
- * Topic subscriptions are not incremental. This list will replace the current
- * assignment, if there is one. If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
- *
- *
- * As part of group management, the coordinator will keep track of the list of consumers that belong to a particular
- * group and will trigger a rebalance operation if any one of the following events are triggered:
- *
- * - A member joins or leaves the share group
- *
- * - An existing member of the share group is shut down or fails
- *
- * - The number of partitions changes for any of the subscribed topics
- *
- * - A subscribed topic is created or deleted
- *
- *
- * @param topics The list of topics to subscribe to
- *
- * @throws IllegalArgumentException if topics is null or contains null or empty elements
- * @throws KafkaException for any other unrecoverable errors
- */
- @Override
- public void subscribe(Collection topics) {
- delegate.subscribe(topics);
- }
-
- /**
- * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}.
- *
- * @throws KafkaException for any other unrecoverable errors
- */
- @Override
- public void unsubscribe() {
- delegate.unsubscribe();
- }
-
- /**
- * Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
- * subscribed to any topics before polling for data.
- *
- *
- * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
- * If the timeout expires, an empty record set will be returned.
- *
- * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
- *
- * @return map of topic to records since the last fetch for the subscribed list of topics
- *
- * @throws AuthenticationException if authentication fails. See the exception for more details
- * @throws AuthorizationException if caller lacks Read access to any of the subscribed
- * topics or to the share group. See the exception for more details
- * @throws IllegalArgumentException if the timeout value is negative
- * @throws IllegalStateException if the consumer is not subscribed to any topics
- * @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
- * @throws InvalidTopicException if the current subscription contains any invalid
- * topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
- * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
- * @throws InterruptException if the calling thread is interrupted before or while this method is called
- * @throws KafkaException for any other unrecoverable errors
- */
- @Override
- public ConsumerRecords poll(Duration timeout) {
- return delegate.poll(timeout);
- }
-
- /**
- * Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
- * The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
- * {@link #poll(Duration)} call.
- *
- * @param record The record to acknowledge
- *
- * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
- * used implicit acknowledgement
- */
- @Override
- public void acknowledge(ConsumerRecord record) {
- delegate.acknowledge(record);
- }
-
- /**
- * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
- * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
- * {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
- * explicit acknowledgement.
- *
- * @param record The record to acknowledge
- * @param type The acknowledgement type which indicates whether it was processed successfully
- *
- * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
- * used implicit acknowledgement
- */
- @Override
- public void acknowledge(ConsumerRecord record, AcknowledgeType type) {
- delegate.acknowledge(record, type);
- }
-
- /**
- * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
- * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
- * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
- * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
- *
- *
- * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
- * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms}
- * expires.
- *
- * @return A map of the results for each topic-partition for which delivery was acknowledged.
- * If the acknowledgement failed for a topic-partition, an exception is present.
- *
- * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
- * @throws InterruptException if the thread is interrupted while blocked
- * @throws KafkaException for any other unrecoverable errors
- */
- @Override
- public Map> commitSync() {
- return delegate.commitSync();
- }
-
- /**
- * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
- * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
- * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
- * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
- *
- *
- * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
- * encountered (in which case it is thrown to the caller), or the timeout expires.
- *
- * @param timeout The maximum amount of time to await completion of the acknowledgement
- *
- * @return A map of the results for each topic-partition for which delivery was acknowledged.
- * If the acknowledgement failed for a topic-partition, an exception is present.
- *
- * @throws IllegalArgumentException if the {@code timeout} is negative
- * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
- * @throws InterruptException if the thread is interrupted while blocked
- * @throws KafkaException for any other unrecoverable errors
- */
- @Override
- public Map> commitSync(Duration timeout) {
- return delegate.commitSync(timeout);
- }
-
- /**
- * Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement,
- * the acknowledgements to commit have been indicated using {@link #acknowledge(ConsumerRecord)} or
- * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}. If the consumer is using implicit acknowledgement,
- * all the records returned by the latest call to {@link #poll(Duration)} are acknowledged.
- *
- * @throws KafkaException for any other unrecoverable errors
- */
- @Override
- public void commitAsync() {
- delegate.commitAsync();
- }
-
- /**
- * Sets the acknowledgement commit callback which can be used to handle acknowledgement completion.
- *
- * @param callback The acknowledgement commit callback
- */
- @Override
- public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback) {
- delegate.setAcknowledgementCommitCallback(callback);
- }
-
- /**
- * Determines the client's unique client instance ID used for telemetry. This ID is unique to
- * this specific client instance and will not change after it is initially generated.
- * The ID is useful for correlating client operations with telemetry sent to the broker and
- * to its eventual monitoring destinations.
- *
- * If telemetry is enabled, this will first require a connection to the cluster to generate
- * the unique client instance ID. This method waits up to {@code timeout} for the consumer
- * client to complete the request.
- *
- * Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
- * configuration option.
- *
- * @param timeout The maximum time to wait for consumer client to determine its client instance ID.
- * The value must be non-negative. Specifying a timeout of zero means do not
- * wait for the initial request to complete if it hasn't already.
- *
- * @return The client's assigned instance id used for metrics collection.
- *
- * @throws IllegalArgumentException if the {@code timeout} is negative
- * @throws IllegalStateException if telemetry is not enabled
- * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
- * @throws InterruptException if the thread is interrupted while blocked
- * @throws KafkaException if an unexpected error occurs while trying to determine the client
- * instance ID, though this error does not necessarily imply the
- * consumer client is otherwise unusable
- */
- @Override
- public Uuid clientInstanceId(Duration timeout) {
- return delegate.clientInstanceId(timeout);
- }
-
- /**
- * Get the metrics kept by the consumer
- */
- @Override
- public Map metrics() {
- return delegate.metrics();
- }
-
- /**
- * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
- * This will commit acknowledgements if possible within the default timeout.
- * See {@link #close(Duration)} for details. Note that {@link #wakeup()} cannot be used to interrupt close.
- *
- * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
- * @throws InterruptException if the thread is interrupted before or while this method is called
- * @throws KafkaException for any other error during close
- */
- @Override
- public void close() {
- delegate.close();
- }
-
- /**
- * Tries to close the consumer cleanly within the specified timeout. This method waits up to
- * {@code timeout} for the consumer to complete acknowledgements and leave the group.
- * If the consumer is unable to complete acknowledgements and gracefully leave the group
- * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
- * used to interrupt close.
- *
- * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
- * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
- *
- * @throws IllegalArgumentException if the {@code timeout} is negative
- * @throws WakeupException if {@link #wakeup()} is called before or while this method is called
- * @throws InterruptException if the thread is interrupted before or while this method is called
- * @throws KafkaException for any other error during close
- */
- @Override
- public void close(Duration timeout) {
- delegate.close(timeout);
- }
-
- /**
- * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
- * The thread which is blocking in an operation will throw {@link WakeupException}.
- * If no thread is blocking in a method which can throw {@link WakeupException},
- * the next call to such a method will raise it instead.
- */
- @Override
- public void wakeup() {
- delegate.wakeup();
- }
-
- // Functions below are for testing only
- String clientId() {
- return delegate.clientId();
- }
-
- Metrics metricsRegistry() {
- return delegate.metricsRegistry();
- }
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 600f8bbd07ef4..27faa80c65421 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -253,7 +253,7 @@ public synchronized ConsumerRecords poll(final Duration timeout) {
}
}
- toClear.forEach(records::remove);
+ toClear.forEach(p -> this.records.remove(p));
return new ConsumerRecords<>(results);
}
@@ -263,7 +263,7 @@ public synchronized void addRecord(ConsumerRecord record) {
Set currentAssigned = this.subscriptions.assignedPartitions();
if (!currentAssigned.contains(tp))
throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
- List> recs = records.computeIfAbsent(tp, k -> new ArrayList<>());
+ List> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
recs.add(record);
}
@@ -286,7 +286,8 @@ public synchronized void setOffsetsException(KafkaException exception) {
@Override
public synchronized void commitAsync(Map offsets, OffsetCommitCallback callback) {
ensureNotClosed();
- committed.putAll(offsets);
+ for (Map.Entry entry : offsets.entrySet())
+ committed.put(entry.getKey(), entry.getValue());
if (callback != null) {
callback.onComplete(offsets, null);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
deleted file mode 100644
index 212e2ae390d5f..0000000000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.LogContext;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
-
-/**
- * A mock of the {@link ShareConsumer} interface you can use for testing code that uses Kafka. This class is not
- * thread-safe.
- */
-public class MockShareConsumer implements ShareConsumer {
-
- private final SubscriptionState subscriptions;
- private final AtomicBoolean wakeup;
-
- private final Map>> records;
-
- private boolean closed;
- private Uuid clientInstanceId;
-
- public MockShareConsumer() {
- this.subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
- this.records = new HashMap<>();
- this.closed = false;
- this.wakeup = new AtomicBoolean(false);
- }
-
- @Override
- public synchronized Set subscription() {
- ensureNotClosed();
- return subscriptions.subscription();
- }
-
- @Override
- public synchronized void subscribe(Collection topics) {
- ensureNotClosed();
- subscriptions.subscribe(new HashSet<>(topics), Optional.empty());
- }
-
- @Override
- public synchronized void unsubscribe() {
- ensureNotClosed();
- subscriptions.unsubscribe();
- }
-
- @Override
- public synchronized ConsumerRecords poll(Duration timeout) {
- ensureNotClosed();
-
- final Map>> results = new HashMap<>();
- for (Map.Entry>> entry : records.entrySet()) {
- final List> recs = entry.getValue();
- for (final ConsumerRecord rec : recs) {
- results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
- }
- }
-
- records.clear();
- return new ConsumerRecords<>(results);
- }
-
- @Override
- public synchronized void acknowledge(ConsumerRecord record) {
- }
-
- @Override
- public synchronized void acknowledge(ConsumerRecord record, AcknowledgeType type) {
- }
-
- @Override
- public synchronized Map> commitSync() {
- return new HashMap<>();
- }
-
- @Override
- public synchronized Map> commitSync(Duration timeout) {
- return new HashMap<>();
- }
-
- @Override
- public synchronized void commitAsync() {
- }
-
- @Override
- public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback) {
- }
-
- public synchronized void setClientInstanceId(final Uuid clientInstanceId) {
- this.clientInstanceId = clientInstanceId;
- }
-
- @Override
- public synchronized Uuid clientInstanceId(Duration timeout) {
- if (clientInstanceId == null) {
- throw new UnsupportedOperationException("clientInstanceId not set");
- }
-
- return clientInstanceId;
- }
-
- @Override
- public synchronized Map metrics() {
- ensureNotClosed();
- return Collections.emptyMap();
- }
-
- @Override
- public synchronized void close() {
- close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
- }
-
- @Override
- public synchronized void close(Duration timeout) {
- closed = true;
- }
-
- @Override
- public synchronized void wakeup() {
- wakeup.set(true);
- }
-
- public synchronized void addRecord(ConsumerRecord record) {
- ensureNotClosed();
- TopicPartition tp = new TopicPartition(record.topic(), record.partition());
- if (!subscriptions.subscription().contains(record.topic()))
- throw new IllegalStateException("Cannot add records for a topics that is not subscribed by the consumer");
- List> recs = records.computeIfAbsent(tp, k -> new ArrayList<>());
- recs.add(record);
- }
-
- private void ensureNotClosed() {
- if (closed)
- throw new IllegalStateException("This consumer has already been closed.");
- }
-}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 6bc064006fe6f..e95d4f1efd7b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -170,7 +170,7 @@ private void assignRanges(TopicAssignmentState assignmentState,
private void assignWithRackMatching(Collection assignmentStates,
Map> assignment) {
- assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) ->
+ assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
if (coPartitionedStates.size() > 1)
assignCoPartitionedWithRackMatching(consumers, numPartitions, coPartitionedStates, assignment);
@@ -179,8 +179,8 @@ private void assignWithRackMatching(Collection assignmentS
if (state.needsRackAwareAssignment)
assignRanges(state, state::racksMatch, assignment);
}
- })
- );
+ });
+ });
}
private void assignCoPartitionedWithRackMatching(LinkedHashMap> consumers,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
deleted file mode 100644
index 8ac4198c70df3..0000000000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.io.Closeable;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * @see KafkaShareConsumer
- * @see MockShareConsumer
- */
-@InterfaceStability.Evolving
-public interface ShareConsumer extends Closeable {
-
- /**
- * @see KafkaShareConsumer#subscription()
- */
- Set subscription();
-
- /**
- * @see KafkaShareConsumer#subscribe(Collection)
- */
- void subscribe(Collection topics);
-
- /**
- * @see KafkaShareConsumer#unsubscribe()
- */
- void unsubscribe();
-
- /**
- * @see KafkaShareConsumer#poll(Duration)
- */
- ConsumerRecords poll(Duration timeout);
-
- /**
- * @see KafkaShareConsumer#acknowledge(ConsumerRecord)
- */
- void acknowledge(ConsumerRecord record);
-
- /**
- * @see KafkaShareConsumer#acknowledge(ConsumerRecord, AcknowledgeType)
- */
- void acknowledge(ConsumerRecord record, AcknowledgeType type);
-
- /**
- * @see KafkaShareConsumer#commitSync()
- */
- Map> commitSync();
-
- /**
- * @see KafkaShareConsumer#commitSync(Duration)
- */
- Map> commitSync(Duration timeout);
-
- /**
- * @see KafkaShareConsumer#commitAsync()
- */
- void commitAsync();
-
- /**
- * @see KafkaShareConsumer#setAcknowledgementCommitCallback(AcknowledgementCommitCallback)
- */
- void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback);
-
- /**
- * @see KafkaShareConsumer#clientInstanceId(Duration)
- */
- Uuid clientInstanceId(Duration timeout);
-
- /**
- * @see KafkaShareConsumer#metrics()
- */
- Map metrics();
-
- /**
- * @see KafkaShareConsumer#close()
- */
- void close();
-
- /**
- * @see KafkaShareConsumer#close(Duration)
- */
- void close(Duration timeout);
-
- /**
- * @see KafkaShareConsumer#wakeup()
- */
- void wakeup();
-
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 0d3e4a256e2be..7e7350a5946e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -236,7 +236,8 @@ static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
topicAssignments.add(topicAssignment);
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
- memberData.generation.ifPresent(integer -> struct.set(GENERATION_KEY_NAME, integer));
+ if (memberData.generation.isPresent())
+ struct.set(GENERATION_KEY_NAME, memberData.generation.get());
ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
buffer.flip();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e6de169f7b33f..6930cd02955d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -26,6 +26,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
@@ -234,12 +235,12 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
- private int metadataVersionSnapshot;
private final Metrics metrics;
private final long retryBackoffMs;
private final int defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
+ private final List assignors;
private final Optional clientTelemetryReporter;
// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
@@ -312,7 +313,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
final List addresses = ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
- this.metadataVersionSnapshot = metadata.updateVersion();
FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
FetchConfig fetchConfig = new FetchConfig(config);
@@ -331,8 +331,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
apiVersions,
metrics,
fetchMetricsManager,
- clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
- backgroundEventHandler);
+ clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.asyncCommitFenced = new AtomicBoolean(false);
this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
@@ -374,6 +373,10 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
rebalanceListenerInvoker
);
this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
+ this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
+ config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+ config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
+ );
// The FetchCollector is only used on the application thread.
this.fetchCollector = fetchCollectorFactory.build(logContext,
@@ -421,6 +424,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
ConsumerMetadata metadata,
long retryBackoffMs,
int defaultApiTimeoutMs,
+ List assignors,
String groupId,
boolean autoCommitEnabled) {
this.log = logContext.logger(getClass());
@@ -437,11 +441,11 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.metrics = metrics;
this.groupMetadata.set(initializeGroupMetadata(groupId, Optional.empty()));
this.metadata = metadata;
- this.metadataVersionSnapshot = metadata.updateVersion();
this.retryBackoffMs = retryBackoffMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
+ this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
@@ -456,7 +460,8 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
Deserializer valueDeserializer,
KafkaClient client,
SubscriptionState subscriptions,
- ConsumerMetadata metadata) {
+ ConsumerMetadata metadata,
+ List assignors) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
@@ -467,10 +472,10 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.time = time;
this.metrics = new Metrics(time);
this.metadata = metadata;
- this.metadataVersionSnapshot = metadata.updateVersion();
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+ this.assignors = assignors;
this.clientTelemetryReporter = Optional.empty();
ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
@@ -505,9 +510,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
time,
config,
logContext,
- client,
- metadata,
- backgroundEventHandler
+ client
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
this.asyncCommitFenced = new AtomicBoolean(false);
@@ -1475,11 +1478,12 @@ public void assign(Collection partitions) {
}
/**
- *
+ * TODO: remove this when we implement the KIP-848 protocol.
*
- * This function evaluates the regex that the consumer subscribed to
- * against the list of topic names from metadata, and updates
- * the list of topics in subscription state accordingly
+ *
+ * The contents of this method are shamelessly stolen from
+ * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+ * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
*
* @param cluster Cluster from which we get the topics
*/
@@ -1489,7 +1493,7 @@ private void updatePatternSubscription(Cluster cluster) {
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
applicationEventHandler.add(new SubscriptionChangeEvent());
- this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
+ metadata.requestUpdateForNewTopics();
}
}
@@ -1683,6 +1687,12 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
}
}
+ private void throwIfNoAssignorsConfigured() {
+ if (assignors.isEmpty())
+ throw new IllegalStateException("Must configure at least one partition assigner class name to " +
+ ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
+ }
+
private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
if (offsetAndMetadata != null)
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
@@ -1770,6 +1780,7 @@ private void subscribeInternal(Pattern pattern, Optional topics, Optional currentTopicPartitions = new HashSet<>();
@@ -1805,7 +1818,7 @@ private void subscribeInternal(Collection topics, Optional(topics), listener))
- this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
+ metadata.requestUpdateForNewTopics();
// Trigger subscribe event to effectively join the group if not already part of it,
// or just send the new subscription to the broker.
@@ -1984,12 +1997,9 @@ SubscriptionState subscriptions() {
}
private void maybeUpdateSubscriptionMetadata() {
- if (this.metadataVersionSnapshot < metadata.updateVersion()) {
- this.metadataVersionSnapshot = metadata.updateVersion();
- if (subscriptions.hasPatternSubscription()) {
- updatePatternSubscription(metadata.fetch());
- }
+ if (subscriptions.hasPatternSubscription()) {
+ updatePatternSubscription(metadata.fetch());
}
}
-}
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 000797dba0918..f6a11fe4b7d58 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -69,7 +69,6 @@
import static org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRESS;
public class CommitRequestManager implements RequestManager, MemberStateListener {
- private final Time time;
private final SubscriptionState subscriptions;
private final LogContext logContext;
private final Logger log;
@@ -134,7 +133,6 @@ public CommitRequestManager(
final OptionalDouble jitter,
final Metrics metrics) {
Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
- this.time = time;
this.logContext = logContext;
this.log = logContext.logger(getClass());
this.pendingRequests = new PendingRequests();
@@ -207,13 +205,6 @@ private static long findMinTime(final Collection extends RequestState> request
.orElse(Long.MAX_VALUE);
}
- private KafkaException maybeWrapAsTimeoutException(Throwable t) {
- if (t instanceof TimeoutException)
- return (TimeoutException) t;
- else
- return new TimeoutException(t);
- }
-
/**
* Generate a request to commit consumed offsets. Add the request to the queue of pending
* requests to be sent out on the next call to {@link #poll(long)}. If there are empty
@@ -254,7 +245,7 @@ public void maybeAutoCommitAsync() {
if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
OffsetCommitRequestState requestState = createOffsetCommitRequest(
subscriptions.allConsumed(),
- Long.MAX_VALUE);
+ Optional.empty());
CompletableFuture