Skip to content

KAFKA-16901: Add unit tests for ConsumerRecords#records(String)#16227

Merged
chia7712 merged 16 commits intoapache:trunkfrom
frankvicky:KAFKA-16901
Jun 13, 2024
Merged

KAFKA-16901: Add unit tests for ConsumerRecords#records(String)#16227
chia7712 merged 16 commits intoapache:trunkfrom
frankvicky:KAFKA-16901

Conversation

@frankvicky
Copy link
Copy Markdown
Contributor

Add unit tests for ConsumerRecords#records(String).

This commit adds new unit tests to the ConsumerRecordsTest class. One of the tests validates that an IllegalArgumentException occurs when a null topic is passed, and another test validates that the records(String) method is behaving as expected when valid topics are passed.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for increasing the test coverage :)

@Test
public void testRecords() {
String targetTopic = "topic";
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = buildTestRecordsWithDummyRecords(targetTopic);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you build the ConsumerRecords with multiple topics? Also, we should verify the record for each topic.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do it 😸

@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @chia7712, I have modified the test case to cover the scenario of multiple topics and records, PLTA 😺

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR

return records;
}

private Map<TopicPartition, List<ConsumerRecord<Integer, String>>> buildTopicTestRecords(int recordSize, String... topics) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this method can return ConsumerRecords directly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is more clear that we return ConsumerRecords directly.

private Map<TopicPartition, List<ConsumerRecord<Integer, String>>> buildTopicTestRecords(int recordSize, String... topics) {
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> partitionToRecords = new LinkedHashMap<>();
for (String topic : topics) {
ArrayList<ConsumerRecord<Integer, String>> records = new ArrayList<>(recordSize);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List

ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME,
0, 0, 2, "value2", new RecordHeaders(), Optional.empty());

new ArrayList<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exception?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I accidentally leave this, I will remove it.

}
}

private Map<TopicPartition, List<ConsumerRecord<Integer, String>>> buildSingleTopicTestRecords(String topic) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this implementation be rewrite by buildTopicTestRecords?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I keep this because the test case iterator has it' own test logic but I will try to rewrite it.

@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @chia7712, I do few changes based on comments, PTAL 🐧

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR!

