From 51ac735f5029c33eeb3382c9e16b72db70f0b6fa Mon Sep 17 00:00:00 2001 From: dengziming Date: Tue, 14 Dec 2021 20:15:18 +0800 Subject: [PATCH 1/7] MINOR: MetadataShell should handle ProducerIdsRecord --- .../kafka/shell/MetadataNodeManager.java | 11 +++++ .../kafka/shell/MetadataNodeManagerTest.java | 42 ++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index fa1b411289c17..db04cc5a8d143 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.PartitionRecordJsonConverter; +import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; @@ -45,6 +46,7 @@ import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; import org.apache.kafka.shell.MetadataNode.DirectoryNode; import org.apache.kafka.shell.MetadataNode.FileNode; import org.apache.kafka.snapshot.SnapshotReader; @@ -318,6 +320,15 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message) node.create(record.key()).setContents(record.value() + ""); break; } + case PRODUCER_IDS_RECORD: { + ProducerIdsRecord record = (ProducerIdsRecord) message; + DirectoryNode producerIdNode = data.root.mkdirs("producerIds"); + producerIdNode.create("broker").setContents(record.brokerId() + ""); + producerIdNode.create("blockStart") + .setContents(record.producerIdsEnd() - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + ""); + producerIdNode.create("blockEnd").setContents(record.producerIdsEnd() - 1 + ""); + break; + } default: throw new RuntimeException("Unhandled metadata record type"); } diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index 81483f5290cc9..0513619b936fa 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -25,16 +25,19 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.PartitionRecordJsonConverter; +import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.server.common.ProducerIdsBlock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -276,7 +279,7 @@ public void testClientQuotaRecord() { "user", "kraft").children().containsKey("producer_byte_rate")); record = new ClientQuotaRecord() - .setEntity(Arrays.asList( + .setEntity(Collections.singletonList( new ClientQuotaRecord.EntityData() .setEntityType("user") .setEntityName(null) @@ -290,4 +293,41 @@ record = new ClientQuotaRecord() metadataNodeManager.getData().root().directory("client-quotas", "user", "").file("producer_byte_rate").contents()); } + + @Test + public void testProducerIdsRecord() { + // generate brokerId + ProducerIdsRecord record1 = new ProducerIdsRecord() + .setBrokerId(0) + .setBrokerEpoch(1) + .setProducerIdsEnd(10000); + metadataNodeManager.handleMessage(record1); + + assertEquals( + "0", + metadataNodeManager.getData().root().directory("producerIds").file("broker").contents()); + assertEquals( + 10000 - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + "", + metadataNodeManager.getData().root().directory("producerIds").file("blockStart").contents()); + assertEquals( + "9999", + metadataNodeManager.getData().root().directory("producerIds").file("blockEnd").contents()); + + // generate brokerId again + ProducerIdsRecord record2 = new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(2) + .setProducerIdsEnd(11000); + metadataNodeManager.handleMessage(record2); + + assertEquals( + "1", + metadataNodeManager.getData().root().directory("producerIds").file("broker").contents()); + assertEquals( + 11000 - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + "", + metadataNodeManager.getData().root().directory("producerIds").file("blockStart").contents()); + assertEquals( + "10999", + metadataNodeManager.getData().root().directory("producerIds").file("blockEnd").contents()); + } } From 29d8c62e20cee0094bd56173a52772a80f159138 Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 23 Dec 2021 21:45:53 +0800 Subject: [PATCH 2/7] fix comments --- .../java/org/apache/kafka/shell/MetadataNodeManagerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index 0513619b936fa..2b31731d05fce 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -296,7 +296,7 @@ record = new ClientQuotaRecord() @Test public void testProducerIdsRecord() { - // generate brokerId + // generate a producerId record ProducerIdsRecord record1 = new ProducerIdsRecord() .setBrokerId(0) .setBrokerEpoch(1) @@ -313,7 +313,7 @@ public void testProducerIdsRecord() { "9999", metadataNodeManager.getData().root().directory("producerIds").file("blockEnd").contents()); - // generate brokerId again + // generate another brokerId ProducerIdsRecord record2 = new ProducerIdsRecord() .setBrokerId(1) .setBrokerEpoch(2) From f17af9524ee692cb5349443c3a296c0fe2759e77 Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 10 Feb 2022 11:02:40 +0800 Subject: [PATCH 3/7] resolve comments --- .../kafka/shell/MetadataNodeManager.java | 5 +++-- .../kafka/shell/MetadataNodeManagerTest.java | 18 ++++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index db04cc5a8d143..568e3770f5abe 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -322,8 +322,9 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message) } case PRODUCER_IDS_RECORD: { ProducerIdsRecord record = (ProducerIdsRecord) message; - DirectoryNode producerIdNode = data.root.mkdirs("producerIds"); - producerIdNode.create("broker").setContents(record.brokerId() + ""); + DirectoryNode producerIdNode = data.root.mkdirs("lastProducerIdBlock"); + producerIdNode.create("assignedBrokerId").setContents(record.brokerId() + ""); + producerIdNode.create("assignedBrokerEpoch").setContents(record.brokerEpoch() + ""); producerIdNode.create("blockStart") .setContents(record.producerIdsEnd() - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + ""); producerIdNode.create("blockEnd").setContents(record.producerIdsEnd() - 1 + ""); diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index 2b31731d05fce..80f8170d7c545 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -305,13 +305,16 @@ public void testProducerIdsRecord() { assertEquals( "0", - metadataNodeManager.getData().root().directory("producerIds").file("broker").contents()); + metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerId").contents()); + assertEquals( + "1", + metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerEpoch").contents()); assertEquals( 10000 - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + "", - metadataNodeManager.getData().root().directory("producerIds").file("blockStart").contents()); + metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("blockStart").contents()); assertEquals( "9999", - metadataNodeManager.getData().root().directory("producerIds").file("blockEnd").contents()); + metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("blockEnd").contents()); // generate another brokerId ProducerIdsRecord record2 = new ProducerIdsRecord() @@ -322,12 +325,15 @@ public void testProducerIdsRecord() { assertEquals( "1", - metadataNodeManager.getData().root().directory("producerIds").file("broker").contents()); + metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerId").contents()); + assertEquals( + "2", + metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerEpoch").contents()); assertEquals( 11000 - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + "", - metadataNodeManager.getData().root().directory("producerIds").file("blockStart").contents()); + metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("blockStart").contents()); assertEquals( "10999", - metadataNodeManager.getData().root().directory("producerIds").file("blockEnd").contents()); + metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("blockEnd").contents()); } } From 81e6a9b81db540886e9e355d21957533412670c4 Mon Sep 17 00:00:00 2001 From: dengziming Date: Fri, 11 Feb 2022 19:42:37 +0800 Subject: [PATCH 4/7] optimize path --- .../apache/kafka/shell/MetadataNodeManager.java | 12 ++++++------ .../kafka/shell/MetadataNodeManagerTest.java | 16 +++++----------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index 568e3770f5abe..cc2189b7ce636 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -322,12 +322,12 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message) } case PRODUCER_IDS_RECORD: { ProducerIdsRecord record = (ProducerIdsRecord) message; - DirectoryNode producerIdNode = data.root.mkdirs("lastProducerIdBlock"); - producerIdNode.create("assignedBrokerId").setContents(record.brokerId() + ""); - producerIdNode.create("assignedBrokerEpoch").setContents(record.brokerEpoch() + ""); - producerIdNode.create("blockStart") - .setContents(record.producerIdsEnd() - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + ""); - producerIdNode.create("blockEnd").setContents(record.producerIdsEnd() - 1 + ""); + DirectoryNode lastBlockNode = data.root.mkdirs("lastProducerIdBlock"); + lastBlockNode.create("assignedBrokerId").setContents(record.brokerId() + ""); + lastBlockNode.create("assignedBrokerEpoch").setContents(record.brokerEpoch() + ""); + + DirectoryNode producerIdsNode = data.root.mkdirs("nextProducerIdBlock"); + producerIdsNode.create("firstProducerId").setContents(record.producerIdsEnd() + ""); break; } default: diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index 80f8170d7c545..04d6b4f11229b 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -310,13 +310,10 @@ public void testProducerIdsRecord() { "1", metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerEpoch").contents()); assertEquals( - 10000 - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + "", - metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("blockStart").contents()); - assertEquals( - "9999", - metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("blockEnd").contents()); + 10000 + "", + metadataNodeManager.getData().root().directory("nextProducerIdBlock").file("firstProducerId").contents()); - // generate another brokerId + // generate another producerId record ProducerIdsRecord record2 = new ProducerIdsRecord() .setBrokerId(1) .setBrokerEpoch(2) @@ -330,10 +327,7 @@ public void testProducerIdsRecord() { "2", metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerEpoch").contents()); assertEquals( - 11000 - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE + "", - metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("blockStart").contents()); - assertEquals( - "10999", - metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("blockEnd").contents()); + 11000 + "", + metadataNodeManager.getData().root().directory("nextProducerIdBlock").file("firstProducerId").contents()); } } From 4d973869cc9d1406631cec1182c0d942e08273d5 Mon Sep 17 00:00:00 2001 From: dengziming Date: Sat, 12 Feb 2022 08:19:22 +0800 Subject: [PATCH 5/7] better naming --- .../org/apache/kafka/shell/MetadataNodeManager.java | 10 ++++------ .../apache/kafka/shell/MetadataNodeManagerTest.java | 12 ++++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index cc2189b7ce636..e689366ca1906 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -46,7 +46,6 @@ import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.server.common.ProducerIdsBlock; import org.apache.kafka.shell.MetadataNode.DirectoryNode; import org.apache.kafka.shell.MetadataNode.FileNode; import org.apache.kafka.snapshot.SnapshotReader; @@ -322,12 +321,11 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message) } case PRODUCER_IDS_RECORD: { ProducerIdsRecord record = (ProducerIdsRecord) message; - DirectoryNode lastBlockNode = data.root.mkdirs("lastProducerIdBlock"); - lastBlockNode.create("assignedBrokerId").setContents(record.brokerId() + ""); - lastBlockNode.create("assignedBrokerEpoch").setContents(record.brokerEpoch() + ""); + DirectoryNode producerIds = data.root.mkdirs("producerIds"); + producerIds.create("lastBlockBrokerId").setContents(record.brokerId() + ""); + producerIds.create("lastBlockBrokerEpoch").setContents(record.brokerEpoch() + ""); - DirectoryNode producerIdsNode = data.root.mkdirs("nextProducerIdBlock"); - producerIdsNode.create("firstProducerId").setContents(record.producerIdsEnd() + ""); + producerIds.create("nextBlockStartId").setContents(record.producerIdsEnd() + ""); break; } default: diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index 04d6b4f11229b..0a300300b2103 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -305,13 +305,13 @@ public void testProducerIdsRecord() { assertEquals( "0", - metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerId").contents()); + metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerId").contents()); assertEquals( "1", - metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerEpoch").contents()); + metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerEpoch").contents()); assertEquals( 10000 + "", - metadataNodeManager.getData().root().directory("nextProducerIdBlock").file("firstProducerId").contents()); + metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents()); // generate another producerId record ProducerIdsRecord record2 = new ProducerIdsRecord() @@ -322,12 +322,12 @@ public void testProducerIdsRecord() { assertEquals( "1", - metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerId").contents()); + metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerId").contents()); assertEquals( "2", - metadataNodeManager.getData().root().directory("lastProducerIdBlock").file("assignedBrokerEpoch").contents()); + metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerEpoch").contents()); assertEquals( 11000 + "", - metadataNodeManager.getData().root().directory("nextProducerIdBlock").file("firstProducerId").contents()); + metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents()); } } From e7c832cbc1a25d56d883769cf4382f57d317adb4 Mon Sep 17 00:00:00 2001 From: dengziming Date: Sat, 12 Feb 2022 08:27:49 +0800 Subject: [PATCH 6/7] rebase --- .../main/java/org/apache/kafka/shell/MetadataNodeManager.java | 2 +- .../java/org/apache/kafka/shell/MetadataNodeManagerTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index e689366ca1906..f7b867a6b01c3 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -325,7 +325,7 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message) producerIds.create("lastBlockBrokerId").setContents(record.brokerId() + ""); producerIds.create("lastBlockBrokerEpoch").setContents(record.brokerEpoch() + ""); - producerIds.create("nextBlockStartId").setContents(record.producerIdsEnd() + ""); + producerIds.create("nextBlockStartId").setContents(record.nextProducerId() + ""); break; } default: diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index 0a300300b2103..d1f2fc18f5ecb 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -300,7 +300,7 @@ public void testProducerIdsRecord() { ProducerIdsRecord record1 = new ProducerIdsRecord() .setBrokerId(0) .setBrokerEpoch(1) - .setProducerIdsEnd(10000); + .setNextProducerId(10000); metadataNodeManager.handleMessage(record1); assertEquals( @@ -317,7 +317,7 @@ public void testProducerIdsRecord() { ProducerIdsRecord record2 = new ProducerIdsRecord() .setBrokerId(1) .setBrokerEpoch(2) - .setProducerIdsEnd(11000); + .setNextProducerId(11000); metadataNodeManager.handleMessage(record2); assertEquals( From 2888923cee1cd3c290a46397b6c0ccf203622011 Mon Sep 17 00:00:00 2001 From: dengziming Date: Sat, 12 Feb 2022 09:00:16 +0800 Subject: [PATCH 7/7] fix checkstyle --- .../java/org/apache/kafka/shell/MetadataNodeManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index d1f2fc18f5ecb..f0cfffb28178b 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; -import org.apache.kafka.server.common.ProducerIdsBlock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test;