Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> future =
FenceProducersHandler.newFuture(transactionalIds);
FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
invokeDriver(handler, future, options.timeoutMs);
return new FenceProducersResult(future.all());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
Expand All @@ -38,12 +39,16 @@
public class FenceProducersHandler extends AdminApiHandler.Unbatched<CoordinatorKey, ProducerIdAndEpoch> {
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
private final int txnTimeoutMs;

public FenceProducersHandler(
LogContext logContext
FenceProducersOptions options,
LogContext logContext,
int requestTimeoutMs
) {
this.log = logContext.logger(FenceProducersHandler.class);
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext);
this.txnTimeoutMs = options.timeoutMs() != null ? options.timeoutMs() : requestTimeoutMs;
}

public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> newFuture(
Expand Down Expand Up @@ -82,9 +87,8 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke
.setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
.setProducerId(ProducerIdAndEpoch.NONE.producerId)
.setTransactionalId(key.idValue)
// Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
// and shouldn't be used for any actual record writes
.setTransactionTimeoutMs(1);
// This timeout is used by the coordinator to append the record with the new producer epoch to the transaction log.
.setTransactionTimeoutMs(txnTimeoutMs);
return new InitProducerIdRequest.Builder(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.InitProducerIdResponseData;
Expand All @@ -39,19 +40,29 @@
public class FenceProducersHandlerTest {
private final LogContext logContext = new LogContext();
private final Node node = new Node(1, "host", 1234);
private final int requestTimeoutMs = 30000;
private final FenceProducersOptions options = new FenceProducersOptions();

@Test
public void testBuildRequest() {
FenceProducersHandler handler = new FenceProducersHandler(logContext);
mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId));
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, requestTimeoutMs));
}

@Test
public void testBuildRequestOptionsTimeout() {
final int optionsTimeoutMs = 50000;
options.timeoutMs(optionsTimeoutMs);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, optionsTimeoutMs));
}

@Test
public void testHandleSuccessfulResponse() {
String transactionalId = "foo";
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);

FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);

short epoch = 57;
long producerId = 7;
Expand All @@ -73,7 +84,7 @@ public void testHandleSuccessfulResponse() {
@Test
public void testHandleErrorResponse() {
String transactionalId = "foo";
FenceProducersHandler handler = new FenceProducersHandler(logContext);
FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED);
assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR);
Expand Down Expand Up @@ -136,10 +147,10 @@ private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleResponseError(
return result;
}

private void assertLookup(FenceProducersHandler handler, String transactionalId) {
private void assertLookup(FenceProducersHandler handler, String transactionalId, int txnTimeoutMs) {
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key);
assertEquals(transactionalId, request.data.transactionalId());
assertEquals(1, request.data.transactionTimeoutMs());
assertEquals(txnTimeoutMs, request.data.transactionTimeoutMs());
}
}