Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4394,7 +4394,10 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> future =
FenceProducersHandler.newFuture(transactionalIds);
FenceProducersHandler handler = new FenceProducersHandler(logContext);
if (options.timeoutMs() == null) {
options.timeoutMs(defaultApiTimeoutMs);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's best if we don't mutate the options if the user passed it in. If someone were to log the options somewhere else, they would see our injected default instead of the fact that no value was provided.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the good comment.

The main reason i mutated the options was to pass the same timeout value in both the FenceProducersHandler and in the invokeDriver.

I just noticed that this is already true as the calcDeadlineMs that is used by the invokeDriver is already returning the defaultApiTimeoutMs in the case of null timeout.

}
FenceProducersHandler handler = new FenceProducersHandler(options, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new FenceProducersResult(future.all());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
Expand All @@ -38,12 +39,15 @@
public class FenceProducersHandler extends AdminApiHandler.Unbatched<CoordinatorKey, ProducerIdAndEpoch> {
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
private final FenceProducersOptions options;

public FenceProducersHandler(
FenceProducersOptions options,
LogContext logContext
) {
this.log = logContext.logger(FenceProducersHandler.class);
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext);
this.options = options;
}

public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> newFuture(
Expand Down Expand Up @@ -82,9 +86,10 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke
.setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
.setProducerId(ProducerIdAndEpoch.NONE.producerId)
.setTransactionalId(key.idValue)
// Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
// and shouldn't be used for any actual record writes
.setTransactionTimeoutMs(1);
// Set transaction timeout to the equivalent as the fenceProducers request since it's only being initialized to fence out older producers with the same transactional ID,
// and shouldn't be used for any actual record writes. This has been changed to match the fenceProducers request timeout from one as some brokers may be slower than expected
// and we need a safe timeout that allows the transaction init to finish.
.setTransactionTimeoutMs(this.options.timeoutMs());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeoutMs is nullable, and is always null when using the default FenceProducersOptions.
When it is null we should use the KafkaAdminClient#defaultApiTimeoutMs, similar to deleteRecords or calcDeadlineMs.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please recheck @gharris1727 ? 🙏

return new InitProducerIdRequest.Builder(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7031,6 +7031,7 @@ public void testFenceProducers() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
String transactionalId = "copyCat";
Node transactionCoordinator = env.cluster().nodes().iterator().next();
final FenceProducersOptions options = new FenceProducersOptions().timeoutMs(10000);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't test the null-default case which is going to be more common, and doesn't seem necessary for the correctness of the test either.

I think instead, we should change the request -> request instanceof InitProducerIdRequest to assert that the increased timeout is used, instead of the 1ms timeout. And maybe refactor/parameterize the test to try a default-options and options with an explicit timeout, if we want to test both branches.


// fail to find the coordinator at first with a retriable error
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, transactionalId, transactionCoordinator));
Expand Down Expand Up @@ -7060,7 +7061,7 @@ public void testFenceProducers() throws Exception {
transactionCoordinator
);

FenceProducersResult result = env.adminClient().fenceProducers(Collections.singleton(transactionalId));
FenceProducersResult result = env.adminClient().fenceProducers(Collections.singleton(transactionalId), options);
assertNull(result.all().get());
assertEquals(4761, result.producerId(transactionalId).get());
assertEquals((short) 489, result.epochId(transactionalId).get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.kafka.clients.admin.FenceProducersOptions;

public class FenceProducersHandlerTest {
private final LogContext logContext = new LogContext();
private final Node node = new Node(1, "host", 1234);
private final FenceProducersOptions options = new FenceProducersOptions().timeoutMs(10000);

@Test
public void testBuildRequest() {
FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext);
mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId));
}

Expand All @@ -51,7 +53,7 @@ public void testHandleSuccessfulResponse() {
String transactionalId = "foo";
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);

FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext);

short epoch = 57;
long producerId = 7;
Expand All @@ -73,7 +75,7 @@ public void testHandleSuccessfulResponse() {
@Test
public void testHandleErrorResponse() {
String transactionalId = "foo";
FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext);
assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED);
assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR);
Expand Down Expand Up @@ -142,6 +144,6 @@ private void assertLookup(FenceProducersHandler handler, String transactionalId)
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key);
assertEquals(transactionalId, request.data.transactionalId());
assertEquals(1, request.data.transactionTimeoutMs());
assertEquals(this.options.timeoutMs(), request.data.transactionTimeoutMs());
}
}