Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> future =
FenceProducersHandler.newFuture(transactionalIds);
FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
invokeDriver(handler, future, options.timeoutMs);
return new FenceProducersResult(future.all());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
Expand All @@ -38,12 +39,16 @@
public class FenceProducersHandler extends AdminApiHandler.Unbatched<CoordinatorKey, ProducerIdAndEpoch> {
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
private final int txnTimeoutMs;

public FenceProducersHandler(
LogContext logContext
FenceProducersOptions options,
LogContext logContext,
int requestTimeoutMs
) {
this.log = logContext.logger(FenceProducersHandler.class);
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext);
this.txnTimeoutMs = options.timeoutMs() != null ? options.timeoutMs() : requestTimeoutMs;
}

public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> newFuture(
Expand Down Expand Up @@ -82,9 +87,8 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke
.setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
.setProducerId(ProducerIdAndEpoch.NONE.producerId)
.setTransactionalId(key.idValue)
// Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
// and shouldn't be used for any actual record writes
.setTransactionTimeoutMs(1);
// This timeout is used by the coordinator to append the record with the new producer epoch to the transaction log.
.setTransactionTimeoutMs(txnTimeoutMs);
return new InitProducerIdRequest.Builder(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.InitProducerIdResponseData;
Expand All @@ -39,19 +40,29 @@
public class FenceProducersHandlerTest {
private final LogContext logContext = new LogContext();
private final Node node = new Node(1, "host", 1234);
private final int requestTimeoutMs = 30000;
private final FenceProducersOptions options = new FenceProducersOptions();

@Test
public void testBuildRequest() {
FenceProducersHandler handler = new FenceProducersHandler(logContext);
mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId));
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, requestTimeoutMs));
}

@Test
public void testBuildRequestOptionsTimeout() {
final int optionsTimeoutMs = 50000;
options.timeoutMs(optionsTimeoutMs);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, optionsTimeoutMs));
}

@Test
public void testHandleSuccessfulResponse() {
String transactionalId = "foo";
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);

FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);

short epoch = 57;
long producerId = 7;
Expand All @@ -73,7 +84,7 @@ public void testHandleSuccessfulResponse() {
@Test
public void testHandleErrorResponse() {
String transactionalId = "foo";
FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED);
assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR);
Expand Down Expand Up @@ -136,10 +147,10 @@ private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleResponseError(
return result;
}

private void assertLookup(FenceProducersHandler handler, String transactionalId) {
private void assertLookup(FenceProducersHandler handler, String transactionalId, int txnTimeoutMs) {
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key);
assertEquals(transactionalId, request.data.transactionalId());
assertEquals(1, request.data.transactionTimeoutMs());
assertEquals(txnTimeoutMs, request.data.transactionTimeoutMs());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -105,12 +106,12 @@ public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException {
UpdateMetadataBroker broker = new UpdateMetadataBroker()
.setId(0)
.setRack("rack")
.setEndpoints(Arrays.asList(
new UpdateMetadataRequestData.UpdateMetadataEndpoint()
.setHost("broker0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value())
.setEndpoints(Collections.singletonList(
new UpdateMetadataRequestData.UpdateMetadataEndpoint()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please revert those unrelated changes to reduce the size of PR?

.setHost("broker0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value())
));

@Test
Expand Down Expand Up @@ -168,9 +169,9 @@ void testDescribeTopicPartitionsRequest() {
.setPartitionId(1)
.setReplicas(Arrays.asList(0, 1, 2))
.setLeader(0)
.setIsr(Arrays.asList(0))
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLastKnownElr(Arrays.asList(2))
.setIsr(Collections.singletonList(0))
.setEligibleLeaderReplicas(Collections.singletonList(1))
.setLastKnownElr(Collections.singletonList(2))
.setLeaderEpoch(0)
.setPartitionEpoch(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
Expand All @@ -179,9 +180,9 @@ void testDescribeTopicPartitionsRequest() {
.setPartitionId(0)
.setReplicas(Arrays.asList(0, 1, 2))
.setLeader(0)
.setIsr(Arrays.asList(0))
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLastKnownElr(Arrays.asList(2))
.setIsr(Collections.singletonList(0))
.setEligibleLeaderReplicas(Collections.singletonList(1))
.setLastKnownElr(Collections.singletonList(2))
.setLeaderEpoch(0)
.setPartitionEpoch(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
Expand All @@ -190,9 +191,9 @@ void testDescribeTopicPartitionsRequest() {
.setPartitionId(0)
.setReplicas(Arrays.asList(0, 1, 3))
.setLeader(0)
.setIsr(Arrays.asList(0))
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLastKnownElr(Arrays.asList(3))
.setIsr(Collections.singletonList(0))
.setEligibleLeaderReplicas(Collections.singletonList(1))
.setLastKnownElr(Collections.singletonList(3))
.setLeaderEpoch(0)
.setPartitionEpoch(2)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
Expand Down Expand Up @@ -371,9 +372,9 @@ void testDescribeTopicPartitionsRequestWithEdgeCases() {
.setPartitionId(0)
.setReplicas(Arrays.asList(0, 1, 2))
.setLeader(0)
.setIsr(Arrays.asList(0))
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLastKnownElr(Arrays.asList(2))
.setIsr(Collections.singletonList(0))
.setEligibleLeaderReplicas(Collections.singletonList(1))
.setLastKnownElr(Collections.singletonList(2))
.setLeaderEpoch(0)
.setPartitionEpoch(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
Expand All @@ -382,9 +383,9 @@ void testDescribeTopicPartitionsRequestWithEdgeCases() {
.setPartitionId(1)
.setReplicas(Arrays.asList(0, 1, 2))
.setLeader(0)
.setIsr(Arrays.asList(0))
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLastKnownElr(Arrays.asList(2))
.setIsr(Collections.singletonList(0))
.setEligibleLeaderReplicas(Collections.singletonList(1))
.setLastKnownElr(Collections.singletonList(2))
.setLeaderEpoch(0)
.setPartitionEpoch(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
Expand All @@ -393,9 +394,9 @@ void testDescribeTopicPartitionsRequestWithEdgeCases() {
.setPartitionId(0)
.setReplicas(Arrays.asList(0, 1, 3))
.setLeader(0)
.setIsr(Arrays.asList(0))
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLastKnownElr(Arrays.asList(3))
.setIsr(Collections.singletonList(0))
.setEligibleLeaderReplicas(Collections.singletonList(1))
.setLastKnownElr(Collections.singletonList(3))
.setLeaderEpoch(0)
.setPartitionEpoch(2)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand All @@ -40,18 +40,18 @@ public class RuntimeLoggerManagerTest {

@Test
public void testValidateSetLogLevelConfig() {
MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.SET.id()).
setValue("TRACE")));
MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.SET.id()).
setValue("TRACE")));
}

@Test
public void testValidateDeleteLogLevelConfig() {
MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.DELETE.id()).
setValue("")));
MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.DELETE.id()).
setValue("")));
}

@ParameterizedTest
Expand All @@ -60,26 +60,26 @@ public void testOperationNotAllowed(byte id) {
OpType opType = AlterConfigOp.OpType.forId(id);
assertEquals(opType + " operation is not allowed for the BROKER_LOGGER resource",
Assertions.assertThrows(InvalidRequestException.class,
() -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(id).
setValue("TRACE")))).getMessage());
() -> MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(id).
setValue("TRACE")))).getMessage());
}

