Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsets(
return result;
}

private OffsetFetchRequestState createOffsetFetchRequest(final Set<TopicPartition> partitions,
// Visible for testing
OffsetFetchRequestState createOffsetFetchRequest(final Set<TopicPartition> partitions,
final long deadlineMs) {
return jitter.isPresent() ?
new OffsetFetchRequestState(
Expand Down Expand Up @@ -865,6 +866,11 @@ private void handleClientResponse(final ClientResponse response,
}
}

@Override
public String toStringBase() {
return super.toStringBase() + ", " + memberInfo;
}

abstract void onResponse(final ClientResponse response);

abstract void removeRequest();
Expand Down Expand Up @@ -1078,14 +1084,9 @@ private void chainFuture(
}

@Override
public String toString() {
return "OffsetFetchRequestState{" +
"requestedPartitions=" + requestedPartitions +
", memberId=" + memberInfo.memberId.orElse("undefined") +
", memberEpoch=" + (memberInfo.memberEpoch.isPresent() ? memberInfo.memberEpoch.get() : "undefined") +
", future=" + future +
", " + toStringBase() +
'}';
public String toStringBase() {
return super.toStringBase() +
", requestedPartitions=" + requestedPartitions;
}
}

Expand Down Expand Up @@ -1278,5 +1279,11 @@ static class MemberInfo {
this.memberId = Optional.empty();
this.memberEpoch = Optional.empty();
}

@Override
public String toString() {
return "memberId=" + memberId.orElse("undefined") +
", memberEpoch=" + (memberEpoch.isPresent() ? memberEpoch.get() : "undefined");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -122,6 +123,48 @@ public void setup() {
this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}

@Test
public void testOffsetFetchRequestStateToStringBase() {
ConsumerConfig config = mock(ConsumerConfig.class);

CommitRequestManager commitRequestManager = new CommitRequestManager(
time,
logContext,
subscriptionState,
config,
coordinatorRequestManager,
offsetCommitCallbackInvoker,
DEFAULT_GROUP_ID,
Optional.of(DEFAULT_GROUP_INSTANCE_ID),
retryBackoffMs,
retryBackoffMaxMs,
OptionalDouble.of(0),
metrics);

commitRequestManager.onMemberEpochUpdated(Optional.of(1), Optional.empty());
Set<TopicPartition> requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1));

CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.createOffsetFetchRequest(requestedPartitions, 0);

TimedRequestState timedRequestState = new TimedRequestState(
logContext,
CommitRequestManager.class.getSimpleName(),
retryBackoffMs,
2,
retryBackoffMaxMs,
0,
TimedRequestState.deadlineTimer(time, 0)
);

String target = timedRequestState.toStringBase() +
", " + offsetFetchRequestState.memberInfo +
", requestedPartitions=" + offsetFetchRequestState.requestedPartitions;

assertDoesNotThrow(timedRequestState::toString);
assertFalse(target.contains("Optional"));
assertEquals(target, offsetFetchRequestState.toStringBase());
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add assertFalse(target.contains("Optional")); to make sure we unwrap the optional variables?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added!


@Test
public void testPollSkipIfCoordinatorUnknown() {
CommitRequestManager commitRequestManager = create(false, 0);
Expand Down