From 995652cdcd7ec902d73187164fffbd2804076684 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 3 Dec 2024 06:30:04 +0800 Subject: [PATCH] tmp --- checkstyle/checkstyle.xml | 4 ++- .../kafka/clients/admin/KafkaAdminClient.java | 12 +++---- .../internals/OffsetsForLeaderEpochUtils.java | 2 +- .../kafka/server/authorizer/Authorizer.java | 12 +++---- .../clients/admin/KafkaAdminClientTest.java | 12 +++---- .../ListConsumerGroupOffsetsHandlerTest.java | 20 ++++++------ .../internals/ConsumerProtocolTest.java | 4 +-- .../internals/TransactionManagerTest.java | 8 ++--- .../kafka/common/network/SelectorTest.java | 4 +-- .../requests/OffsetFetchResponseTest.java | 20 ++++++------ .../common/requests/RequestResponseTest.java | 6 ++-- .../mirror/MirrorSourceConnectorTest.java | 4 +-- .../connect/runtime/AbstractHerderTest.java | 4 +-- .../transforms/predicates/HasHeaderKey.java | 5 ++- .../kafka/admin/AdminFenceProducersTest.java | 10 +++--- .../java/kafka/admin/ClientTelemetryTest.java | 2 +- .../java/kafka/admin/ConfigCommandTest.java | 4 +-- .../UserScramCredentialsCommandTest.java | 4 +-- .../BootstrapControllersIntegrationTest.java | 8 ++--- gradle/dependencies.gradle | 2 +- .../dependencycheck-suppressions.xml | 11 ------- .../ConfigurationControlManagerTest.java | 8 ++--- .../controller/QuorumControllerTest.java | 8 ++--- .../ReplicationControlManagerTest.java | 32 ++++++++----------- .../kafka/server/AssignmentsManagerTest.java | 10 +++--- .../storage/internals/log/LogSegment.java | 4 +-- .../internals/log/LogValidatorTest.java | 26 +++++++-------- .../examples/pageview/PageViewTypedDemo.java | 12 +++---- ...ailabilityTaskAssignorIntegrationTest.java | 14 ++++---- .../QueryableStateIntegrationTest.java | 9 +++--- .../streams/integration/StoreQuerySuite.java | 16 +++++----- .../internals/StreamStreamJoinUtil.java | 2 +- .../internals/InternalTopologyBuilder.java | 1 - .../processor/internals/TaskSuite.java | 14 ++++---- .../state/internals/RocksDBStoreTest.java | 6 ++-- .../apache/kafka/streams/utils/TestUtils.java | 1 + .../test/api/ClusterTestExtensionsTest.java | 4 +-- .../org/apache/kafka/tools/ToolsUtils.java | 2 +- .../tools/BrokerApiVersionsCommandTest.java | 2 +- .../tools/MetadataQuorumCommandTest.java | 8 ++--- .../ReassignPartitionsCommandTest.java | 32 +++++++++---------- .../apache/kafka/trogdor/fault/Kibosh.java | 4 +-- .../apache/kafka/trogdor/rest/TaskState.java | 10 +++--- .../trogdor/workload/RecordProcessor.java | 2 +- 44 files changed, 186 insertions(+), 199 deletions(-) diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 6b8aedb6e4700..fa1d5873a2cbc 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -39,7 +39,9 @@ - + + + diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e6986c8a378fb..bd135acc74b30 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4822,12 +4822,12 @@ AddRaftVoterRequest.Builder createRequest(int timeoutMs) { setHost(endpoint.host()). setPort(endpoint.port()))); return new AddRaftVoterRequest.Builder( - new AddRaftVoterRequestData(). - setClusterId(options.clusterId().orElse(null)). - setTimeoutMs(timeoutMs). - setVoterId(voterId) . - setVoterDirectoryId(voterDirectoryId). - setListeners(listeners)); + new AddRaftVoterRequestData(). + setClusterId(options.clusterId().orElse(null)). + setTimeoutMs(timeoutMs). + setVoterId(voterId) . + setVoterDirectoryId(voterDirectoryId). + setListeners(listeners)); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java index 033daa098761d..ae4b9f71f6c4b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochUtils.java @@ -44,7 +44,7 @@ public final class OffsetsForLeaderEpochUtils { private static final Logger LOG = LoggerFactory.getLogger(OffsetsForLeaderEpochUtils.class); - private OffsetsForLeaderEpochUtils(){} + private OffsetsForLeaderEpochUtils() {} static AbstractRequest.Builder prepareRequest( Map requestData) { diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index ca2a76b4716a9..58241c51ea085 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -202,14 +202,14 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), EnumMap> denyPatterns = new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); - }}; + put(PatternType.LITERAL, new HashSet<>()); + put(PatternType.PREFIXED, new HashSet<>()); + }}; EnumMap> allowPatterns = new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); - }}; + put(PatternType.LITERAL, new HashSet<>()); + put(PatternType.PREFIXED, new HashSet<>()); + }}; boolean hasWildCardAllow = false; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index cf5fbc938e3ee..ca70af6d114f7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2061,9 +2061,9 @@ public void testDescribeBrokerAndLogConfigs() throws Exception { new DescribeConfigsResponseData().setResults(asList(new DescribeConfigsResponseData.DescribeConfigsResult() .setResourceName(brokerResource.name()).setResourceType(brokerResource.type().id()).setErrorCode(Errors.NONE.code()) .setConfigs(emptyList()), - new DescribeConfigsResponseData.DescribeConfigsResult() - .setResourceName(brokerLoggerResource.name()).setResourceType(brokerLoggerResource.type().id()).setErrorCode(Errors.NONE.code()) - .setConfigs(emptyList())))), env.cluster().nodeById(0)); + new DescribeConfigsResponseData.DescribeConfigsResult() + .setResourceName(brokerLoggerResource.name()).setResourceType(brokerLoggerResource.type().id()).setErrorCode(Errors.NONE.code()) + .setConfigs(emptyList())))), env.cluster().nodeById(0)); Map> result = env.adminClient().describeConfigs(asList( brokerResource, brokerLoggerResource)).values(); @@ -2102,9 +2102,9 @@ public void testDescribeConfigsUnrequested() throws Exception { new DescribeConfigsResponseData().setResults(asList(new DescribeConfigsResponseData.DescribeConfigsResult() .setResourceName(topic.name()).setResourceType(topic.type().id()).setErrorCode(Errors.NONE.code()) .setConfigs(emptyList()), - new DescribeConfigsResponseData.DescribeConfigsResult() - .setResourceName(unrequested.name()).setResourceType(unrequested.type().id()).setErrorCode(Errors.NONE.code()) - .setConfigs(emptyList()))))); + new DescribeConfigsResponseData.DescribeConfigsResult() + .setResourceName(unrequested.name()).setResourceType(unrequested.type().id()).setErrorCode(Errors.NONE.code()) + .setConfigs(emptyList()))))); Map> result = env.adminClient().describeConfigs(singletonList( topic)).values(); assertEquals(new HashSet<>(singletonList(topic)), result.keySet()); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java index d9935fbdca6e8..e3bb56347a8ae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java @@ -177,11 +177,11 @@ public void testSuccessfulHandleResponseWithOnePartitionErrorWithMultipleGroups( Map offsetAndMetadataMapTwo = Collections.singletonMap(t2p2, new OffsetAndMetadata(10L)); Map> expectedResult = - new HashMap>() {{ - put(groupZero, offsetAndMetadataMapZero); - put(groupOne, offsetAndMetadataMapOne); - put(groupTwo, offsetAndMetadataMapTwo); - }}; + new HashMap<>() {{ + put(groupZero, offsetAndMetadataMapZero); + put(groupOne, offsetAndMetadataMapOne); + put(groupTwo, offsetAndMetadataMapTwo); + }}; assertCompletedForMultipleGroups( handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult); @@ -304,11 +304,11 @@ private OffsetFetchResponse buildResponseWithPartitionErrorWithMultipleGroups(Er responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE)); Map> responseData = - new HashMap>() {{ - put(groupZero, responseDataZero); - put(groupOne, responseDataOne); - put(groupTwo, responseDataTwo); - }}; + new HashMap<>() {{ + put(groupZero, responseDataZero); + put(groupOne, responseDataOne); + put(groupTwo, responseDataTwo); + }}; Map errorMap = errorMap(groups, Errors.NONE); return new OffsetFetchResponse(0, errorMap, responseData); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java index 07808f29806a5..b73576757e86e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -298,8 +298,8 @@ public void serializeDeserializeConsumerProtocolSubscriptionAllVersions() { if (version >= 1) { assertEquals( Set.of( - new ConsumerProtocolSubscription.TopicPartition().setTopic("foo").setPartitions(Collections.singletonList(0)), - new ConsumerProtocolSubscription.TopicPartition().setTopic("bar").setPartitions(Collections.singletonList(0) + new ConsumerProtocolSubscription.TopicPartition().setTopic("foo").setPartitions(Collections.singletonList(0)), + new ConsumerProtocolSubscription.TopicPartition().setTopic("bar").setPartitions(Collections.singletonList(0) )), Set.copyOf(parsedSubscription.ownedPartitions()) ); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 9ae80dc19baf3..6143daaa96717 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -1815,10 +1815,10 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept @ParameterizedTest @EnumSource(names = { - "UNKNOWN_TOPIC_OR_PARTITION", - "REQUEST_TIMED_OUT", - "COORDINATOR_LOAD_IN_PROGRESS", - "CONCURRENT_TRANSACTIONS" + "UNKNOWN_TOPIC_OR_PARTITION", + "REQUEST_TIMED_OUT", + "COORDINATOR_LOAD_IN_PROGRESS", + "CONCURRENT_TRANSACTIONS" }) public void testRetriableErrors(Errors error) { // Ensure FindCoordinator retries. diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index b4c73d64d38c4..d955c7939ae8f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -907,8 +907,8 @@ public void testLowestPriorityChannel() throws Exception { } assertNotNull(selector.lowestPriorityChannel()); for (int i = conns - 1; i >= 0; i--) { - if (i != 2) - assertEquals("", blockingRequest(String.valueOf(i), "")); + if (i != 2) + assertEquals("", blockingRequest(String.valueOf(i), "")); time.sleep(10); } assertEquals("2", selector.lowestPriorityChannel().id()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java index c85d26dac5756..d0ef79b4479e8 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java @@ -390,16 +390,16 @@ public void testUseDefaultLeaderEpochV0ToV7() { .setErrorCode(Errors.NOT_COORDINATOR.code()) .setThrottleTimeMs(throttleTimeMs) .setTopics(Collections.singletonList( - new OffsetFetchResponseTopic() - .setName(topicOne) - .setPartitions(Collections.singletonList( - new OffsetFetchResponsePartition() - .setPartitionIndex(partitionOne) - .setCommittedOffset(offset) - .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) - .setMetadata(metadata)) - )) + new OffsetFetchResponseTopic() + .setName(topicOne) + .setPartitions(Collections.singletonList( + new OffsetFetchResponsePartition() + .setPartitionIndex(partitionOne) + .setCommittedOffset(offset) + .setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setMetadata(metadata)) + )) ); assertEquals(expectedData, response.data()); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index cb6a2458261ea..0d32fd442af88 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -2187,9 +2187,9 @@ private JoinGroupRequest createJoinGroupRequest(short version) { JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection( Collections.singleton( - new JoinGroupRequestData.JoinGroupRequestProtocol() - .setName("consumer-range") - .setMetadata(new byte[0])).iterator() + new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("consumer-range") + .setMetadata(new byte[0])).iterator() ); JoinGroupRequestData data = new JoinGroupRequestData() diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index a410adde94458..588baf8c09047 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -167,10 +167,10 @@ public void testAclFiltering() { new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter()); assertFalse(connector.shouldReplicateAcl( new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), - new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE"); + new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE"); assertTrue(connector.shouldReplicateAcl( new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL), - new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL"); + new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL"); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 01f847d94f667..aa715667d24c4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -1196,7 +1196,7 @@ protected void addConfigKey(Map keys, String name, keys.putAll(configDef.configKeys()); } - protected void addValue(List values, String name, String value, String...errors) { + protected void addValue(List values, String name, String value, String... errors) { values.add(new ConfigValue(name, value, new ArrayList<>(), Arrays.asList(errors))); } @@ -1211,7 +1211,7 @@ protected void assertNoInfoKey(ConfigInfos infos, String name) { assertNull(info.configKey()); } - protected void assertInfoValue(ConfigInfos infos, String name, String value, String...errors) { + protected void assertInfoValue(ConfigInfos infos, String name, String value, String... errors) { ConfigValueInfo info = findInfo(infos, name).configValue(); assertEquals(name, info.name()); assertEquals(value, info.value()); diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java index fe0a99c4abe72..566bdfab23832 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java @@ -35,9 +35,8 @@ public class HasHeaderKey> implements Predicate, V private static final String NAME_CONFIG = "name"; public static final String OVERVIEW_DOC = "A predicate which is true for records with at least one header with the configured name."; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, - "The header name."); + .define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "The header name."); private String name; @Override diff --git a/core/src/test/java/kafka/admin/AdminFenceProducersTest.java b/core/src/test/java/kafka/admin/AdminFenceProducersTest.java index 4b8178032e06f..d4b87ef8ed572 100644 --- a/core/src/test/java/kafka/admin/AdminFenceProducersTest.java +++ b/core/src/test/java/kafka/admin/AdminFenceProducersTest.java @@ -50,11 +50,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ClusterTestDefaults(serverProperties = { - @ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), - @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"), - @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), - @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"), - @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000") + @ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"), + @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000") }) @ExtendWith(ClusterTestExtensions.class) public class AdminFenceProducersTest { diff --git a/core/src/test/java/kafka/admin/ClientTelemetryTest.java b/core/src/test/java/kafka/admin/ClientTelemetryTest.java index 56d466bd27b5c..f701d3f38450e 100644 --- a/core/src/test/java/kafka/admin/ClientTelemetryTest.java +++ b/core/src/test/java/kafka/admin/ClientTelemetryTest.java @@ -75,7 +75,7 @@ public class ClientTelemetryTest { types = Type.KRAFT, brokers = 3, serverProperties = { - @ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"), + @ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"), }) public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { Map configs = new HashMap<>(); diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java b/core/src/test/java/kafka/admin/ConfigCommandTest.java index 70b1faee43b66..10c24111e4757 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java @@ -420,7 +420,7 @@ public void testParseConfigsToBeAddedForAddConfigFile() throws IOException { assertEquals("[[1, 2], [3, 4]]", addedProps.getProperty("nested")); } - public void testExpectedEntityTypeNames(List expectedTypes, List expectedNames, List connectOpts, String...args) { + public void testExpectedEntityTypeNames(List expectedTypes, List expectedNames, List connectOpts, String... args) { ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList(connectOpts.get(0), connectOpts.get(1), "--describe"), Arrays.asList(args))); createOpts.checkArgs(); assertEquals(createOpts.entityTypes().toSeq(), seq(expectedTypes)); @@ -1434,7 +1434,7 @@ public static List concat(List... lists) { } @SafeVarargs - public static Map concat(Map...maps) { + public static Map concat(Map... maps) { Map res = new HashMap<>(); Stream.of(maps) .map(Map::entrySet) diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java index 55db18893f8f7..697dda07363a7 100644 --- a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java +++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java @@ -62,10 +62,10 @@ public ConfigCommandResult(String stdout, OptionalInt exitStatus) { } } - private ConfigCommandResult runConfigCommandViaBroker(String...args) { + private ConfigCommandResult runConfigCommandViaBroker(String... args) { AtomicReference exitStatus = new AtomicReference<>(OptionalInt.empty()); Exit.setExitProcedure((status, __) -> { - exitStatus.set(OptionalInt.of((Integer) status)); + exitStatus.set(OptionalInt.of(status)); throw new RuntimeException(); }); diff --git a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java index 1462e1c7f64a7..75cd070b93d30 100644 --- a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java +++ b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java @@ -311,16 +311,16 @@ private static List> translatePartitionInfoToNodeIdList(List CVE-2023-35116 - - - CVE-2020-8908 - CVE-2023-2976 - deserializer() { @SuppressWarnings("DefaultAnnotationParam") // being explicit for the example @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t") @JsonSubTypes({ - @JsonSubTypes.Type(value = PageView.class, name = "pv"), - @JsonSubTypes.Type(value = UserProfile.class, name = "up"), - @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"), - @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"), - @JsonSubTypes.Type(value = RegionCount.class, name = "rc") - }) + @JsonSubTypes.Type(value = PageView.class, name = "pv"), + @JsonSubTypes.Type(value = UserProfile.class, name = "up"), + @JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"), + @JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"), + @JsonSubTypes.Type(value = RegionCount.class, name = "rc") + }) public interface JSONSerdeCompatible { } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index df4d981fa7175..ce05eabb56606 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -97,9 +97,10 @@ public static void closeCluster() { @ParameterizedTest @ValueSource(strings = { - StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, - StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, - StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY}) + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY + }) public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException { // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum // value is one minute @@ -108,9 +109,10 @@ public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final String rackAwar @ParameterizedTest @ValueSource(strings = { - StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, - StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, - StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY}) + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, + StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY + }) public void shouldScaleOutWithWarmupTasksAndPersistentStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException { // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum // value is one minute diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 6b67af943fcb3..ca3936633fc19 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -982,10 +982,11 @@ private void verifyCanQueryState(final int cacheSizeBytes) throws Exception { streamOne, batch1, TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - StringSerializer.class, - StringSerializer.class, - new Properties()), + CLUSTER.bootstrapServers(), + StringSerializer.class, + StringSerializer.class, + new Properties() + ), mockTime); final KStream s1 = builder.stream(streamOne); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java index 08a163de59ef5..e6b212e7d1144 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java @@ -36,13 +36,13 @@ */ @Suite @SelectClasses({ - CompositeReadOnlyKeyValueStoreTest.class, - CompositeReadOnlyWindowStoreTest.class, - CompositeReadOnlySessionStoreTest.class, - GlobalStateStoreProviderTest.class, - StreamThreadStateStoreProviderTest.class, - WrappingStoreProviderTest.class, - QueryableStateIntegrationTest.class, - }) + CompositeReadOnlyKeyValueStoreTest.class, + CompositeReadOnlyWindowStoreTest.class, + CompositeReadOnlySessionStoreTest.class, + GlobalStateStoreProviderTest.class, + StreamThreadStateStoreProviderTest.class, + WrappingStoreProviderTest.class, + QueryableStateIntegrationTest.class, +}) public class StoreQuerySuite { } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java index ab23663bbc732..3e2c97f6e24a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java @@ -26,7 +26,7 @@ public final class StreamStreamJoinUtil { - private StreamStreamJoinUtil(){ + private StreamStreamJoinUtil() { } public static boolean skipRecord( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 9f65a415d9516..3dc46831b1eb6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -24,7 +24,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyConfig; -import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.StateStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java index 172321fab11e8..06a994a5aa8d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java @@ -31,13 +31,13 @@ */ @Suite @SelectClasses({ - StreamTaskTest.class, - StandbyTaskTest.class, - GlobalStateTaskTest.class, - TaskManagerTest.class, - TaskMetricsTest.class, - LegacyStickyTaskAssignorTest.class, - StreamsPartitionAssignorTest.class, + StreamTaskTest.class, + StandbyTaskTest.class, + GlobalStateTaskTest.class, + TaskManagerTest.class, + TaskMetricsTest.class, + LegacyStickyTaskAssignorTest.class, + StreamsPartitionAssignorTest.class, }) public class TaskSuite { } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 2c98c3427d0d3..f5dd717f5b568 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -235,7 +235,7 @@ public void shouldRemoveValueProvidersFromInjectedMetricsRecorderOnClose() { } public static class RocksDBConfigSetterWithUserProvidedStatistics implements RocksDBConfigSetter { - public RocksDBConfigSetterWithUserProvidedStatistics(){} + public RocksDBConfigSetterWithUserProvidedStatistics() {} public void setConfig(final String storeName, final Options options, final Map configs) { lastStatistics = new Statistics(); @@ -306,7 +306,7 @@ public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter { - public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){} + public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig() {} public void setConfig(final String storeName, final Options options, final Map configs) { options.setTableFormatConfig(new BlockBasedTableConfig()); @@ -335,7 +335,7 @@ public void shouldThrowWhenUserProvidesNewBlockBasedTableFormatConfig() { } public static class RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig implements RocksDBConfigSetter { - public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig(){} + public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig() {} public void setConfig(final String storeName, final Options options, final Map configs) { options.setTableFormatConfig(new PlainTableConfig()); diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java index 24ac2f2306dbe..9bb8c6fe45a89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java @@ -121,6 +121,7 @@ public static class RecordingProcessorWrapper implements ProcessorWrapper { private Set wrappedProcessorNames; + @SuppressWarnings("unchecked") @Override public void configure(final Map configs) { if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) { diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java index bc20aa1427399..af7a39a92be50 100644 --- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java +++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java @@ -116,7 +116,7 @@ public void testClusterTemplate() { @ClusterConfigProperty(key = "spam", value = "eggs"), @ClusterConfigProperty(key = "default.key", value = "overwrite.value") }, tags = { - "default.display.key1", "default.display.key2" + "default.display.key1", "default.display.key2" }), @ClusterTest(types = {Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @@ -126,7 +126,7 @@ public void testClusterTemplate() { @ClusterConfigProperty(key = "spam", value = "eggs"), @ClusterConfigProperty(key = "default.key", value = "overwrite.value") }, tags = { - "default.display.key1", "default.display.key2" + "default.display.key1", "default.display.key2" }) }) public void testClusterTests() throws ExecutionException, InterruptedException { diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java index a2ec83be81eb4..1f1914df26823 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java +++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java @@ -148,7 +148,7 @@ public static Set duplicates(List s) { * @param Element type. */ @SuppressWarnings("unchecked") - public static Set minus(Set set, T...toRemove) { + public static Set minus(Set set, T... toRemove) { Set res = new HashSet<>(set); for (T t : toRemove) res.remove(t); diff --git a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java index f83024837ba57..74919740170d0 100644 --- a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java @@ -45,7 +45,7 @@ @ExtendWith(ClusterTestExtensions.class) @ClusterTestDefaults(serverProperties = { - @ClusterConfigProperty(key = ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true"), }) public class BrokerApiVersionsCommandTest { @ClusterTest diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java index fa3803a6eaff2..484f09ec5ef00 100644 --- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java @@ -63,10 +63,10 @@ public void testDescribeQuorumReplicationSuccessful(ClusterInstance cluster) thr assertTrue(header.matches("NodeId\\s+DirectoryId\\s+LogEndOffset\\s+Lag\\s+LastFetchTimestamp\\s+LastCaughtUpTimestamp\\s+Status\\s+")); - if (cluster.type() == Type.CO_KRAFT) - assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), data.size()); - else - assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), data.size()); + if (cluster.type() == Type.CO_KRAFT) + assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), data.size()); + else + assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), data.size()); Pattern leaderPattern = Pattern.compile("\\d+\\s+\\S+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+-?\\d+\\s+Leader\\s*"); assertTrue(leaderPattern.matcher(data.get(0)).find()); diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index 389b1584c99f3..ae174d63c6ae5 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -101,16 +101,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = { - // shorter backoff to reduce test durations when no active partitions are eligible for fetching due to throttling - @ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value = "100"), - // Don't move partition leaders automatically. - @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"), - @ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "1000"), - @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"), - @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"), - @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"), - @ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"), - @ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"), + // shorter backoff to reduce test durations when no active partitions are eligible for fetching due to throttling + @ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value = "100"), + // Don't move partition leaders automatically. + @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"), + @ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "1000"), + @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"), + @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"), + @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"), + @ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"), + @ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"), }) @ExtendWith(ClusterTestExtensions.class) public class ReassignPartitionsCommandTest { @@ -133,7 +133,7 @@ public void testReassignment() throws Exception { } @ClusterTests({ - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0) + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0) }) public void testReassignmentWithAlterPartitionDisabled() throws Exception { // Test reassignment when the IBP is on an older version which does not use @@ -145,11 +145,11 @@ public void testReassignmentWithAlterPartitionDisabled() throws Exception { } @ClusterTests({ - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { - @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), - @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), - @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), - }) + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { + @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), + @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), + @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), + }) }) public void testReassignmentCompletionDuringPartialUpgrade() throws Exception { // Test reassignment during a partial upgrade when some brokers are relying on diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java b/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java index b6d2801bdff42..eb3de14722833 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/fault/Kibosh.java @@ -41,8 +41,8 @@ public final class Kibosh { @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ - @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"), - }) + @JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"), + }) public abstract static class KiboshFaultSpec { @Override public final boolean equals(Object o) { diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java index 6bb9e7f4f1f26..1f7fa30e6c4b9 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java @@ -30,11 +30,11 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "state") @JsonSubTypes({ - @JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE), - @JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE), - @JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE), - @JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE) - }) + @JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE), + @JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE), + @JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE), + @JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE) +}) public abstract class TaskState extends Message { private final TaskSpec spec; diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java index 39a2f8b56b623..821c3a486c49d 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java @@ -28,7 +28,7 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { - @JsonSubTypes.Type(value = TimestampRecordProcessor.class, name = "timestamp"), + @JsonSubTypes.Type(value = TimestampRecordProcessor.class, name = "timestamp"), }) public interface RecordProcessor { void processRecords(ConsumerRecords consumerRecords);