Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
Expand Down Expand Up @@ -287,12 +286,26 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
/**
 * Creates a consumer from the supplied configuration and deserializers, wiring in the
 * production-default collaborators: the system clock and the real
 * {@link ApplicationEventHandler}, {@link FetchCollector} and {@link ConsumerMetadata}
 * constructors (passed as factory method references so tests can substitute their own).
 *
 * @param config            the consumer configuration
 * @param keyDeserializer   deserializer for record keys
 * @param valueDeserializer deserializer for record values
 */
AsyncKafkaConsumer(final ConsumerConfig config,
                   final Deserializer<K> keyDeserializer,
                   final Deserializer<V> valueDeserializer) {
    // Delegate to the factory-accepting constructor with the production defaults.
    this(
        config,
        keyDeserializer,
        valueDeserializer,
        Time.SYSTEM,
        ApplicationEventHandler::new,
        FetchCollector::new,
        ConsumerMetadata::new,
        new LinkedBlockingQueue<>()
    );
}

// Visible for testing
AsyncKafkaConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Time time,
final ApplicationEventHandlerFactory applicationEventHandlerFactory,
final FetchCollectorFactory<K, V> fetchCollectorFactory,
final ConsumerMetadataFactory metadataFactory,
final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
Expand All @@ -305,7 +318,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {

log.debug("Initializing the Kafka consumer");
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
this.time = time;
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
Expand All @@ -319,7 +332,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
interceptorList,
Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);

Expand Down Expand Up @@ -360,7 +373,8 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
metadata,
applicationEventQueue,
requestManagersSupplier);
this.applicationEventHandler = new ApplicationEventHandler(logContext,
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
applicationEventQueue,
applicationEventProcessorSupplier,
Expand Down Expand Up @@ -391,7 +405,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);

// The FetchCollector is only used on the application thread.
this.fetchCollector = new FetchCollector<>(logContext,
this.fetchCollector = fetchCollectorFactory.build(logContext,
metadata,
subscriptions,
fetchConfig,
Expand Down Expand Up @@ -420,49 +434,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
}

// Visible for testing.
// Test-only constructor that accepts pre-built collaborators directly instead of
// constructing them from a ConsumerConfig. All dependencies are injected as-is;
// no background threads or network resources are created here beyond what the
// supplied ApplicationEventHandler already owns.
AsyncKafkaConsumer(LogContext logContext,
String clientId,
Deserializers<K, V> deserializers,
FetchBuffer fetchBuffer,
FetchCollector<K, V> fetchCollector,
ConsumerInterceptors<K, V> interceptors,
Time time,
ApplicationEventHandler applicationEventHandler,
BlockingQueue<BackgroundEvent> backgroundEventQueue,
ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
Metrics metrics,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
long retryBackoffMs,
int defaultApiTimeoutMs,
List<ConsumerPartitionAssignor> assignors,
String groupId) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = clientId;
this.fetchBuffer = fetchBuffer;
this.fetchCollector = fetchCollector;
// Isolation level is hard-coded here; the config-driven constructor derives it from
// the ConsumerConfig instead.
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
this.interceptors = Objects.requireNonNull(interceptors);
this.time = time;
// The background event processor drains backgroundEventQueue on the application
// thread and dispatches rebalance-listener callbacks via the supplied invoker.
this.backgroundEventProcessor = new BackgroundEventProcessor(
logContext,
backgroundEventQueue,
applicationEventHandler,
rebalanceListenerInvoker
);
this.metrics = metrics;
// Group metadata is derived from the groupId alone; no group instance id is supplied.
this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty());
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
// Telemetry is disabled for this test constructor: no reporter is installed.
this.clientTelemetryReporter = Optional.empty();
}

AsyncKafkaConsumer(LogContext logContext,
Time time,
ConsumerConfig config,
Expand Down Expand Up @@ -563,6 +534,47 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
);
}

/**
 * Factory for the {@link ApplicationEventHandler} used by the consumer. Exists so tests can
 * inject an alternative implementation; production code passes
 * {@code ApplicationEventHandler::new}.
 */
@FunctionalInterface
interface ApplicationEventHandlerFactory {

    /**
     * Builds the handler that processes {@link ApplicationEvent}s from the application thread.
     */
    ApplicationEventHandler build(
        final LogContext logContext,
        final Time time,
        final BlockingQueue<ApplicationEvent> applicationEventQueue,
        final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
        final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
        final Supplier<RequestManagers> requestManagersSupplier
    );

}

/**
 * Factory for the {@link FetchCollector} used on the application thread. Exists so tests can
 * inject an alternative implementation; production code passes {@code FetchCollector::new}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
@FunctionalInterface
interface FetchCollectorFactory<K, V> {

    /**
     * Builds the collector that turns completed fetches into {@code ConsumerRecords}.
     */
    FetchCollector<K, V> build(
        final LogContext logContext,
        final ConsumerMetadata metadata,
        final SubscriptionState subscriptions,
        final FetchConfig fetchConfig,
        final Deserializers<K, V> deserializers,
        final FetchMetricsManager metricsManager,
        final Time time
    );

}

/**
 * Factory for the {@link ConsumerMetadata} instance backing the consumer. Exists so tests can
 * inject an alternative implementation; production code passes {@code ConsumerMetadata::new}.
 */
@FunctionalInterface
interface ConsumerMetadataFactory {

    /**
     * Builds the metadata object from the consumer configuration and subscription state.
     */
    ConsumerMetadata build(
        final ConsumerConfig config,
        final SubscriptionState subscriptions,
        final LogContext logContext,
        final ClusterResourceListeners clusterResourceListeners
    );

}

private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
final GroupRebalanceConfig groupRebalanceConfig) {
final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
Expand Down Expand Up @@ -1756,8 +1768,7 @@ private void maybeThrowFencedInstanceException() {
}
}

// Visible for testing
void maybeInvokeCommitCallbacks() {
private void maybeInvokeCommitCallbacks() {
if (callbacks() > 0) {
invoker.executeCallbacks();
}
Expand All @@ -1768,6 +1779,11 @@ int callbacks() {
return invoker.callbackQueue.size();
}

// Visible for testing.
// Package-private accessor exposing the consumer's SubscriptionState so tests can
// inspect assignment/subscription bookkeeping without reflection.
SubscriptionState subscriptions() {
return subscriptions;
}
Comment on lines +1782 to +1785
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to avoid this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I can add callback to the constructor that registers the SubscriptionState with the calling context (the test)
  • I can add another factory to inject our own instance of SubscriptionState


/**
* Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is
* achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the
Expand Down
Loading