Skip to content
51 changes: 51 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -1080,6 +1082,55 @@ ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPa
*/
MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options);

/**
 * <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
 *
 * <p>This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterConsumerGroupOffsetsOptions)} with default options.
 * See the overload for more details.
 *
 * @param groupId The group for which to alter offsets.
 * @param offsets A map of offsets by partition with associated metadata.
 * @return The AlterConsumerGroupOffsetsResult.
 */
default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
return alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions());
}

/**
 * <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
 *
 * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
 *
 * @param groupId The group for which to alter offsets.
 * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
 * @param options The options to use when altering the offsets.
 * @return The AlterConsumerGroupOffsetsResult.
 */
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);

/**
 * <p>List offsets for the specified partitions and OffsetSpec. This operation enables finding
 * the beginning offset, end offset as well as the offset matching a timestamp in partitions.
 *
 * <p>This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)} with default options.
 * See the overload for more details.
 *
 * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
 * @return The ListOffsetsResult.
 */
default ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
}

/**
 * <p>List offsets for the specified partitions. This operation enables finding
 * the beginning offset, end offset as well as the offset matching a timestamp in partitions.
 *
 * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
 * @param options The options to use when retrieving the offsets.
 * @return The ListOffsetsResult.
 */
ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
Comment thread
mimaison marked this conversation as resolved.
Outdated

/**
* Get the metrics kept by the adminClient
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Options for the {@link AdminClient#alterConsumerGroupOffsets(String, java.util.Map)} call.
 * <p>
 * The API of this class is evolving, see {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class AlterConsumerGroupOffsetsOptions extends AbstractOptions<AlterConsumerGroupOffsetsOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.KafkaFuture.BaseFunction;
import org.apache.kafka.common.KafkaFuture.BiConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;

/**
* The result of the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class AlterConsumerGroupOffsetsResult {

// Completes with a map from each requested partition to its per-partition error
// (Errors.NONE on success), or exceptionally on a group/transport level failure.
private final KafkaFuture<Map<TopicPartition, Errors>> future;

AlterConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
this.future = future;
}

/**
 * Return a future which can be used to check the result for a given partition.
 *
 * @param partition The partition to check.
 * @return A future that completes successfully if altering the offset for the given
 *         partition succeeded, completes exceptionally with the partition-level error,
 *         or with an {@link IllegalArgumentException} if the partition was not part of
 *         the request.
 */
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();

    // Use a lambda rather than an anonymous BiConsumer, consistent with the
    // lambda usage elsewhere in this class.
    this.future.whenComplete((topicPartitions, throwable) -> {
        if (throwable != null) {
            // Whole-request failure: propagate as-is.
            result.completeExceptionally(throwable);
        } else if (!topicPartitions.containsKey(partition)) {
            result.completeExceptionally(new IllegalArgumentException(
                "Alter offset for partition \"" + partition + "\" was not attempted"));
        } else {
            final Errors error = topicPartitions.get(partition);
            if (error == Errors.NONE) {
                result.complete(null);
            } else {
                // Map the per-partition error code to its exception type.
                result.completeExceptionally(error.exception());
            }
        }
    });

    return result;
}

/**
* Return a future which succeeds if all the alter offsets succeed.
*/
public KafkaFuture<Void> all() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. This is a little different from what we have in DeleteConsumerGroupOffsetsResult. I think it makes sense to check all the partition level errors. cc @dajac

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a fair point but I am not sure what the best one is. The rationale behind not looking at individual topic/partitions was that it allows using all() to wait for the completion of the request and then check the individual results. In this case all() fails only if the whole group has failed.

To be more concrete, it allows to do the following:

DeleteConsumerGroupOffsetsResult result = ...;

try {
  // wait for the whole group, only raise when a group level or
  // transport level exception affecting the whole request occurs
  result.all().get()

  // inspect individual topic/partition
  try {
    result.partitionResult(...).get()
  } catch (Exception e) {
    // handle partition exception
  }
} catch (Exception e) {
  // handle group level exception
}

I think that this facilitates the error handling. What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an interesting point. I think the usual semantics of all is to only succeed if all individual operations have succeeded. It's sort of designed for lazy error handling I guess. If users care about the individual operations, they can check them individually. Otherwise they have a convenient way to check for any errors. Based on what I've seen, this tends to be the most frequent use. I think also part of the idea is to abstract away from the underlying requests. Some of the admin APIs result in multiple broker requests which makes exposing the full granularity of errors quite cumbersome.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just made a pass on all XXXResult classes and I think the API semantics are a bit inconsistent in general: originally I thought we only need the all function if the result contains futures in the form of Map<..., KafkaFuture<...>> which potentially requires one trip for each nested future, and the all function is used as a lazy way to check that all entries have completed successfully. But some (e.g. RemoveMemberFromGroupResult in form of Map<MemberIdentity, KafkaFuture<Void>>) actually only require one request too, so all futures would actually always be completed at the same time. For those cases we do not need an all function either.

But it seems like for results that only contain a KafkaFuture<Object> we also have a dummy all function, and many of their all semantics are different too.

Honestly I think not all results needs an all function, but it seems we are already a bit messy here..

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, unfortunately the admin APIs have such a big surface area it's hard to maintain consistency. I think the original intent is what I described though.

// Succeed only if every partition-level result is Errors.NONE; otherwise throw the
// first partition's error, naming all failed partitions in the message. Uses a lambda
// (consistent with the rest of the class) and a single scan of the error map instead
// of the previous second pass over all values.
return this.future.thenApply(topicPartitionErrorsMap -> {
    List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
        .stream()
        .filter(e -> e.getValue() != Errors.NONE)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
    if (!partitionsFailed.isEmpty()) {
        Errors firstError = topicPartitionErrorsMap.get(partitionsFailed.get(0));
        throw firstError.exception(
            "Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
    }
    return null;
});
}
}
Loading