From 438aaa56b7e066c409cd4f528afd6ab0685d18f3 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 5 Jun 2024 09:29:07 +0530 Subject: [PATCH 1/3] MINOR: Cleanup the storage module tests. - Use SystemTime instead of MockTime when time is not mocked - Use static assertions to reduce the line length - Fold the lines if it exceeds the limit - rename tp0 to tpId0 when it refers to TopicIdPartition - remove unnecessary static from variables --- .../RemoteLogMetadataFormatterTest.java | 30 +++--- .../storage/RemoteLogMetadataSerdeTest.java | 46 ++++---- .../RemoteLogMetadataTransformTest.java | 44 ++++---- ...sedRemoteLogMetadataManagerConfigTest.java | 55 ++++------ ...adataManagerMultipleSubscriptionsTest.java | 41 +++---- ...edRemoteLogMetadataManagerRestartTest.java | 25 ++--- ...opicBasedRemoteLogMetadataManagerTest.java | 76 ++++++------- .../storage/RemoteLogMetadataManagerTest.java | 101 +++++++++--------- 8 files changed, 201 insertions(+), 217 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java index e3d1a2aee0cd3..6f0d0e372e0a6 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; import org.junit.jupiter.api.Test; import java.io.ByteArrayOutputStream; @@ -35,12 +34,13 @@ import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED; public class RemoteLogMetadataFormatterTest { - private static final Uuid TOPIC_ID = Uuid.randomUuid(); - private static final String TOPIC = "foo"; - private static final TopicIdPartition TP0 = new TopicIdPartition(TOPIC_ID, new TopicPartition(TOPIC, 0)); - private static final Uuid SEGMENT_ID = Uuid.randomUuid(); + private final Uuid topicId = Uuid.randomUuid(); + private final String topic = "foo"; + private final TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition(topic, 0)); + private final Uuid segmentId = Uuid.randomUuid(); @Test public void testFormat() throws IOException { @@ -48,15 +48,15 @@ public void testFormat() throws IOException { segLeaderEpochs.put(0, 0L); segLeaderEpochs.put(1, 20L); segLeaderEpochs.put(2, 80L); - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tp0, segmentId); Optional customMetadata = Optional.of(new CustomMetadata(new byte[10])); RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata( - remoteLogSegmentId, 0L, 100L, -1L, 1, - 123L, 1024, customMetadata, - RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs); + remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED, + segLeaderEpochs); byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata); - ConsumerRecord metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes); + ConsumerRecord metadataRecord = new ConsumerRecord<>( + "__remote_log_metadata", 0, 0, null, metadataBytes); String expected = String.format( "partition: 0, offset: 0, value: " + @@ -65,12 +65,14 @@ public void testFormat() throws IOException { "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, " + "customMetadata=Optional[CustomMetadata{10 bytes}], " + "state=COPY_SEGMENT_STARTED}\n", - TOPIC_ID, SEGMENT_ID); + topicId, segmentId); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos)) { - RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = new RemoteLogMetadataSerde.RemoteLogMetadataFormatter(); - formatter.writeTo(metadataRecord, ps); - assertEquals(expected, baos.toString()); + try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = + new RemoteLogMetadataSerde.RemoteLogMetadataFormatter()) { + formatter.writeTo(metadataRecord, ps); + assertEquals(expected, baos.toString()); + } } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java index 5b48790c7fdc9..44623e18de9a0 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java @@ -19,48 +19,48 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; -import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + public class RemoteLogMetadataSerdeTest { - public static final String TOPIC = "foo"; - private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); - private final Time time = new MockTime(1); + public final String topic = "foo"; + private final TopicIdPartition tpId0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topic, 0)); + private final Time time = new SystemTime(); @Test public void testRemoteLogSegmentMetadataSerde() { RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata(); - doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata); } @Test public void testRemoteLogSegmentMetadataUpdateSerde() { RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate(); - doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate); } @Test public void testRemotePartitionDeleteMetadataSerde() { RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata(); - doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata); } @@ -69,25 +69,20 @@ private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { segLeaderEpochs.put(0, 0L); segLeaderEpochs.put(1, 20L); segLeaderEpochs.put(2, 80L); - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); - return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, - time.milliseconds(), 1024, - Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), - RemoteLogSegmentState.COPY_SEGMENT_STARTED, - segLeaderEpochs - ); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); + return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, + time.milliseconds(), 1024, Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), + COPY_SEGMENT_STARTED, segLeaderEpochs); } private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() { - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(), - Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), - RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2); + Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 2); } private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() { - return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED, - time.milliseconds(), 0); + return new RemotePartitionDeleteMetadata(tpId0, DELETE_PARTITION_MARKED, time.milliseconds(), 0); } private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { @@ -96,16 +91,17 @@ private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { byte[] metadataBytes = serializer.serialize(remoteLogMetadata); // Deserialize the bytes and check the RemoteLogMetadata object is as expected. - // Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and deserializer having their own instances. + // Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and + // deserializer having their own instances. RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde(); RemoteLogMetadata deserializedRemoteLogMetadata = deserializer.deserialize(metadataBytes); - Assertions.assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata); + assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata); } @Test public void testInvalidRemoteStorageMetadata() { // Serializing receives an exception as it does not have the expected RemoteLogMetadata registered in serdes. - Assertions.assertThrows(IllegalArgumentException.class, + assertThrows(IllegalArgumentException.class, () -> new RemoteLogMetadataSerde().serialize(new InvalidRemoteLogMetadata(1, time.milliseconds()))); } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java index 504f47e17a584..0c6edbc6a6c1b 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform; @@ -29,60 +29,56 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; -import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.Optional; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class RemoteLogMetadataTransformTest { - private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); - private final Time time = new MockTime(1); + private final TopicIdPartition tpId0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + private final Time time = new SystemTime(); @Test public void testRemoteLogSegmentMetadataTransform() { RemoteLogSegmentMetadataTransform metadataTransform = new RemoteLogSegmentMetadataTransform(); - RemoteLogSegmentMetadata metadata = createRemoteLogSegmentMetadata(); ApiMessageAndVersion apiMessageAndVersion = metadataTransform.toApiMessageAndVersion(metadata); RemoteLogSegmentMetadata remoteLogSegmentMetadataFromRecord = metadataTransform .fromApiMessageAndVersion(apiMessageAndVersion); - - Assertions.assertEquals(metadata, remoteLogSegmentMetadataFromRecord); + assertEquals(metadata, remoteLogSegmentMetadataFromRecord); } @Test public void testRemoteLogSegmentMetadataUpdateTransform() { RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform(); - - RemoteLogSegmentMetadataUpdate metadataUpdate = - new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(), - Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), - RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1); + RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate( + new RemoteLogSegmentId(tpId0, Uuid.randomUuid()), time.milliseconds(), + Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 1); ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate); - RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion); - - Assertions.assertEquals(metadataUpdate, metadataUpdateFromRecord); + RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = + metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion); + assertEquals(metadataUpdate, metadataUpdateFromRecord); } private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, - time.milliseconds(), 1024, Collections.singletonMap(0, 0L)); + time.milliseconds(), 1024, Collections.singletonMap(0, 0L)); } @Test public void testRemoteLogPartitionMetadataTransform() { RemotePartitionDeleteMetadataTransform transform = new RemotePartitionDeleteMetadataTransform(); - RemotePartitionDeleteMetadata partitionDeleteMetadata - = new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_STARTED, time.milliseconds(), 1); + = new RemotePartitionDeleteMetadata(tpId0, DELETE_PARTITION_STARTED, time.milliseconds(), 1); ApiMessageAndVersion apiMessageAndVersion = transform.toApiMessageAndVersion(partitionDeleteMetadata); - RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = transform.fromApiMessageAndVersion(apiMessageAndVersion); - - Assertions.assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord); + RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = + transform.fromApiMessageAndVersion(apiMessageAndVersion); + assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord); } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java index 8e3985d0d5fb5..6c540a243d5b9 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java @@ -20,10 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.AbstractMap; import java.util.HashMap; @@ -38,10 +35,10 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; -public class TopicBasedRemoteLogMetadataManagerConfigTest { - private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class); +import static org.junit.jupiter.api.Assertions.assertEquals; - private static final String BOOTSTRAP_SERVERS = "localhost:9091"; +public class TopicBasedRemoteLogMetadataManagerConfigTest { + private final String bootstrapServers = "localhost:2222"; @Test public void testValidConfig() { @@ -60,41 +57,32 @@ public void testValidConfig() { // Check for topic properties TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props); - Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount()); + assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount()); // Check for common client configs. - Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); - Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); - Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(bootstrapServers, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(bootstrapServers, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(bootstrapServers, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); for (Map.Entry entry : commonClientConfig.entrySet()) { - log.info("Checking config: " + entry.getKey()); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.commonProperties().get(entry.getKey())); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.producerProperties().get(entry.getKey())); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.consumerProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.commonProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.producerProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.consumerProperties().get(entry.getKey())); } - // Check for producer configs. for (Map.Entry entry : producerConfig.entrySet()) { - log.info("Checking config: " + entry.getKey()); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.producerProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.producerProperties().get(entry.getKey())); } - // Check for consumer configs. for (Map.Entry entry : consumerConfig.entrySet()) { - log.info("Checking config: " + entry.getKey()); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.consumerProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.consumerProperties().get(entry.getKey())); } } @Test public void testCommonProducerConsumerOverridesConfig() { - Map.Entry overrideEntry = new AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L); + Map.Entry overrideEntry = + new AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L); Map commonClientConfig = new HashMap<>(); commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10); commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L); @@ -114,22 +102,18 @@ public void testCommonProducerConsumerOverridesConfig() { Map props = createValidConfigProps(commonClientConfig, producerConfig, consumerConfig); TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props); - Assertions.assertEquals(overrideCommonPropValue, - rlmmConfig.commonProperties().get(overrideEntry.getKey())); - Assertions.assertEquals(overriddenProducerPropValue, - rlmmConfig.producerProperties().get(overrideEntry.getKey())); - Assertions.assertEquals(overriddenConsumerPropValue, - rlmmConfig.consumerProperties().get(overrideEntry.getKey())); + assertEquals(overrideCommonPropValue, rlmmConfig.commonProperties().get(overrideEntry.getKey())); + assertEquals(overriddenProducerPropValue, rlmmConfig.producerProperties().get(overrideEntry.getKey())); + assertEquals(overriddenConsumerPropValue, rlmmConfig.consumerProperties().get(overrideEntry.getKey())); } private Map createValidConfigProps(Map commonClientConfig, Map producerConfig, Map consumerConfig) { Map props = new HashMap<>(); - props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(BROKER_ID, 1); props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath()); - props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 3); props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10); props.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 60 * 60 * 1000L); @@ -138,17 +122,14 @@ private Map createValidConfigProps(Map commonCli for (Map.Entry entry : commonClientConfig.entrySet()) { props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + entry.getKey(), entry.getValue()); } - // producer configs for (Map.Entry entry : producerConfig.entrySet()) { props.put(REMOTE_LOG_METADATA_PRODUCER_PREFIX + entry.getKey(), entry.getValue()); } - //consumer configs for (Map.Entry entry : consumerConfig.entrySet()) { props.put(REMOTE_LOG_METADATA_CONSUMER_PREFIX + entry.getKey(), entry.getValue()); } - return props; } } \ No newline at end of file diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java index 916b475c143b9..d02da09804048 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java @@ -27,12 +27,11 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; @@ -44,6 +43,10 @@ import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; @@ -55,10 +58,7 @@ @ClusterTestDefaults(brokers = 3) public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { private final ClusterInstance clusterInstance; - - private static final int SEG_SIZE = 1024 * 1024; - - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) { this.clusterInstance = clusterInstance; @@ -125,24 +125,25 @@ public int metadataPartition(TopicIdPartition topicIdPartition) { // Add segments for these partitions but an exception is received as they have not yet been subscribed. // These messages would have been published to the respective metadata topic partitions but the ConsumerManager // has not yet been subscribing as they are not yet registered. + int segSize = 1048576; RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); - Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + ExecutionException exception = assertThrows(ExecutionException.class, + () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); + assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", exception.getMessage()); RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - exception = Assertions.assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); - Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + exception = assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); + assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", exception.getMessage()); // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. - Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)); - Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); - + assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)); + assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.emptySet()); @@ -156,8 +157,8 @@ public int metadataPartition(TopicIdPartition topicIdPartition) { // leader partitions would have received as it is registered, but follower partition is not yet registered, // hence it throws an exception. - Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext()); - Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); + assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext()); + assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); // Register follower partition // Phaser::bulkRegister and Phaser::register provide the "countUp" feature @@ -172,15 +173,15 @@ public int metadataPartition(TopicIdPartition topicIdPartition) { verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition); verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata); // In this state, all the metadata should be available in RLMM for both leader and follower partitions. - Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found"); - Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found"); + assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found"); + assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found"); } } private void createTopic(String topic, Map> replicasAssignments) { try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { admin.createTopics(Collections.singletonList(new NewTopic(topic, replicasAssignments))); - Assertions.assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, replicasAssignments.size())); + assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, replicasAssignments.size())); } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index dee686dc036d1..a08f12d9ae22b 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -24,12 +24,11 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; @@ -37,14 +36,13 @@ import java.util.Collections; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") public class TopicBasedRemoteLogMetadataManagerRestartTest { - private static final int SEG_SIZE = 1024 * 1024; - - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath(); private final ClusterInstance clusterInstance; @@ -76,16 +74,17 @@ public void testRLMMAPIsAfterRestart() throws Exception { clusterInstance.waitForTopic(leaderTopic, 1); clusterInstance.waitForTopic(followerTopic, 1); - final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); - final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); + TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + int segSize = 1048576; RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) { // Register these partitions to RemoteLogMetadataManager. @@ -115,12 +114,14 @@ public void testRLMMAPIsAfterRestart() throws Exception { RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 101, 200, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 101L)); + time.milliseconds(), segSize, Collections.singletonMap(0, 101L)); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get(); // Check that both the stored segment and recently added segment are available. - Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(), - topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))); + assertTrue(TestUtils.sameElementsWithoutOrder( + Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(), + topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + ); } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index 183a84934127d..e90952ec74cf5 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -25,14 +25,13 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtendWith; import java.io.IOException; @@ -42,6 +41,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -50,11 +53,10 @@ @ExtendWith(ClusterTestExtensions.class) @ClusterTestDefaults(brokers = 3) public class TopicBasedRemoteLogMetadataManagerTest { - private static final int SEG_SIZE = 1024 * 1024; - + private final int segSize = 1048576; private final ClusterInstance clusterInstance; private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore()); - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager; TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) { @@ -83,7 +85,7 @@ public void testDoesTopicExist() throws ExecutionException, InterruptedException admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get(); clusterInstance.waitForTopic(topic, 1); boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); - Assertions.assertTrue(doesTopicExist); + assertTrue(doesTopicExist); } } @@ -92,7 +94,7 @@ public void testTopicDoesNotExist() { try (Admin admin = clusterInstance.createAdminClient()) { String topic = "dummy-test-topic"; boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); - Assertions.assertFalse(doesTopicExist); + assertFalse(doesTopicExist); } } @@ -138,38 +140,38 @@ public void testNewPartitionUpdates() throws Exception { // has not yet been subscribing as they are not yet registered. RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - Assertions.assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - Assertions.assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. - Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition)); - Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition)); + assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition)); + assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition)); topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition), Collections.singleton(newFollowerTopicIdPartition)); // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); verify(spyRemotePartitionMetadataEventHandler).markInitialized(newLeaderTopicIdPartition); verify(spyRemotePartitionMetadataEventHandler).markInitialized(newFollowerTopicIdPartition); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(leaderSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata); - Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext()); - Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); + assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext()); + assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); } @ClusterTest public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); - Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0)); + assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0)); } @ClusterTest @@ -192,11 +194,11 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + 0, 100, -1L, 0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(0, 0L)); + 100, 200, -1L, 0, time.milliseconds(), segSize * 2, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(0, 0L)); + 200, 300, -1L, 0, time.milliseconds(), segSize * 3, Collections.singletonMap(0, 0L)); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata); @@ -206,8 +208,8 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); @@ -215,7 +217,7 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0); - Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize); + assertEquals(segSize * 6, remoteLogSize); } @ClusterTest @@ -237,11 +239,11 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws R }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + 0, 100, -1L, 0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L)); + 100, 200, -1L, 0, time.milliseconds(), segSize * 2, Collections.singletonMap(1, 100L)); RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(2, 200L)); + 200, 300, -1L, 0, time.milliseconds(), segSize * 3, Collections.singletonMap(2, 200L)); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata); @@ -251,16 +253,16 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws R // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); - Assertions.assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0)); - Assertions.assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1)); - Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); + assertEquals(segSize, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0)); + assertEquals(segSize * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1)); + assertEquals(segSize * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); } @ClusterTest @@ -282,9 +284,9 @@ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() th }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + 0, 100, -1L, 0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L)); + 100, 200, -1L, 0, time.milliseconds(), segSize * 2, Collections.singletonMap(1, 100L)); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata); @@ -293,13 +295,13 @@ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() th // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); - Assertions.assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001)); + assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001)); } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java index b67c316e5e499..ace1b4928e1de 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java @@ -23,33 +23,39 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils; import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; -//import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED; + @Tag("integration") @ExtendWith(ClusterTestExtensions.class) @ClusterTestDefaults(brokers = 3) public class RemoteLogMetadataManagerTest { private final ClusterInstance clusterInstance; - - private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); - private static final int SEG_SIZE = 1024 * 1024; - private static final int BROKER_ID_0 = 0; - private static final int BROKER_ID_1 = 1; - - private final Time time = new MockTime(1); + private final TopicIdPartition tpId0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + private final int segSize = 1048576; + private final int brokerId0 = 0; + private final int brokerId1 = 1; + private final Time time = new SystemTime(); RemoteLogMetadataManagerTest(ClusterInstance clusterInstance) { this.clusterInstance = clusterInstance; @@ -66,31 +72,30 @@ private TopicBasedRemoteLogMetadataManager topicBasedRlmm() { @ClusterTest public void testFetchSegments() throws Exception { try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = topicBasedRlmm()) { - remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet()); + remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(tpId0), Collections.emptySet()); // 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available. Map segmentLeaderEpochs = Collections.singletonMap(0, 101L); - RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); - RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0, - time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata( + segmentId, 101L, 200L, -1L, brokerId0, time.milliseconds(), segSize, segmentLeaderEpochs); // Wait until the segment is added successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); // Search should not return the above segment. - Assertions.assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150).isPresent()); + assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 0, 150).isPresent()); // 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available. - RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), - Optional.empty(), - RemoteLogSegmentState.COPY_SEGMENT_FINISHED, - BROKER_ID_1); + RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( + segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, brokerId1); // Wait until the segment is updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); // Search should return the above segment. - Optional segmentMetadataForOffset150 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150); - Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150); + Optional segmentMetadataForOffset150 = + remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 0, 150); + assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150); } } @@ -98,7 +103,7 @@ public void testFetchSegments() throws Exception { public void testRemotePartitionDeletion() throws Exception { try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = topicBasedRlmm()) { remoteLogMetadataManager.configure(Collections.emptyMap()); - remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet()); + remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(tpId0), Collections.emptySet()); // Create remote log segment metadata and add them to RLMM. @@ -110,52 +115,52 @@ public void testRemotePartitionDeletion() throws Exception { segmentLeaderEpochs.put(1, 20L); segmentLeaderEpochs.put(2, 50L); segmentLeaderEpochs.put(3, 80L); - RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); - RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L, - -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, - segmentLeaderEpochs); + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata( + segmentId, 0L, 100L, -1L, brokerId0, time.milliseconds(), segSize, segmentLeaderEpochs); // Wait until the segment is added successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( - segmentId, time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); + segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, brokerId1); // Wait until the segment is updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); // Check that the segment exists in RLMM. - Optional segMetadataForOffset30Epoch1 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); - Assertions.assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1); + Optional segMetadataForOffset30Epoch1 = + remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 1, 30L); + assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1); // Mark the partition for deletion and wait for it to be updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( - createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_MARKED)).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( + createRemotePartitionDeleteMetadata(DELETE_PARTITION_MARKED)).get()); - Optional segmentMetadataAfterDelMark = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, - 1, 30L); - Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark); + Optional segmentMetadataAfterDelMark = + remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 1, 30L); + assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark); // Set the partition deletion state as started. Partition and segments should still be accessible as they are not // yet deleted. Wait until the segment state is updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( - createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_STARTED)).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( + createRemotePartitionDeleteMetadata(DELETE_PARTITION_STARTED)).get()); - Optional segmentMetadataAfterDelStart = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, - 1, 30L); - Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart); + Optional segmentMetadataAfterDelStart = + remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 1, 30L); + assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart); // Set the partition deletion state as finished. RLMM should clear all its internal state for that partition. // Wait until the segment state is updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( - createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_FINISHED)).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( + createRemotePartitionDeleteMetadata(DELETE_PARTITION_FINISHED)).get()); - Assertions.assertThrows(RemoteResourceNotFoundException.class, - () -> remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L)); + assertThrows(RemoteResourceNotFoundException.class, + () -> remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 1, 30L)); } } private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata(RemotePartitionDeleteState state) { - return new RemotePartitionDeleteMetadata(TP0, state, time.milliseconds(), BROKER_ID_0); + return new RemotePartitionDeleteMetadata(tpId0, state, time.milliseconds(), brokerId0); } } \ No newline at end of file From 838cfde58aa5cc1349de485febb3a4a223310a73 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 5 Jun 2024 09:32:41 +0530 Subject: [PATCH 2/3] rename variable --- .../metadata/storage/RemoteLogMetadataFormatterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java index 6f0d0e372e0a6..48996a0ffc776 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java @@ -39,7 +39,7 @@ public class RemoteLogMetadataFormatterTest { private final Uuid topicId = Uuid.randomUuid(); private final String topic = "foo"; - private final TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition(topic, 0)); + private final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition(topic, 0)); private final Uuid segmentId = Uuid.randomUuid(); @Test @@ -48,7 +48,7 @@ public void testFormat() throws IOException { segLeaderEpochs.put(0, 0L); segLeaderEpochs.put(1, 20L); segLeaderEpochs.put(2, 80L); - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tp0, segmentId); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tpId0, segmentId); Optional customMetadata = Optional.of(new CustomMetadata(new byte[10])); RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata( remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED, From ad85c3775c7e5ccf004eeb9184afe30fbc274fca Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 5 Jun 2024 22:53:05 +0530 Subject: [PATCH 3/3] restore the static variables --- .../RemoteLogMetadataFormatterTest.java | 12 +++--- .../storage/RemoteLogMetadataSerdeTest.java | 10 ++--- .../RemoteLogMetadataTransformTest.java | 8 ++-- ...sedRemoteLogMetadataManagerConfigTest.java | 10 ++--- ...opicBasedRemoteLogMetadataManagerTest.java | 30 +++++++-------- .../storage/RemoteLogMetadataManagerTest.java | 38 +++++++++---------- 6 files changed, 54 insertions(+), 54 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java index 48996a0ffc776..1380a735fba89 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java @@ -37,10 +37,10 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED; public class RemoteLogMetadataFormatterTest { - private final Uuid topicId = Uuid.randomUuid(); - private final String topic = "foo"; - private final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition(topic, 0)); - private final Uuid segmentId = Uuid.randomUuid(); + private static final Uuid TOPIC_ID = Uuid.randomUuid(); + private static final String TOPIC = "foo"; + private static final TopicIdPartition TP0 = new TopicIdPartition(TOPIC_ID, new TopicPartition(TOPIC, 0)); + private static final Uuid SEGMENT_ID = Uuid.randomUuid(); @Test public void testFormat() throws IOException { @@ -48,7 +48,7 @@ public void testFormat() throws IOException { segLeaderEpochs.put(0, 0L); segLeaderEpochs.put(1, 20L); segLeaderEpochs.put(2, 80L); - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tpId0, segmentId); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID); Optional customMetadata = Optional.of(new CustomMetadata(new byte[10])); RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata( remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED, @@ -65,7 +65,7 @@ public void testFormat() throws IOException { "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, " + "customMetadata=Optional[CustomMetadata{10 bytes}], " + "state=COPY_SEGMENT_STARTED}\n", - topicId, segmentId); + TOPIC_ID, SEGMENT_ID); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos)) { try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java index 44623e18de9a0..b1b91dacf2379 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java @@ -42,8 +42,8 @@ public class RemoteLogMetadataSerdeTest { - public final String topic = "foo"; - private final TopicIdPartition tpId0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topic, 0)); + public static final String TOPIC = "foo"; + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); private final Time time = new SystemTime(); @Test @@ -69,20 +69,20 @@ private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { segLeaderEpochs.put(0, 0L); segLeaderEpochs.put(1, 20L); segLeaderEpochs.put(2, 80L); - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, time.milliseconds(), 1024, Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), COPY_SEGMENT_STARTED, segLeaderEpochs); } private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() { - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(), Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 2); } private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() { - return new RemotePartitionDeleteMetadata(tpId0, DELETE_PARTITION_MARKED, time.milliseconds(), 0); + return new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_MARKED, time.milliseconds(), 0); } private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java index 0c6edbc6a6c1b..70770542eb911 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java @@ -40,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class RemoteLogMetadataTransformTest { - private final TopicIdPartition tpId0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); private final Time time = new SystemTime(); @Test @@ -57,7 +57,7 @@ public void testRemoteLogSegmentMetadataTransform() { public void testRemoteLogSegmentMetadataUpdateTransform() { RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform(); RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate( - new RemoteLogSegmentId(tpId0, Uuid.randomUuid()), time.milliseconds(), + new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(), Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 1); ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate); RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = @@ -66,7 +66,7 @@ public void testRemoteLogSegmentMetadataUpdateTransform() { } private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { - RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); + RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, time.milliseconds(), 1024, Collections.singletonMap(0, 0L)); } @@ -75,7 +75,7 @@ private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { public void testRemoteLogPartitionMetadataTransform() { RemotePartitionDeleteMetadataTransform transform = new RemotePartitionDeleteMetadataTransform(); RemotePartitionDeleteMetadata partitionDeleteMetadata - = new RemotePartitionDeleteMetadata(tpId0, DELETE_PARTITION_STARTED, time.milliseconds(), 1); + = new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_STARTED, time.milliseconds(), 1); ApiMessageAndVersion apiMessageAndVersion = transform.toApiMessageAndVersion(partitionDeleteMetadata); RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = transform.fromApiMessageAndVersion(apiMessageAndVersion); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java index 6c540a243d5b9..34f1fb083667e 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java @@ -38,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class TopicBasedRemoteLogMetadataManagerConfigTest { - private final String bootstrapServers = "localhost:2222"; + private static final String BOOTSTRAP_SERVERS = "localhost:2222"; @Test public void testValidConfig() { @@ -60,9 +60,9 @@ public void testValidConfig() { assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount()); // Check for common client configs. - assertEquals(bootstrapServers, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); - assertEquals(bootstrapServers, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); - assertEquals(bootstrapServers, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); for (Map.Entry entry : commonClientConfig.entrySet()) { assertEquals(entry.getValue(), rlmmConfig.commonProperties().get(entry.getKey())); @@ -111,7 +111,7 @@ private Map createValidConfigProps(Map commonCli Map producerConfig, Map consumerConfig) { Map props = new HashMap<>(); - props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(BROKER_ID, 1); props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath()); props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 3); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index e90952ec74cf5..8b9cfd0700c3a 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -53,7 +53,7 @@ @ExtendWith(ClusterTestExtensions.class) @ClusterTestDefaults(brokers = 3) public class TopicBasedRemoteLogMetadataManagerTest { - private final int segSize = 1048576; + private static final int SEG_SIZE = 1048576; private final ClusterInstance clusterInstance; private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore()); private final Time time = new SystemTime(); @@ -140,12 +140,12 @@ public void testNewPartitionUpdates() throws Exception { // has not yet been subscribing as they are not yet registered. RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. @@ -194,11 +194,11 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 0, 100, -1L, 0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 100, 200, -1L, 0, time.milliseconds(), segSize * 2, Collections.singletonMap(0, 0L)); + 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 200, 300, -1L, 0, time.milliseconds(), segSize * 3, Collections.singletonMap(0, 0L)); + 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(0, 0L)); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata); @@ -217,7 +217,7 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0); - assertEquals(segSize * 6, remoteLogSize); + assertEquals(SEG_SIZE * 6, remoteLogSize); } @ClusterTest @@ -239,11 +239,11 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws R }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 0, 100, -1L, 0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 100, 200, -1L, 0, time.milliseconds(), segSize * 2, Collections.singletonMap(1, 100L)); + 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L)); RemoteLogSegmentMetadata thirdSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 200, 300, -1L, 0, time.milliseconds(), segSize * 3, Collections.singletonMap(2, 200L)); + 200, 300, -1L, 0, time.milliseconds(), SEG_SIZE * 3, Collections.singletonMap(2, 200L)); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata); @@ -260,9 +260,9 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws R verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); - assertEquals(segSize, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0)); - assertEquals(segSize * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1)); - assertEquals(segSize * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); + assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0)); + assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1)); + assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); } @ClusterTest @@ -284,9 +284,9 @@ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() th }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 0, 100, -1L, 0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), - 100, 200, -1L, 0, time.milliseconds(), segSize * 2, Collections.singletonMap(1, 100L)); + 100, 200, -1L, 0, time.milliseconds(), SEG_SIZE * 2, Collections.singletonMap(1, 100L)); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(firstSegmentMetadata); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(secondSegmentMetadata); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java index ace1b4928e1de..d970dd8b68ccc 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java @@ -51,10 +51,10 @@ @ClusterTestDefaults(brokers = 3) public class RemoteLogMetadataManagerTest { private final ClusterInstance clusterInstance; - private final TopicIdPartition tpId0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); - private final int segSize = 1048576; - private final int brokerId0 = 0; - private final int brokerId1 = 1; + private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + private static final int SEG_SIZE = 1048576; + private static final int BROKER_ID_0 = 0; + private static final int BROKER_ID_1 = 1; private final Time time = new SystemTime(); RemoteLogMetadataManagerTest(ClusterInstance clusterInstance) { @@ -72,29 +72,29 @@ private TopicBasedRemoteLogMetadataManager topicBasedRlmm() { @ClusterTest public void testFetchSegments() throws Exception { try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = topicBasedRlmm()) { - remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(tpId0), Collections.emptySet()); + remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet()); // 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available. Map segmentLeaderEpochs = Collections.singletonMap(0, 101L); - RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata( - segmentId, 101L, 200L, -1L, brokerId0, time.milliseconds(), segSize, segmentLeaderEpochs); + segmentId, 101L, 200L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); // Wait until the segment is added successfully. assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); // Search should not return the above segment. - assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 0, 150).isPresent()); + assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150).isPresent()); // 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available. RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( - segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, brokerId1); + segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, BROKER_ID_1); // Wait until the segment is updated successfully. assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); // Search should return the above segment. Optional segmentMetadataForOffset150 = - remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 0, 150); + remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150); assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150); } } @@ -103,7 +103,7 @@ public void testFetchSegments() throws Exception { public void testRemotePartitionDeletion() throws Exception { try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = topicBasedRlmm()) { remoteLogMetadataManager.configure(Collections.emptyMap()); - remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(tpId0), Collections.emptySet()); + remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet()); // Create remote log segment metadata and add them to RLMM. @@ -115,14 +115,14 @@ public void testRemotePartitionDeletion() throws Exception { segmentLeaderEpochs.put(1, 20L); segmentLeaderEpochs.put(2, 50L); segmentLeaderEpochs.put(3, 80L); - RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tpId0, Uuid.randomUuid()); + RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata( - segmentId, 0L, 100L, -1L, brokerId0, time.milliseconds(), segSize, segmentLeaderEpochs); + segmentId, 0L, 100L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); // Wait until the segment is added successfully. assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( - segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, brokerId1); + segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, BROKER_ID_1); // Wait until the segment is updated successfully. assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); @@ -130,7 +130,7 @@ public void testRemotePartitionDeletion() throws Exception { // Check that the segment exists in RLMM. Optional segMetadataForOffset30Epoch1 = - remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 1, 30L); + remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1); // Mark the partition for deletion and wait for it to be updated successfully. @@ -138,7 +138,7 @@ public void testRemotePartitionDeletion() throws Exception { createRemotePartitionDeleteMetadata(DELETE_PARTITION_MARKED)).get()); Optional segmentMetadataAfterDelMark = - remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 1, 30L); + remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark); // Set the partition deletion state as started. Partition and segments should still be accessible as they are not @@ -147,7 +147,7 @@ public void testRemotePartitionDeletion() throws Exception { createRemotePartitionDeleteMetadata(DELETE_PARTITION_STARTED)).get()); Optional segmentMetadataAfterDelStart = - remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 1, 30L); + remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart); // Set the partition deletion state as finished. RLMM should clear all its internal state for that partition. @@ -156,11 +156,11 @@ public void testRemotePartitionDeletion() throws Exception { createRemotePartitionDeleteMetadata(DELETE_PARTITION_FINISHED)).get()); assertThrows(RemoteResourceNotFoundException.class, - () -> remoteLogMetadataManager.remoteLogSegmentMetadata(tpId0, 1, 30L)); + () -> remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L)); } } private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata(RemotePartitionDeleteState state) { - return new RemotePartitionDeleteMetadata(tpId0, state, time.milliseconds(), brokerId0); + return new RemotePartitionDeleteMetadata(TP0, state, time.milliseconds(), BROKER_ID_0); } } \ No newline at end of file