Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,17 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScram
*/
AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);
/**
* Describes finalized as well as supported features.
* <p>
* This is a convenience method for {@link #describeFeatures(DescribeFeaturesOptions)} with default options.
* See the overload for more details.
*
* @return the {@link DescribeFeaturesResult} containing the result
*/
default DescribeFeaturesResult describeFeatures() {
return describeFeatures(new DescribeFeaturesOptions());
}

/**
* Describes finalized as well as supported features. By default, the request is issued to any
Expand All @@ -1320,9 +1331,9 @@ AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredenti
* If the request timed out before the describe operation could finish.</li>
* </ul>
* <p>
* @param options the options to use
*
* @return the {@link DescribeFeaturesResult} containing the result
* @param options the options to use
* @return the {@link DescribeFeaturesResult} containing the result
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Expand Down Expand Up @@ -1367,10 +1378,9 @@ AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredenti
* <p>
* This operation is supported by brokers with version 2.7.0 or higher.

* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
*
* @return the {@link UpdateFeaturesResult} containing the result
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
* @return the {@link UpdateFeaturesResult} containing the result
*/
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears we forgot to add an overload for updateFeatures. @kowshik @junrao WDYT? Is it worth adding consistency for now?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for finding this. Yes, we can add an overload w/o options for consistency. It would be useful to update the original KIP and the discussion thread with the change.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4410,6 +4410,10 @@ public UpdateFeaturesResult updateFeatures(final Map<String, FeatureUpdate> feat

final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
for (final Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
if (feature.trim().isEmpty()) {
throw new IllegalArgumentException("Provided feature can not be empty.");
}
updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
}

Expand All @@ -4424,10 +4428,6 @@ UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
final FeatureUpdate update = entry.getValue();
if (feature.trim().isEmpty()) {
throw new IllegalArgumentException("Provided feature can not be null or empty.");
}

final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
new UpdateFeaturesRequestData.FeatureUpdateKey();
requestItem.setFeature(feature);
Expand Down Expand Up @@ -4471,7 +4471,8 @@ void handleResponse(AbstractResponse abstractResponse) {
break;
default:
for (final Map.Entry<String, KafkaFutureImpl<Void>> entry : updateFutures.entrySet()) {
entry.getValue().completeExceptionally(topLevelError.exception());
final String errorMsg = response.data().errorMessage();
entry.getValue().completeExceptionally(topLevelError.exception(errorMsg));
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4062,19 +4062,12 @@ public void testUpdateFeaturesShouldFailRequestForEmptyUpdates() {
@Test
public void testUpdateFeaturesShouldFailRequestForInvalidFeatureName() {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
final UpdateFeaturesResult result = env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
new UpdateFeaturesOptions());

final Map<String, KafkaFuture<Void>> futures = result.values();
for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
final Throwable cause = assertThrows(ExecutionException.class, () -> entry.getValue().get());
assertEquals(KafkaException.class, cause.getCause().getClass());
}

final KafkaFuture<Void> future = result.all();
final Throwable cause = assertThrows(ExecutionException.class, () -> future.get());
assertEquals(KafkaException.class, cause.getCause().getClass());
assertThrows(
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, false)),
Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
new UpdateFeaturesOptions()));
}
}

Expand Down