@Test
public void testValidateBogusLogLevelNameNotAllowed() {
assertEquals("Cannot set the log level of " + LOG.getName() + " to BOGUS as it is not " +
"a supported log level. Valid log levels are DEBUG, ERROR, FATAL, INFO, TRACE, WARN",
Assertions.assertThrows(InvalidConfigurationException.class,
() -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.SET.id()).
setValue("BOGUS")))).getMessage());
() -> MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.SET.id()).
setValue("BOGUS")))).getMessage());
}

@Test
public void testValidateSetRootLogLevelConfig() {
MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig().
setName(Log4jController.ROOT_LOGGER()).
setConfigOperation(OpType.SET.id()).
setValue("TRACE")));
Expand All @@ -90,9 +90,9 @@ public void testValidateRemoveRootLogLevelConfigNotAllowed() {
assertEquals("Removing the log level of the " + Log4jController.ROOT_LOGGER() +
" logger is not allowed",
Assertions.assertThrows(InvalidRequestException.class,
() -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(Log4jController.ROOT_LOGGER()).
setConfigOperation(OpType.DELETE.id()).
setValue("")))).getMessage());
() -> MANAGER.validateLogLevelConfigs(Collections.singletonList(new AlterableConfig().
setName(Log4jController.ROOT_LOGGER()).
setConfigOperation(OpType.DELETE.id()).
setValue("")))).getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -220,15 +219,15 @@ public void testIncrementalAlterConfigs(boolean usingBootstrapControllers) throw
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
ConfigResource defaultResource = new ConfigResource(BROKER, "");
Map<ConfigResource, Collection<AlterConfigOp>> alterations = new HashMap<>();
alterations.put(nodeResource, Arrays.asList(
new AlterConfigOp(new ConfigEntry("my.custom.config", "foo"),
AlterConfigOp.OpType.SET)));
alterations.put(defaultResource, Arrays.asList(
new AlterConfigOp(new ConfigEntry("my.custom.config", "bar"),
AlterConfigOp.OpType.SET)));
alterations.put(nodeResource, Collections.singletonList(
new AlterConfigOp(new ConfigEntry("my.custom.config", "foo"),
AlterConfigOp.OpType.SET)));
alterations.put(defaultResource, Collections.singletonList(
new AlterConfigOp(new ConfigEntry("my.custom.config", "bar"),
AlterConfigOp.OpType.SET)));
admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
Config config = admin.describeConfigs(Arrays.asList(nodeResource)).
Config config = admin.describeConfigs(Collections.singletonList(nodeResource)).
all().get(1, TimeUnit.MINUTES).get(nodeResource);
ConfigEntry entry = config.entries().stream().
filter(e -> e.name().equals("my.custom.config")).
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ public FaultHandler build(String name, boolean fatal, Runnable action) {
}

public static class Builder {
private TestKitNodes nodes;
private Map<String, String> configProps = new HashMap<>();
private SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();
private final TestKitNodes nodes;
private final Map<String, String> configProps = new HashMap<>();
private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();

public Builder(TestKitNodes nodes) {
this.nodes = nodes;
Expand Down Expand Up @@ -481,7 +481,7 @@ public String quorumVotersConfig() throws ExecutionException, InterruptedExcepti
}

public class ClientPropertiesBuilder {
private Properties properties;
private final Properties properties;
private boolean usingBootstrapControllers = false;

public ClientPropertiesBuilder() {
Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/java/kafka/examples/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private Utils() {
}

public static void printHelp(String message, Object... args) {
System.out.println(format(message, args));
System.out.printf(message + "%n", args);
}

public static void printOut(String message, Object... args) {
Expand Down
Loading