public void testRecordsWithNullTopic() {
String nullTopic = null;
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use ConsumerRecords.empty();?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I don't know we have this utility function, I will replace with it.

Iterable<ConsumerRecord<Integer, String>> records = consumerRecords.records(topic);
Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();

for (int count = 0; iterator.hasNext(); count++) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you verify the number of records from the iterable/iterator?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will add the verification of it.

private ConsumerRecords<Integer, String> buildTopicTestRecords(int recordSize,
int partitionSize,
int emptyPartitionInterval,
String... topics) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please use Collection<String> ?

ConsumerRecord<Integer, String> record = iterator.next();
assertEquals(count, record.partition());
assertEquals(topic, record.topic());
assertEquals(offset, record.offset());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please verify the "key" and "value"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I miss both of them.

@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @chia7712 , I do some changes based on comments, PTAL 😃

String topic = "topic";
int recordSize = 10;
int partitionSize = 15;
int emptyPartitionInterval = 3;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we define specific partition to be empty? that will get simplified I think

@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @chia7712 , I have do some refactors, PTAL 🐧

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky nice test cases and refactor.

public class ConsumerRecordsTest {

@Test
public void iterator() throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please rename it to testIterator?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, don't even notice this case doesn't have test prefix 🤣



@Test
public void testRecords() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add test for records(TopicPartition)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do it 😸

currentPartition = record.partition();
}

assertEquals(topic, record.topic());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those checks are existent in both test cases. Maybe we can reuse them?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, it will be more clear

@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @chia7712 , I have do some changes and add a test case, PTAL 🐧

int partitionCount = 0;
int currentPartition = -1;

while (iterator.hasNext()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can loop records? for (ConsumerRecord<Integer, String> record : records) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, it will be more easy to read than using iterator.

@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @chia7712, I have do a small change based on your feedback, PTAL 😄

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR


@Test
public void iterator() throws Exception {
public void testIterator() throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception is unused.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I will remove it

}

private void validateEmptyPartition(ConsumerRecord<Integer, String> record, int emptyPartitionIndex) {
if (record.partition() == emptyPartitionIndex) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be assertNotEquals(emptyPartitionIndex, record.partition(), "Partition " + record.partition() + " is not empty");, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in this way it could be more straightforward

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@chia7712 chia7712 merged commit dd6fcc6 into apache:trunk Jun 13, 2024
apourchet added a commit to apourchet/kafka that referenced this pull request Jun 13, 2024
commit f380cd1
Author: Edoardo Comar <ecomar@uk.ibm.com>
Date:   Thu Jun 13 15:01:08 2024 +0100

    MINOR: Add integration tag to AdminFenceProducersIntegrationTest (apache#16326)

    Add @tag("integration") to AdminFenceProducersIntegrationTest

    Reviewers: Chris Egerton <chrise@aiven.io>

commit 11c85a9
Author: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com>
Date:   Thu Jun 13 05:11:01 2024 -0400

    MINOR: Make online downgrade failure logs less noisy and update the timeouts scheduled in `convertToConsumerGroup` (apache#16290)

    This patch:
    - changes the order of the checks in `validateOnlineDowngrade`, so that only when the last member using the consumer protocol leave and the group still has classic member(s), `online downgrade is disabled` is logged if the policy doesn't allow downgrade.
    - changes the session timeout in `convertToConsumerGroup` from `consumerGroupSessionTimeoutMs` to `member.classicProtocolSessionTimeout().get()`.

    Reviewers: David Jacot <djacot@confluent.io>

commit ea60666
Author: Ken Huang <100591800+m1a2st@users.noreply.github.com>
Date:   Thu Jun 13 17:11:37 2024 +0900

    KAFKA-16921 [1/N] Migrate all junit 4 code to junit 5 for connect module (apache#16253)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 596b945
Author: gongxuanzhang <gongxuanzhang@foxmail.com>
Date:   Thu Jun 13 15:39:32 2024 +0800

    KAFKA-16643 Add ModifierOrder checkstyle rule (apache#15890)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 103ff5c
Author: Antoine Pourchet <antoine@responsive.dev>
Date:   Thu Jun 13 01:32:39 2024 -0600

    KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename to LegacyTaskAssignor (apache#16318)

    Since the new public API for TaskAssignor shared a name, this rename will prevent users from confusing the internal definition with the public one.

    Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>

commit e59c887
Author: brenden20 <118419078+brenden20@users.noreply.github.com>
Date:   Thu Jun 13 02:30:05 2024 -0500

    KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it (apache#16291)

    Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

commit dd6fcc6
Author: TingIāu "Ting" Kì <kitingiao@gmail.com>
Date:   Thu Jun 13 14:35:33 2024 +0800

    KAFKA-16901 Add unit tests for ConsumerRecords#records(String) (apache#16227)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit fe98888
Author: Lianet Magrans <98415067+lianetm@users.noreply.github.com>
Date:   Thu Jun 13 08:31:16 2024 +0200

    MINOR: Improving log for outstanding requests on close and cleanup (apache#16304)

    Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>

commit 9ddd58b
Author: Chris Egerton <chrise@aiven.io>
Date:   Thu Jun 13 05:43:33 2024 +0200

    MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (apache#16306)

    Reviewers: Greg Harris <gharris1727@gmail.com>

commit 0a203a9
Author: TingIāu "Ting" Kì <kitingiao@gmail.com>
Date:   Thu Jun 13 09:47:51 2024 +0800

    KAFKA-16938 non-dynamic props gets corrupted due to circular reference between DynamicBrokerConfig and DynamicConfig. (apache#16302)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 6d1f8f8
Author: Gantigmaa Selenge <39860586+tinaselenge@users.noreply.github.com>
Date:   Thu Jun 13 02:42:39 2024 +0100

    MINOR: Clean up for KafkaAdminClientTest (apache#16285)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit e76e1da
Author: Chris Egerton <chrise@aiven.io>
Date:   Thu Jun 13 02:18:23 2024 +0200

    KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests (apache#16288)

    Reviewers: Greg Harris <gharris1727@gmail.com>
apourchet added a commit to apourchet/kafka that referenced this pull request Jun 13, 2024
commit 4333af5
Author: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Date:   Thu Jun 13 11:27:50 2024 -0700

    KAFKA-15045: (KIP-924 pt. 25) Rename old internal StickyTaskAssignor to LegacyStickyTaskAssignor (apache#16322)

    To avoid confusion in 3.8/until we fully remove all the old task assignors and internal config, we should rename the old internal assignor classes like the StickyTaskAssignor so that they won't be mixed up with the new version of the assignor (which is also named StickyTaskAssignor)

    Reviewers: Bruno Cadonna <cadonna@apache.org>, Josep Prat <josep.prat@aiven.io>

commit f380cd1
Author: Edoardo Comar <ecomar@uk.ibm.com>
Date:   Thu Jun 13 15:01:08 2024 +0100

    MINOR: Add integration tag to AdminFenceProducersIntegrationTest (apache#16326)

    Add @tag("integration") to AdminFenceProducersIntegrationTest

    Reviewers: Chris Egerton <chrise@aiven.io>

commit 11c85a9
Author: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com>
Date:   Thu Jun 13 05:11:01 2024 -0400

    MINOR: Make online downgrade failure logs less noisy and update the timeouts scheduled in `convertToConsumerGroup` (apache#16290)

    This patch:
    - changes the order of the checks in `validateOnlineDowngrade`, so that only when the last member using the consumer protocol leave and the group still has classic member(s), `online downgrade is disabled` is logged if the policy doesn't allow downgrade.
    - changes the session timeout in `convertToConsumerGroup` from `consumerGroupSessionTimeoutMs` to `member.classicProtocolSessionTimeout().get()`.

    Reviewers: David Jacot <djacot@confluent.io>

commit ea60666
Author: Ken Huang <100591800+m1a2st@users.noreply.github.com>
Date:   Thu Jun 13 17:11:37 2024 +0900

    KAFKA-16921 [1/N] Migrate all junit 4 code to junit 5 for connect module (apache#16253)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 596b945
Author: gongxuanzhang <gongxuanzhang@foxmail.com>
Date:   Thu Jun 13 15:39:32 2024 +0800

    KAFKA-16643 Add ModifierOrder checkstyle rule (apache#15890)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 103ff5c
Author: Antoine Pourchet <antoine@responsive.dev>
Date:   Thu Jun 13 01:32:39 2024 -0600

    KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename to LegacyTaskAssignor (apache#16318)

    Since the new public API for TaskAssignor shared a name, this rename will prevent users from confusing the internal definition with the public one.

    Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>

commit e59c887
Author: brenden20 <118419078+brenden20@users.noreply.github.com>
Date:   Thu Jun 13 02:30:05 2024 -0500

    KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it (apache#16291)

    Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

commit dd6fcc6
Author: TingIāu "Ting" Kì <kitingiao@gmail.com>
Date:   Thu Jun 13 14:35:33 2024 +0800

    KAFKA-16901 Add unit tests for ConsumerRecords#records(String) (apache#16227)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit fe98888
Author: Lianet Magrans <98415067+lianetm@users.noreply.github.com>
Date:   Thu Jun 13 08:31:16 2024 +0200

    MINOR: Improving log for outstanding requests on close and cleanup (apache#16304)

    Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>

commit 9ddd58b
Author: Chris Egerton <chrise@aiven.io>
Date:   Thu Jun 13 05:43:33 2024 +0200

    MINOR: Add readiness check for connector and separate Kafka cluster in ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic (apache#16306)

    Reviewers: Greg Harris <gharris1727@gmail.com>

commit 0a203a9
Author: TingIāu "Ting" Kì <kitingiao@gmail.com>
Date:   Thu Jun 13 09:47:51 2024 +0800

    KAFKA-16938 non-dynamic props gets corrupted due to circular reference between DynamicBrokerConfig and DynamicConfig. (apache#16302)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 6d1f8f8
Author: Gantigmaa Selenge <39860586+tinaselenge@users.noreply.github.com>
Date:   Thu Jun 13 02:42:39 2024 +0100

    MINOR: Clean up for KafkaAdminClientTest (apache#16285)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit e76e1da
Author: Chris Egerton <chrise@aiven.io>
Date:   Thu Jun 13 02:18:23 2024 +0200

    KAFKA-16935: Automatically wait for cluster startup in embedded Connect integration tests (apache#16288)

    Reviewers: Greg Harris <gharris1727@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants