Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
Expand All @@ -35,6 +34,7 @@
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;

public class RemoteLogMetadataFormatterTest {
private static final Uuid TOPIC_ID = Uuid.randomUuid();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those changes are a kind of "code style". I guess there is no strict rules in our code base. However, I prefer to reduce the code changes if we don't have strong reason.

Copy link
Copy Markdown
Member

@brandboat brandboat Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's kind of grey zone to me. I'm not so sure why make it non-static.
Like Chia Ping mentioned, if we don't have strong reason, let's keep it. And I personally prefer the static one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restored the static variables.

Expand All @@ -51,12 +51,12 @@ public void testFormat() throws IOException {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID);
Optional<CustomMetadata> customMetadata = Optional.of(new CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1,
123L, 1024, customMetadata,
RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED,
segLeaderEpochs);

byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
"__remote_log_metadata", 0, 0, null, metadataBytes);

String expected = String.format(
"partition: 0, offset: 0, value: " +
Expand All @@ -68,9 +68,11 @@ public void testFormat() throws IOException {
TOPIC_ID, SEGMENT_ID);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos)) {
RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = new RemoteLogMetadataSerde.RemoteLogMetadataFormatter();
formatter.writeTo(metadataRecord, ps);
assertEquals(expected, baos.toString());
try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter =
new RemoteLogMetadataSerde.RemoteLogMetadataFormatter()) {
formatter.writeTo(metadataRecord, ps);
assertEquals(expected, baos.toString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,48 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class RemoteLogMetadataSerdeTest {

public static final String TOPIC = "foo";
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
private final Time time = new MockTime(1);
private final Time time = new SystemTime();

@Test
public void testRemoteLogSegmentMetadataSerde() {
RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata();

doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata);
}

@Test
public void testRemoteLogSegmentMetadataUpdateSerde() {
RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate();

doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate);
}

@Test
public void testRemotePartitionDeleteMetadataSerde() {
RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata();

doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata);
}

Expand All @@ -70,24 +70,19 @@ private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
segLeaderEpochs.put(1, 20L);
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024,
Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
segLeaderEpochs
);
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024, Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
COPY_SEGMENT_STARTED, segLeaderEpochs);
}

private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(),
Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 2);
}

private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() {
return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED,
time.milliseconds(), 0);
return new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_MARKED, time.milliseconds(), 0);
}

private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) {
Expand All @@ -96,16 +91,17 @@ private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) {
byte[] metadataBytes = serializer.serialize(remoteLogMetadata);

// Deserialize the bytes and check the RemoteLogMetadata object is as expected.
// Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and deserializer having their own instances.
// Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and
// deserializer having their own instances.
RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde();
RemoteLogMetadata deserializedRemoteLogMetadata = deserializer.deserialize(metadataBytes);
Assertions.assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata);
assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata);
}

@Test
public void testInvalidRemoteStorageMetadata() {
// Serializing receives an exception as it does not have the expected RemoteLogMetadata registered in serdes.
Assertions.assertThrows(IllegalArgumentException.class,
assertThrows(IllegalArgumentException.class,
() -> new RemoteLogMetadataSerde().serialize(new InvalidRemoteLogMetadata(1, time.milliseconds())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform;
Expand All @@ -29,60 +29,56 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Optional;

import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
private final Time time = new MockTime(1);
private final Time time = new SystemTime();

@Test
public void testRemoteLogSegmentMetadataTransform() {
RemoteLogSegmentMetadataTransform metadataTransform = new RemoteLogSegmentMetadataTransform();

RemoteLogSegmentMetadata metadata = createRemoteLogSegmentMetadata();
ApiMessageAndVersion apiMessageAndVersion = metadataTransform.toApiMessageAndVersion(metadata);
RemoteLogSegmentMetadata remoteLogSegmentMetadataFromRecord = metadataTransform
.fromApiMessageAndVersion(apiMessageAndVersion);

Assertions.assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
}

@Test
public void testRemoteLogSegmentMetadataUpdateTransform() {
RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform();

RemoteLogSegmentMetadataUpdate metadataUpdate =
new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate(
new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 1);
ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);

Assertions.assertEquals(metadataUpdate, metadataUpdateFromRecord);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord =
metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
assertEquals(metadataUpdate, metadataUpdateFromRecord);
}

private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
}

@Test
public void testRemoteLogPartitionMetadataTransform() {
RemotePartitionDeleteMetadataTransform transform = new RemotePartitionDeleteMetadataTransform();

RemotePartitionDeleteMetadata partitionDeleteMetadata
= new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_STARTED, time.milliseconds(), 1);
= new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_STARTED, time.milliseconds(), 1);
ApiMessageAndVersion apiMessageAndVersion = transform.toApiMessageAndVersion(partitionDeleteMetadata);
RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = transform.fromApiMessageAndVersion(apiMessageAndVersion);

Assertions.assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord);
RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord =
transform.fromApiMessageAndVersion(apiMessageAndVersion);
assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord);
}
}
Loading