diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 6b8aedb6e4700..fa1d5873a2cbc 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -39,7 +39,9 @@
-
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e6986c8a378fb..bd135acc74b30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4822,12 +4822,12 @@ AddRaftVoterRequest.Builder createRequest(int timeoutMs) {
setHost(endpoint.host()).
setPort(endpoint.port())));
return new AddRaftVoterRequest.Builder(
- new AddRaftVoterRequestData().
- setClusterId(options.clusterId().orElse(null)).
- setTimeoutMs(timeoutMs).
- setVoterId(voterId) .
- setVoterDirectoryId(voterDirectoryId).
- setListeners(listeners));
+ new AddRaftVoterRequestData().
+ setClusterId(options.clusterId().orElse(null)).
+ setTimeoutMs(timeoutMs).
+ setVoterId(voterId) .
+ setVoterDirectoryId(voterDirectoryId).
+ setListeners(listeners));
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
index 033daa098761d..ae4b9f71f6c4b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java
@@ -44,7 +44,7 @@ public final class OffsetsForLeaderEpochUtils {
private static final Logger LOG = LoggerFactory.getLogger(OffsetsForLeaderEpochUtils.class);
- private OffsetsForLeaderEpochUtils(){}
+ private OffsetsForLeaderEpochUtils() {}
static AbstractRequest.Builder prepareRequest(
Map requestData) {
diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index ca2a76b4716a9..58241c51ea085 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -202,14 +202,14 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
EnumMap> denyPatterns =
new EnumMap<>(PatternType.class) {{
- put(PatternType.LITERAL, new HashSet<>());
- put(PatternType.PREFIXED, new HashSet<>());
- }};
+ put(PatternType.LITERAL, new HashSet<>());
+ put(PatternType.PREFIXED, new HashSet<>());
+ }};
EnumMap> allowPatterns =
new EnumMap<>(PatternType.class) {{
- put(PatternType.LITERAL, new HashSet<>());
- put(PatternType.PREFIXED, new HashSet<>());
- }};
+ put(PatternType.LITERAL, new HashSet<>());
+ put(PatternType.PREFIXED, new HashSet<>());
+ }};
boolean hasWildCardAllow = false;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cf5fbc938e3ee..ca70af6d114f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2061,9 +2061,9 @@ public void testDescribeBrokerAndLogConfigs() throws Exception {
new DescribeConfigsResponseData().setResults(asList(new DescribeConfigsResponseData.DescribeConfigsResult()
.setResourceName(brokerResource.name()).setResourceType(brokerResource.type().id()).setErrorCode(Errors.NONE.code())
.setConfigs(emptyList()),
- new DescribeConfigsResponseData.DescribeConfigsResult()
- .setResourceName(brokerLoggerResource.name()).setResourceType(brokerLoggerResource.type().id()).setErrorCode(Errors.NONE.code())
- .setConfigs(emptyList())))), env.cluster().nodeById(0));
+ new DescribeConfigsResponseData.DescribeConfigsResult()
+ .setResourceName(brokerLoggerResource.name()).setResourceType(brokerLoggerResource.type().id()).setErrorCode(Errors.NONE.code())
+ .setConfigs(emptyList())))), env.cluster().nodeById(0));
Map> result = env.adminClient().describeConfigs(asList(
brokerResource,
brokerLoggerResource)).values();
@@ -2102,9 +2102,9 @@ public void testDescribeConfigsUnrequested() throws Exception {
new DescribeConfigsResponseData().setResults(asList(new DescribeConfigsResponseData.DescribeConfigsResult()
.setResourceName(topic.name()).setResourceType(topic.type().id()).setErrorCode(Errors.NONE.code())
.setConfigs(emptyList()),
- new DescribeConfigsResponseData.DescribeConfigsResult()
- .setResourceName(unrequested.name()).setResourceType(unrequested.type().id()).setErrorCode(Errors.NONE.code())
- .setConfigs(emptyList())))));
+ new DescribeConfigsResponseData.DescribeConfigsResult()
+ .setResourceName(unrequested.name()).setResourceType(unrequested.type().id()).setErrorCode(Errors.NONE.code())
+ .setConfigs(emptyList())))));
Map> result = env.adminClient().describeConfigs(singletonList(
topic)).values();
assertEquals(new HashSet<>(singletonList(topic)), result.keySet());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index d9935fbdca6e8..e3bb56347a8ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -177,11 +177,11 @@ public void testSuccessfulHandleResponseWithOnePartitionErrorWithMultipleGroups(
Map offsetAndMetadataMapTwo =
Collections.singletonMap(t2p2, new OffsetAndMetadata(10L));
Map> expectedResult =
- new HashMap>() {{
- put(groupZero, offsetAndMetadataMapZero);
- put(groupOne, offsetAndMetadataMapOne);
- put(groupTwo, offsetAndMetadataMapTwo);
- }};
+ new HashMap<>() {{
+ put(groupZero, offsetAndMetadataMapZero);
+ put(groupOne, offsetAndMetadataMapOne);
+ put(groupTwo, offsetAndMetadataMapTwo);
+ }};
assertCompletedForMultipleGroups(
handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult);
@@ -304,11 +304,11 @@ private OffsetFetchResponse buildResponseWithPartitionErrorWithMultipleGroups(Er
responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
Map> responseData =
- new HashMap>() {{
- put(groupZero, responseDataZero);
- put(groupOne, responseDataOne);
- put(groupTwo, responseDataTwo);
- }};
+ new HashMap<>() {{
+ put(groupZero, responseDataZero);
+ put(groupOne, responseDataOne);
+ put(groupTwo, responseDataTwo);
+ }};
Map errorMap = errorMap(groups, Errors.NONE);
return new OffsetFetchResponse(0, errorMap, responseData);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 07808f29806a5..b73576757e86e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -298,8 +298,8 @@ public void serializeDeserializeConsumerProtocolSubscriptionAllVersions() {
if (version >= 1) {
assertEquals(
Set.of(
- new ConsumerProtocolSubscription.TopicPartition().setTopic("foo").setPartitions(Collections.singletonList(0)),
- new ConsumerProtocolSubscription.TopicPartition().setTopic("bar").setPartitions(Collections.singletonList(0)
+ new ConsumerProtocolSubscription.TopicPartition().setTopic("foo").setPartitions(Collections.singletonList(0)),
+ new ConsumerProtocolSubscription.TopicPartition().setTopic("bar").setPartitions(Collections.singletonList(0)
)),
Set.copyOf(parsedSubscription.ownedPartitions())
);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 9ae80dc19baf3..6143daaa96717 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1815,10 +1815,10 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
@ParameterizedTest
@EnumSource(names = {
- "UNKNOWN_TOPIC_OR_PARTITION",
- "REQUEST_TIMED_OUT",
- "COORDINATOR_LOAD_IN_PROGRESS",
- "CONCURRENT_TRANSACTIONS"
+ "UNKNOWN_TOPIC_OR_PARTITION",
+ "REQUEST_TIMED_OUT",
+ "COORDINATOR_LOAD_IN_PROGRESS",
+ "CONCURRENT_TRANSACTIONS"
})
public void testRetriableErrors(Errors error) {
// Ensure FindCoordinator retries.
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index b4c73d64d38c4..d955c7939ae8f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -907,8 +907,8 @@ public void testLowestPriorityChannel() throws Exception {
}
assertNotNull(selector.lowestPriorityChannel());
for (int i = conns - 1; i >= 0; i--) {
- if (i != 2)
- assertEquals("", blockingRequest(String.valueOf(i), ""));
+ if (i != 2)
+ assertEquals("", blockingRequest(String.valueOf(i), ""));
time.sleep(10);
}
assertEquals("2", selector.lowestPriorityChannel().id());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
index c85d26dac5756..d0ef79b4479e8 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java
@@ -390,16 +390,16 @@ public void testUseDefaultLeaderEpochV0ToV7() {
.setErrorCode(Errors.NOT_COORDINATOR.code())
.setThrottleTimeMs(throttleTimeMs)
.setTopics(Collections.singletonList(
- new OffsetFetchResponseTopic()
- .setName(topicOne)
- .setPartitions(Collections.singletonList(
- new OffsetFetchResponsePartition()
- .setPartitionIndex(partitionOne)
- .setCommittedOffset(offset)
- .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
- .setMetadata(metadata))
- ))
+ new OffsetFetchResponseTopic()
+ .setName(topicOne)
+ .setPartitions(Collections.singletonList(
+ new OffsetFetchResponsePartition()
+ .setPartitionIndex(partitionOne)
+ .setCommittedOffset(offset)
+ .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+ .setMetadata(metadata))
+ ))
);
assertEquals(expectedData, response.data());
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index cb6a2458261ea..0d32fd442af88 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2187,9 +2187,9 @@ private JoinGroupRequest createJoinGroupRequest(short version) {
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
Collections.singleton(
- new JoinGroupRequestData.JoinGroupRequestProtocol()
- .setName("consumer-range")
- .setMetadata(new byte[0])).iterator()
+ new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("consumer-range")
+ .setMetadata(new byte[0])).iterator()
);
JoinGroupRequestData data = new JoinGroupRequestData()
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index a410adde94458..588baf8c09047 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -167,10 +167,10 @@ public void testAclFiltering() {
new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
assertFalse(connector.shouldReplicateAcl(
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
+ new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
assertTrue(connector.shouldReplicateAcl(
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
}
@Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 01f847d94f667..aa715667d24c4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -1196,7 +1196,7 @@ protected void addConfigKey(Map keys, String name,
keys.putAll(configDef.configKeys());
}
- protected void addValue(List values, String name, String value, String...errors) {
+ protected void addValue(List values, String name, String value, String... errors) {
values.add(new ConfigValue(name, value, new ArrayList<>(), Arrays.asList(errors)));
}
@@ -1211,7 +1211,7 @@ protected void assertNoInfoKey(ConfigInfos infos, String name) {
assertNull(info.configKey());
}
- protected void assertInfoValue(ConfigInfos infos, String name, String value, String...errors) {
+ protected void assertInfoValue(ConfigInfos infos, String name, String value, String... errors) {
ConfigValueInfo info = findInfo(infos, name).configValue();
assertEquals(name, info.name());
assertEquals(value, info.value());
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
index fe0a99c4abe72..566bdfab23832 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
@@ -35,9 +35,8 @@ public class HasHeaderKey> implements Predicate, V
private static final String NAME_CONFIG = "name";
public static final String OVERVIEW_DOC = "A predicate which is true for records with at least one header with the configured name.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
- new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
- "The header name.");
+ .define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
+ new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "The header name.");
private String name;
@Override
diff --git a/core/src/test/java/kafka/admin/AdminFenceProducersTest.java b/core/src/test/java/kafka/admin/AdminFenceProducersTest.java
index 4b8178032e06f..d4b87ef8ed572 100644
--- a/core/src/test/java/kafka/admin/AdminFenceProducersTest.java
+++ b/core/src/test/java/kafka/admin/AdminFenceProducersTest.java
@@ -50,11 +50,11 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(serverProperties = {
- @ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
- @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
- @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
- @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000")
+ @ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
+ @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000")
})
@ExtendWith(ClusterTestExtensions.class)
public class AdminFenceProducersTest {
diff --git a/core/src/test/java/kafka/admin/ClientTelemetryTest.java b/core/src/test/java/kafka/admin/ClientTelemetryTest.java
index 56d466bd27b5c..f701d3f38450e 100644
--- a/core/src/test/java/kafka/admin/ClientTelemetryTest.java
+++ b/core/src/test/java/kafka/admin/ClientTelemetryTest.java
@@ -75,7 +75,7 @@ public class ClientTelemetryTest {
types = Type.KRAFT,
brokers = 3,
serverProperties = {
- @ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"),
+ @ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"),
})
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
Map configs = new HashMap<>();
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 70b1faee43b66..10c24111e4757 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -420,7 +420,7 @@ public void testParseConfigsToBeAddedForAddConfigFile() throws IOException {
assertEquals("[[1, 2], [3, 4]]", addedProps.getProperty("nested"));
}
- public void testExpectedEntityTypeNames(List expectedTypes, List expectedNames, List connectOpts, String...args) {
+ public void testExpectedEntityTypeNames(List expectedTypes, List expectedNames, List connectOpts, String... args) {
ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList(connectOpts.get(0), connectOpts.get(1), "--describe"), Arrays.asList(args)));
createOpts.checkArgs();
assertEquals(createOpts.entityTypes().toSeq(), seq(expectedTypes));
@@ -1434,7 +1434,7 @@ public static List concat(List... lists) {
}
@SafeVarargs
- public static Map concat(Map...maps) {
+ public static Map concat(Map... maps) {
Map res = new HashMap<>();
Stream.of(maps)
.map(Map::entrySet)
diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
index 55db18893f8f7..697dda07363a7 100644
--- a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
+++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
@@ -62,10 +62,10 @@ public ConfigCommandResult(String stdout, OptionalInt exitStatus) {
}
}
- private ConfigCommandResult runConfigCommandViaBroker(String...args) {
+ private ConfigCommandResult runConfigCommandViaBroker(String... args) {
AtomicReference exitStatus = new AtomicReference<>(OptionalInt.empty());
Exit.setExitProcedure((status, __) -> {
- exitStatus.set(OptionalInt.of((Integer) status));
+ exitStatus.set(OptionalInt.of(status));
throw new RuntimeException();
});
diff --git a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
index 1462e1c7f64a7..75cd070b93d30 100644
--- a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
+++ b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
@@ -311,16 +311,16 @@ private static List> translatePartitionInfoToNodeIdList(List
CVE-2023-35116
-
-
- CVE-2020-8908
- CVE-2023-2976
-
deserializer() {
@SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
@JsonSubTypes({
- @JsonSubTypes.Type(value = PageView.class, name = "pv"),
- @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
- @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
- @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
- @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
- })
+ @JsonSubTypes.Type(value = PageView.class, name = "pv"),
+ @JsonSubTypes.Type(value = UserProfile.class, name = "up"),
+ @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
+ @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
+ @JsonSubTypes.Type(value = RegionCount.class, name = "rc")
+ })
public interface JSONSerdeCompatible {
}
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index df4d981fa7175..ce05eabb56606 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -97,9 +97,10 @@ public static void closeCluster() {
@ParameterizedTest
@ValueSource(strings = {
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY})
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+ })
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
@@ -108,9 +109,10 @@ public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final String rackAwar
@ParameterizedTest
@ValueSource(strings = {
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
- StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY})
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+ StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+ })
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 6b67af943fcb3..ca3936633fc19 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -982,10 +982,11 @@ private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
streamOne,
batch1,
TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()
+ ),
mockTime);
final KStream s1 = builder.stream(streamOne);
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java
index 08a163de59ef5..e6b212e7d1144 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java
@@ -36,13 +36,13 @@
*/
@Suite
@SelectClasses({
- CompositeReadOnlyKeyValueStoreTest.class,
- CompositeReadOnlyWindowStoreTest.class,
- CompositeReadOnlySessionStoreTest.class,
- GlobalStateStoreProviderTest.class,
- StreamThreadStateStoreProviderTest.class,
- WrappingStoreProviderTest.class,
- QueryableStateIntegrationTest.class,
- })
+ CompositeReadOnlyKeyValueStoreTest.class,
+ CompositeReadOnlyWindowStoreTest.class,
+ CompositeReadOnlySessionStoreTest.class,
+ GlobalStateStoreProviderTest.class,
+ StreamThreadStateStoreProviderTest.class,
+ WrappingStoreProviderTest.class,
+ QueryableStateIntegrationTest.class,
+})
public class StoreQuerySuite {
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java
index ab23663bbc732..3e2c97f6e24a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java
@@ -26,7 +26,7 @@
public final class StreamStreamJoinUtil {
- private StreamStreamJoinUtil(){
+ private StreamStreamJoinUtil() {
}
public static boolean skipRecord(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 9f65a415d9516..3dc46831b1eb6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -24,7 +24,6 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
-import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
index 172321fab11e8..06a994a5aa8d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java
@@ -31,13 +31,13 @@
*/
@Suite
@SelectClasses({
- StreamTaskTest.class,
- StandbyTaskTest.class,
- GlobalStateTaskTest.class,
- TaskManagerTest.class,
- TaskMetricsTest.class,
- LegacyStickyTaskAssignorTest.class,
- StreamsPartitionAssignorTest.class,
+ StreamTaskTest.class,
+ StandbyTaskTest.class,
+ GlobalStateTaskTest.class,
+ TaskManagerTest.class,
+ TaskMetricsTest.class,
+ LegacyStickyTaskAssignorTest.class,
+ StreamsPartitionAssignorTest.class,
})
public class TaskSuite {
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 2c98c3427d0d3..f5dd717f5b568 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -235,7 +235,7 @@ public void shouldRemoveValueProvidersFromInjectedMetricsRecorderOnClose() {
}
public static class RocksDBConfigSetterWithUserProvidedStatistics implements RocksDBConfigSetter {
- public RocksDBConfigSetterWithUserProvidedStatistics(){}
+ public RocksDBConfigSetterWithUserProvidedStatistics() {}
public void setConfig(final String storeName, final Options options, final Map configs) {
lastStatistics = new Statistics();
@@ -306,7 +306,7 @@ public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception
public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter {
- public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){}
+ public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig() {}
public void setConfig(final String storeName, final Options options, final Map configs) {
options.setTableFormatConfig(new BlockBasedTableConfig());
@@ -335,7 +335,7 @@ public void shouldThrowWhenUserProvidesNewBlockBasedTableFormatConfig() {
}
public static class RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig implements RocksDBConfigSetter {
- public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig(){}
+ public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig() {}
public void setConfig(final String storeName, final Options options, final Map configs) {
options.setTableFormatConfig(new PlainTableConfig());
diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 24ac2f2306dbe..9bb8c6fe45a89 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -121,6 +121,7 @@ public static class RecordingProcessorWrapper implements ProcessorWrapper {
private Set wrappedProcessorNames;
+ @SuppressWarnings("unchecked")
@Override
public void configure(final Map configs) {
if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index bc20aa1427399..af7a39a92be50 100644
--- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -116,7 +116,7 @@ public void testClusterTemplate() {
@ClusterConfigProperty(key = "spam", value = "eggs"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value")
}, tags = {
- "default.display.key1", "default.display.key2"
+ "default.display.key1", "default.display.key2"
}),
@ClusterTest(types = {Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@@ -126,7 +126,7 @@ public void testClusterTemplate() {
@ClusterConfigProperty(key = "spam", value = "eggs"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value")
}, tags = {
- "default.display.key1", "default.display.key2"
+ "default.display.key1", "default.display.key2"
})
})
public void testClusterTests() throws ExecutionException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index a2ec83be81eb4..1f1914df26823 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -148,7 +148,7 @@ public static Set duplicates(List s) {
* @param Element type.
*/
@SuppressWarnings("unchecked")
- public static Set minus(Set set, T...toRemove) {
+ public static Set minus(Set set, T... toRemove) {
Set res = new HashSet<>(set);
for (T t : toRemove)
res.remove(t);
diff --git a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
index f83024837ba57..74919740170d0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
@@ -45,7 +45,7 @@
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(serverProperties = {
- @ClusterConfigProperty(key = ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key = ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true"),
})
public class BrokerApiVersionsCommandTest {
@ClusterTest
diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
index fa3803a6eaff2..484f09ec5ef00 100644
--- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
@@ -63,10 +63,10 @@ public void testDescribeQuorumReplicationSuccessful(ClusterInstance cluster) thr
assertTrue(header.matches("NodeId\\s+DirectoryId\\s+LogEndOffset\\s+Lag\\s+LastFetchTimestamp\\s+LastCaughtUpTimestamp\\s+Status\\s+"));
- if (cluster.type() == Type.CO_KRAFT)
- assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), data.size());
- else
- assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), data.size());
+ if (cluster.type() == Type.CO_KRAFT)
+ assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), data.size());
+ else
+ assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), data.size());
Pattern leaderPattern = Pattern.compile("\\d+\\s+\\S+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+-?\\d+\\s+Leader\\s*");
assertTrue(leaderPattern.matcher(data.get(0)).find());
diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index 389b1584c99f3..ae174d63c6ae5 100644
--- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -101,16 +101,16 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = {
- // shorter backoff to reduce test durations when no active partitions are eligible for fetching due to throttling
- @ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value = "100"),
- // Don't move partition leaders automatically.
- @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
- @ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "1000"),
- @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
- @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"),
- @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"),
- @ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"),
- @ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"),
+ // shorter backoff to reduce test durations when no active partitions are eligible for fetching due to throttling
+ @ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value = "100"),
+ // Don't move partition leaders automatically.
+ @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
+ @ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "1000"),
+ @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
+ @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"),
+ @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"),
+ @ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"),
+ @ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"),
})
@ExtendWith(ClusterTestExtensions.class)
public class ReassignPartitionsCommandTest {
@@ -133,7 +133,7 @@ public void testReassignment() throws Exception {
}
@ClusterTests({
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0)
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0)
})
public void testReassignmentWithAlterPartitionDisabled() throws Exception {
// Test reassignment when the IBP is on an older version which does not use
@@ -145,11 +145,11 @@ public void testReassignmentWithAlterPartitionDisabled() throws Exception {
}
@ClusterTests({
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
- @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
- @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
- @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
- })
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
+ @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+ @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+ @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+ })
})
public void testReassignmentCompletionDuringPartialUpgrade() throws Exception {
// Test reassignment during a partial upgrade when some brokers are relying on
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java b/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
index b6d2801bdff42..eb3de14722833 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java
@@ -41,8 +41,8 @@ public final class Kibosh {
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"),
- })
+ @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"),
+ })
public abstract static class KiboshFaultSpec {
@Override
public final boolean equals(Object o) {
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
index 6bb9e7f4f1f26..1f7fa30e6c4b9 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -30,11 +30,11 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "state")
@JsonSubTypes({
- @JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE),
- @JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE),
- @JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE),
- @JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE)
- })
+ @JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE),
+ @JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE),
+ @JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE),
+ @JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE)
+})
public abstract class TaskState extends Message {
private final TaskSpec spec;
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
index 39a2f8b56b623..821c3a486c49d 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
@@ -28,7 +28,7 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
- @JsonSubTypes.Type(value = TimestampRecordProcessor.class, name = "timestamp"),
+ @JsonSubTypes.Type(value = TimestampRecordProcessor.class, name = "timestamp"),
})
public interface RecordProcessor {
void processRecords(ConsumerRecords consumerRecords);