diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 000797dba0918..ddd1a03ecd136 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -490,7 +490,8 @@ public CompletableFuture> fetchOffsets( return result; } - private OffsetFetchRequestState createOffsetFetchRequest(final Set partitions, + // Visible for testing + OffsetFetchRequestState createOffsetFetchRequest(final Set partitions, final long deadlineMs) { return jitter.isPresent() ? new OffsetFetchRequestState( @@ -865,6 +866,11 @@ private void handleClientResponse(final ClientResponse response, } } + @Override + public String toStringBase() { + return super.toStringBase() + ", " + memberInfo; + } + abstract void onResponse(final ClientResponse response); abstract void removeRequest(); @@ -1078,14 +1084,9 @@ private void chainFuture( } @Override - public String toString() { - return "OffsetFetchRequestState{" + - "requestedPartitions=" + requestedPartitions + - ", memberId=" + memberInfo.memberId.orElse("undefined") + - ", memberEpoch=" + (memberInfo.memberEpoch.isPresent() ? memberInfo.memberEpoch.get() : "undefined") + - ", future=" + future + - ", " + toStringBase() + - '}'; + public String toStringBase() { + return super.toStringBase() + + ", requestedPartitions=" + requestedPartitions; } } @@ -1278,5 +1279,11 @@ static class MemberInfo { this.memberId = Optional.empty(); this.memberEpoch = Optional.empty(); } + + @Override + public String toString() { + return "memberId=" + memberId.orElse("undefined") + + ", memberEpoch=" + (memberEpoch.isPresent() ? memberEpoch.get() : "undefined"); + } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 1027874906c21..e86858c34273f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -75,6 +75,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID; import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -122,6 +123,48 @@ public void setup() { this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } + @Test + public void testOffsetFetchRequestStateToStringBase() { + ConsumerConfig config = mock(ConsumerConfig.class); + + CommitRequestManager commitRequestManager = new CommitRequestManager( + time, + logContext, + subscriptionState, + config, + coordinatorRequestManager, + offsetCommitCallbackInvoker, + DEFAULT_GROUP_ID, + Optional.of(DEFAULT_GROUP_INSTANCE_ID), + retryBackoffMs, + retryBackoffMaxMs, + OptionalDouble.of(0), + metrics); + + commitRequestManager.onMemberEpochUpdated(Optional.of(1), Optional.empty()); + Set requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1)); + + CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.createOffsetFetchRequest(requestedPartitions, 0); + + TimedRequestState timedRequestState = new TimedRequestState( + logContext, + CommitRequestManager.class.getSimpleName(), + retryBackoffMs, + 2, + retryBackoffMaxMs, + 0, + TimedRequestState.deadlineTimer(time, 0) + ); + + String target = timedRequestState.toStringBase() + + ", " + offsetFetchRequestState.memberInfo + + ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions; + + assertDoesNotThrow(timedRequestState::toString); + assertFalse(target.contains("Optional")); + assertEquals(target, offsetFetchRequestState.toStringBase()); + } + @Test public void testPollSkipIfCoordinatorUnknown() { CommitRequestManager commitRequestManager = create(false, 0);