Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ default boolean isInternalTopic(String topic) {
return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.startsWith("__")
|| topic.startsWith(".");
}

/** Checks if the policy can track back to the source of the topic. */
default boolean canTrackSource(String topic) {
return !isInternalTopic(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;

public class MirrorClientTest {
public class MirrorClientDefaultReplicationPolicyTest {

private static class FakeMirrorClient extends MirrorClient {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.common.Configurable;

import java.util.Map;

import static org.apache.kafka.connect.mirror.MirrorClientConfig.HEARTBEATS_TOPIC;

/**
* The replication policy that imitates the behavior of MirrorMaker 1.
*
* <p>The policy doesn't rename topics: {@code topic1} remains {@code topic1} after replication.
* There is one exception to this: for {@code heartbeats}, it behaves identical to {@link DefaultReplicationPolicy}.
*
* <p>The policy has some notable limitations. The most important one is that the policy is unable to detect
* cycles for any topic apart from {@code heartbeats}. This makes cross-replication effectively impossible.
*
* <p>Another limitation is that {@link MirrorClient#remoteTopics()} will be able to list only
* {@code heartbeats} topics.
*
* <p>{@link MirrorClient#countHopsForTopic(String, String)} will return {@code -1} for any topic
* apart from {@code heartbeats}.
*
* <p>The policy supports {@link DefaultReplicationPolicy}'s configurations
* for the behavior related to {@code heartbeats}.
*/
public class IdentityReplicationPolicy implements ReplicationPolicy, Configurable {
// Replication sub-policy for heartbeats topics
private final DefaultReplicationPolicy heartbeatTopicReplicationPolicy = new DefaultReplicationPolicy();

@Override
public void configure(final Map<String, ?> props) {
heartbeatTopicReplicationPolicy.configure(props);
}

@Override
public String formatRemoteTopic(final String sourceClusterAlias, final String topic) {
if (isOriginalTopicHeartbeats(topic)) {
return heartbeatTopicReplicationPolicy.formatRemoteTopic(sourceClusterAlias, topic);
} else {
return topic;
}
}

@Override
public String topicSource(final String topic) {
if (isOriginalTopicHeartbeats(topic)) {
return heartbeatTopicReplicationPolicy.topicSource(topic);
} else {
return null;
}
}

@Override
public String upstreamTopic(final String topic) {
if (isOriginalTopicHeartbeats(topic)) {
return heartbeatTopicReplicationPolicy.upstreamTopic(topic);
} else {
return topic;
}
}

@Override
public String originalTopic(final String topic) {
if (isOriginalTopicHeartbeats(topic)) {
return HEARTBEATS_TOPIC;
} else {
return topic;
}
}

@Override
public boolean canTrackSource(final String topic) {
return isOriginalTopicHeartbeats(topic);
}

private boolean isOriginalTopicHeartbeats(final String topic) {
return HEARTBEATS_TOPIC.equals(heartbeatTopicReplicationPolicy.originalTopic(topic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,11 @@ List<TopicPartition> findSourceTopicPartitions()
List<TopicPartition> findTargetTopicPartitions()
throws InterruptedException, ExecutionException {
Set<String> topics = listTopics(targetAdminClient).stream()
.filter(t -> sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
.filter(t -> !t.equals(config.checkpointsTopic()))
.collect(Collectors.toSet());
// Allow replication policies that can't track back to the source of a topic (like LegacyReplicationPolicy)
// consider all topics as target topics.
.filter(t -> !replicationPolicy.canTrackSource(t)
|| sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
.collect(Collectors.toSet());
return describeTopics(targetAdminClient, topics).stream()
.flatMap(MirrorSourceConnector::expandTopicDescription)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

public class IdentityReplicationPolicyTest {
@Test
public void testFormatRemoteTopic() {
final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
assertEquals("aaa", identityReplicationPolicy.formatRemoteTopic("source1", "aaa"));
assertEquals("source1.heartbeats", identityReplicationPolicy.formatRemoteTopic("source1", "heartbeats"));
assertEquals("source2.source1.heartbeats", identityReplicationPolicy.formatRemoteTopic("source2", "source1.heartbeats"));

identityReplicationPolicy.configure(Collections.singletonMap("replication.policy.separator", "__"));
assertEquals("aaa", identityReplicationPolicy.formatRemoteTopic("source1", "aaa"));
assertEquals("source1__heartbeats", identityReplicationPolicy.formatRemoteTopic("source1", "heartbeats"));
}

@Test
public void testTopicSource() {
final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
assertNull(identityReplicationPolicy.topicSource("source1.aaa"));
assertNull(identityReplicationPolicy.topicSource("heartbeats"));
assertEquals("source1", identityReplicationPolicy.topicSource("source1.heartbeats"));
assertEquals("source2", identityReplicationPolicy.topicSource("source2.source1.heartbeats"));
}

@Test
public void testUpstreamTopic() {
final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
assertEquals("aaa", identityReplicationPolicy.upstreamTopic("aaa"));
assertEquals("source1.aaa", identityReplicationPolicy.upstreamTopic("source1.aaa"));
assertEquals("heartbeats", identityReplicationPolicy.upstreamTopic("source1.heartbeats"));
}

@Test
public void testOriginalTopic() {
final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
assertEquals("aaa", identityReplicationPolicy.originalTopic("aaa"));
assertEquals("source1.aaa", identityReplicationPolicy.originalTopic("source1.aaa"));
assertEquals("source2.source1.aaa", identityReplicationPolicy.originalTopic("source2.source1.aaa"));
assertEquals("heartbeats", identityReplicationPolicy.originalTopic("heartbeats"));
assertEquals("heartbeats", identityReplicationPolicy.originalTopic("source2.source1.heartbeats"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class MirrorCheckpointTaskTest {

@Test
public void testDownstreamTopicRenaming() {
public void testDownstreamTopicRenamingWithDefaultReplicationPolicy() {
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
assertEquals(new TopicPartition("source1.topic3", 4),
Expand All @@ -43,6 +43,18 @@ public void testDownstreamTopicRenaming() {
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
}

@Test
public void testDownstreamTopicNotRenamingWithLegacyReplicationPolicy() {
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new IdentityReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
assertEquals(new TopicPartition("topic3", 4),
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
assertEquals(new TopicPartition("target2.topic3", 5),
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)));
assertEquals(new TopicPartition("source6.topic7", 8),
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
}

@Test
public void testCheckpoint() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
Expand Down Expand Up @@ -70,6 +82,33 @@ public void testCheckpoint() {
assertEquals(234L, sourceRecord2.timestamp().longValue());
}

@Test
public void testCheckpointLegacyReplicationPolicy() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new IdentityReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
new OffsetAndMetadata(10, null));
SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
assertEquals(new TopicPartition("topic1", 2), checkpoint1.topicPartition());
assertEquals("group9", checkpoint1.consumerGroupId());
assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()));
assertEquals(10, checkpoint1.upstreamOffset());
assertEquals(11, checkpoint1.downstreamOffset());
assertEquals(123L, sourceRecord1.timestamp().longValue());
Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6),
new OffsetAndMetadata(12, null));
SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
assertEquals(new TopicPartition("target2.topic5", 6), checkpoint2.topicPartition());
assertEquals("group11", checkpoint2.consumerGroupId());
assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()));
assertEquals(12, checkpoint2.upstreamOffset());
assertEquals(13, checkpoint2.downstreamOffset());
assertEquals(234L, sourceRecord2.timestamp().longValue());
}

@Test
public void testSyncOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Expand Down
Loading