From 81a55722046b886a51a9e0dc3fe3c597b969b2fa Mon Sep 17 00:00:00 2001 From: brenden20 Date: Tue, 11 Jun 2024 14:58:53 -0500 Subject: [PATCH 1/9] Add toStringBase(), toString(), and a test --- .../internals/CommitRequestManager.java | 20 +++++---- .../internals/CommitRequestManagerTest.java | 45 +++++++++++++++++++ 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 000797dba0918..c5788fed465c2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -1078,14 +1078,11 @@ private void chainFuture( } @Override - public String toString() { - return "OffsetFetchRequestState{" + - "requestedPartitions=" + requestedPartitions + - ", memberId=" + memberInfo.memberId.orElse("undefined") + - ", memberEpoch=" + (memberInfo.memberEpoch.isPresent() ? memberInfo.memberEpoch.get() : "undefined") + - ", future=" + future + - ", " + toStringBase() + - '}'; + public String toStringBase() { + return super.toStringBase() + + ", memberInfo=" + memberInfo + + ", requestedPartitions=" + requestedPartitions + + ", future=" + future; } } @@ -1278,5 +1275,12 @@ static class MemberInfo { this.memberId = Optional.empty(); this.memberEpoch = Optional.empty(); } + + @Override + public String toString() { + return "MemberInfo{" + "memberId=" + memberId.orElse("undefined") + + ", memberEpoch=" + (memberEpoch.isPresent() ? memberEpoch : "undefined") + "}"; + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 1027874906c21..64bbed255bce6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -75,6 +75,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID; import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -122,6 +123,50 @@ public void setup() { this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } + @Test + public void testOffsetFetchRequestStateToStringBase() { + ConsumerConfig config = mock(ConsumerConfig.class); + CommitRequestManager.MemberInfo memberInfo = new CommitRequestManager.MemberInfo(); + + CommitRequestManager commitRequestManager = new CommitRequestManager( + time, + logContext, + subscriptionState, + config, + coordinatorRequestManager, + offsetCommitCallbackInvoker, + "groupId", + Optional.of("groupInstanceId"), + metrics); + + Set requestedPartitions = new HashSet<>(); + TopicPartition topicPartition1 = new TopicPartition("topic-1", 1); + requestedPartitions.add(topicPartition1); + + CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.new OffsetFetchRequestState( + requestedPartitions, + retryBackoffMs, + retryBackoffMaxMs, + 1000, + memberInfo); + + TimedRequestState timedRequestState = new TimedRequestState( + logContext, + "CommitRequestManager", + retryBackoffMs, + retryBackoffMaxMs, + TimedRequestState.deadlineTimer(time, 0) + ); + + String target = timedRequestState.toStringBase() + + ", memberInfo=" + memberInfo + + ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions + + ", future=" + offsetFetchRequestState.future(); + + assertDoesNotThrow(timedRequestState::toString); + assertEquals(target, offsetFetchRequestState.toStringBase()); + } + @Test public void testPollSkipIfCoordinatorUnknown() { CommitRequestManager commitRequestManager = create(false, 0); From 7147a27d629f42051f1826d935c2cfe612e9c820 Mon Sep 17 00:00:00 2001 From: brenden20 <118419078+brenden20@users.noreply.github.com> Date: Tue, 11 Jun 2024 15:54:12 -0500 Subject: [PATCH 2/9] Cleaner line Co-authored-by: Lianet Magrans <98415067+lianetm@users.noreply.github.com> --- .../clients/consumer/internals/CommitRequestManagerTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 64bbed255bce6..2efcafaa2681a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -139,9 +139,7 @@ public void testOffsetFetchRequestStateToStringBase() { Optional.of("groupInstanceId"), metrics); - Set requestedPartitions = new HashSet<>(); - TopicPartition topicPartition1 = new TopicPartition("topic-1", 1); - requestedPartitions.add(topicPartition1); + Set requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1)); CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.new OffsetFetchRequestState( requestedPartitions, From e91891f69bd5bd42b80672f0338683f87f6bbb8b Mon Sep 17 00:00:00 2001 From: brenden20 Date: Tue, 11 Jun 2024 16:20:21 -0500 Subject: [PATCH 3/9] Add toStringBase() to RetriableRequestState --- .../consumer/internals/CommitRequestManager.java | 10 +++++++--- .../consumer/internals/CommitRequestManagerTest.java | 3 ++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index c5788fed465c2..102d31ec2a04e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -865,6 +865,11 @@ private void handleClientResponse(final ClientResponse response, } } + @Override + public String toStringBase() { + return super.toStringBase() + ", " + memberInfo; + } + abstract void onResponse(final ClientResponse response); abstract void removeRequest(); @@ -1080,7 +1085,6 @@ private void chainFuture( @Override public String toStringBase() { return super.toStringBase() + - ", memberInfo=" + memberInfo + ", requestedPartitions=" + requestedPartitions + ", future=" + future; } @@ -1278,8 +1282,8 @@ static class MemberInfo { @Override public String toString() { - return "MemberInfo{" + "memberId=" + memberId.orElse("undefined") + - ", memberEpoch=" + (memberEpoch.isPresent() ? memberEpoch : "undefined") + "}"; + return "memberId=" + memberId.orElse("undefined") + + ", memberEpoch=" + (memberEpoch.isPresent() ? memberEpoch.get() : "undefined"); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 2efcafaa2681a..bb0419d910aff 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -157,10 +157,11 @@ public void testOffsetFetchRequestStateToStringBase() { ); String target = timedRequestState.toStringBase() + - ", memberInfo=" + memberInfo + + ", " + memberInfo + ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions + ", future=" + offsetFetchRequestState.future(); + System.out.println(offsetFetchRequestState.toStringBase()); assertDoesNotThrow(timedRequestState::toString); assertEquals(target, offsetFetchRequestState.toStringBase()); } From caf17bb52b455f449fba39e0b3d854db23b8648c Mon Sep 17 00:00:00 2001 From: brenden20 Date: Tue, 11 Jun 2024 16:22:40 -0500 Subject: [PATCH 4/9] Remove print statement --- .../clients/consumer/internals/CommitRequestManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index bb0419d910aff..82020d56d6bec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -161,7 +161,6 @@ public void testOffsetFetchRequestStateToStringBase() { ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions + ", future=" + offsetFetchRequestState.future(); - System.out.println(offsetFetchRequestState.toStringBase()); assertDoesNotThrow(timedRequestState::toString); assertEquals(target, offsetFetchRequestState.toStringBase()); } From 1941a55478be470222778e9ea3dd888b8c0a3146 Mon Sep 17 00:00:00 2001 From: brenden20 Date: Tue, 11 Jun 2024 16:43:29 -0500 Subject: [PATCH 5/9] Small update Added setter for memberEpoch, removed future from toStringBase --- .../clients/consumer/internals/CommitRequestManager.java | 7 +++++-- .../consumer/internals/CommitRequestManagerTest.java | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 102d31ec2a04e..9eb7f3a6bcd59 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -1085,8 +1085,7 @@ private void chainFuture( @Override public String toStringBase() { return super.toStringBase() + - ", requestedPartitions=" + requestedPartitions + - ", future=" + future; + ", requestedPartitions=" + requestedPartitions; } } @@ -1286,5 +1285,9 @@ public String toString() { ", memberEpoch=" + (memberEpoch.isPresent() ? memberEpoch.get() : "undefined"); } + // Visible for testing + protected void setMemberEpoch(int memberEpoch) { + this.memberEpoch = Optional.of(memberEpoch); + } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 82020d56d6bec..6abb0e408712a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -127,6 +127,7 @@ public void setup() { public void testOffsetFetchRequestStateToStringBase() { ConsumerConfig config = mock(ConsumerConfig.class); CommitRequestManager.MemberInfo memberInfo = new CommitRequestManager.MemberInfo(); + memberInfo.setMemberEpoch(1); CommitRequestManager commitRequestManager = new CommitRequestManager( time, @@ -158,8 +159,7 @@ public void testOffsetFetchRequestStateToStringBase() { String target = timedRequestState.toStringBase() + ", " + memberInfo + - ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions + - ", future=" + offsetFetchRequestState.future(); + ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions; assertDoesNotThrow(timedRequestState::toString); assertEquals(target, offsetFetchRequestState.toStringBase()); From 57680175ae83d45570dc0127c09770bb6caf3e8e Mon Sep 17 00:00:00 2001 From: brenden20 Date: Tue, 11 Jun 2024 17:02:28 -0500 Subject: [PATCH 6/9] Removed unnecessary setter method --- .../clients/consumer/internals/CommitRequestManager.java | 5 ----- .../clients/consumer/internals/CommitRequestManagerTest.java | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 9eb7f3a6bcd59..837c79eec38e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -1284,10 +1284,5 @@ public String toString() { return "memberId=" + memberId.orElse("undefined") + ", memberEpoch=" + (memberEpoch.isPresent() ? memberEpoch.get() : "undefined"); } - - // Visible for testing - protected void setMemberEpoch(int memberEpoch) { - this.memberEpoch = Optional.of(memberEpoch); - } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 6abb0e408712a..5c4b1fc56db81 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -127,7 +127,6 @@ public void setup() { public void testOffsetFetchRequestStateToStringBase() { ConsumerConfig config = mock(ConsumerConfig.class); CommitRequestManager.MemberInfo memberInfo = new CommitRequestManager.MemberInfo(); - memberInfo.setMemberEpoch(1); CommitRequestManager commitRequestManager = new CommitRequestManager( time, @@ -140,6 +139,7 @@ public void testOffsetFetchRequestStateToStringBase() { Optional.of("groupInstanceId"), metrics); + commitRequestManager.onMemberEpochUpdated(Optional.of(1), Optional.empty()); Set requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1)); CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.new OffsetFetchRequestState( From 837d9ea725098114c66a58f0e82b63af2a54e607 Mon Sep 17 00:00:00 2001 From: brenden20 Date: Wed, 12 Jun 2024 09:03:35 -0500 Subject: [PATCH 7/9] Add assertFalse statement --- .../clients/consumer/internals/CommitRequestManagerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 5c4b1fc56db81..9ff7646784fba 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -162,6 +162,7 @@ public void testOffsetFetchRequestStateToStringBase() { ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions; assertDoesNotThrow(timedRequestState::toString); + assertFalse(target.contains("Optional")); assertEquals(target, offsetFetchRequestState.toStringBase()); } From 98361ea5b0dbdec62f689fb1ec59a9da1620cefd Mon Sep 17 00:00:00 2001 From: brenden20 Date: Wed, 12 Jun 2024 09:42:01 -0500 Subject: [PATCH 8/9] Fixed toStringBase() test --- .../internals/CommitRequestManager.java | 3 ++- .../internals/CommitRequestManagerTest.java | 21 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 837c79eec38e7..c649d382c2e2a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -490,7 +490,8 @@ public CompletableFuture> fetchOffsets( return result; } - private OffsetFetchRequestState createOffsetFetchRequest(final Set partitions, + // Visible for testing + protected OffsetFetchRequestState createOffsetFetchRequest(final Set partitions, final long deadlineMs) { return jitter.isPresent() ? new OffsetFetchRequestState( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 9ff7646784fba..e86858c34273f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -126,7 +126,6 @@ public void setup() { @Test public void testOffsetFetchRequestStateToStringBase() { ConsumerConfig config = mock(ConsumerConfig.class); - CommitRequestManager.MemberInfo memberInfo = new CommitRequestManager.MemberInfo(); CommitRequestManager commitRequestManager = new CommitRequestManager( time, @@ -135,30 +134,30 @@ public void testOffsetFetchRequestStateToStringBase() { config, coordinatorRequestManager, offsetCommitCallbackInvoker, - "groupId", - Optional.of("groupInstanceId"), + DEFAULT_GROUP_ID, + Optional.of(DEFAULT_GROUP_INSTANCE_ID), + retryBackoffMs, + retryBackoffMaxMs, + OptionalDouble.of(0), metrics); commitRequestManager.onMemberEpochUpdated(Optional.of(1), Optional.empty()); Set requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1)); - CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.new OffsetFetchRequestState( - requestedPartitions, - retryBackoffMs, - retryBackoffMaxMs, - 1000, - memberInfo); + CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.createOffsetFetchRequest(requestedPartitions, 0); TimedRequestState timedRequestState = new TimedRequestState( logContext, - "CommitRequestManager", + CommitRequestManager.class.getSimpleName(), retryBackoffMs, + 2, retryBackoffMaxMs, + 0, TimedRequestState.deadlineTimer(time, 0) ); String target = timedRequestState.toStringBase() + - ", " + memberInfo + + ", " + offsetFetchRequestState.memberInfo + ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions; assertDoesNotThrow(timedRequestState::toString); From 2583e120df71cfbb4b6026dccb5850b0b944dfe6 Mon Sep 17 00:00:00 2001 From: brenden20 Date: Wed, 12 Jun 2024 12:58:35 -0500 Subject: [PATCH 9/9] Changed method visibility --- .../kafka/clients/consumer/internals/CommitRequestManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index c649d382c2e2a..ddd1a03ecd136 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -491,7 +491,7 @@ public CompletableFuture> fetchOffsets( } // Visible for testing - protected OffsetFetchRequestState createOffsetFetchRequest(final Set partitions, + OffsetFetchRequestState createOffsetFetchRequest(final Set partitions, final long deadlineMs) { return jitter.isPresent() ? new OffsetFetchRequestState(