diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index 11f73f50ceac5..939c1d3e6103c 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -57,4 +57,9 @@ default boolean isInternalTopic(String topic) {
return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.startsWith("__")
|| topic.startsWith(".");
}
+
+ /** Checks if the policy can track back to the source of the topic. */
+ default boolean canTrackSource(String topic) {
+ return !isInternalTopic(topic);
+ }
}
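
Reviewer note: a minimal sketch (not part of the patch) of how the new default behaves for a policy that inherits it; DefaultReplicationPolicy picks it up unchanged, and the class name CanTrackSourceSketch plus the topic names are illustrative only.

```java
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.ReplicationPolicy;

public class CanTrackSourceSketch {
    public static void main(String[] args) {
        // DefaultReplicationPolicy inherits the new default: a topic's source is
        // considered trackable unless the topic is internal.
        ReplicationPolicy policy = new DefaultReplicationPolicy();
        System.out.println(policy.canTrackSource("topic1"));                       // true
        System.out.println(policy.canTrackSource("source1.topic1"));               // true
        System.out.println(policy.canTrackSource("__consumer_offsets"));           // false: starts with "__"
        System.out.println(policy.canTrackSource("source1.checkpoints.internal")); // false: ends with ".internal"
    }
}
```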
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientDefaultReplicationPolicyTest.java
similarity index 99%
rename from connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
rename to connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientDefaultReplicationPolicyTest.java
index 766b91ce69944..4a09064da9c5f 100644
--- a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientDefaultReplicationPolicyTest.java
@@ -30,7 +30,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
-public class MirrorClientTest {
+public class MirrorClientDefaultReplicationPolicyTest {
private static class FakeMirrorClient extends MirrorClient {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
new file mode 100644
index 0000000000000..4c3b77d2c0c8d
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+
+import static org.apache.kafka.connect.mirror.MirrorClientConfig.HEARTBEATS_TOPIC;
+
+/**
+ * A replication policy that imitates the behavior of MirrorMaker 1.
+ *
+ * <p>The policy doesn't rename topics: {@code topic1} remains {@code topic1} after replication.
+ * There is one exception to this: for {@code heartbeats}, it behaves identically to {@link DefaultReplicationPolicy}.
+ *
+ * <p>The policy has some notable limitations. The most important one is that the policy is unable to detect
+ * cycles for any topic apart from {@code heartbeats}. This makes cross-replication effectively impossible.
+ *
+ * <p>Another limitation is that {@link MirrorClient#remoteTopics()} will be able to list only
+ * {@code heartbeats} topics.
+ *
+ * <p>{@link MirrorClient#countHopsForTopic(String, String)} will return {@code -1} for any topic
+ * apart from {@code heartbeats}.
+ *
+ * <p>The policy supports {@link DefaultReplicationPolicy}'s configurations
+ * for the behavior related to {@code heartbeats}.
+ */
+public class IdentityReplicationPolicy implements ReplicationPolicy, Configurable {
+ // Replication sub-policy for heartbeats topics
+ private final DefaultReplicationPolicy heartbeatTopicReplicationPolicy = new DefaultReplicationPolicy();
+
+ @Override
+ public void configure(final Map<String, ?> props) {
+ heartbeatTopicReplicationPolicy.configure(props);
+ }
+
+ @Override
+ public String formatRemoteTopic(final String sourceClusterAlias, final String topic) {
+ if (isOriginalTopicHeartbeats(topic)) {
+ return heartbeatTopicReplicationPolicy.formatRemoteTopic(sourceClusterAlias, topic);
+ } else {
+ return topic;
+ }
+ }
+
+ @Override
+ public String topicSource(final String topic) {
+ if (isOriginalTopicHeartbeats(topic)) {
+ return heartbeatTopicReplicationPolicy.topicSource(topic);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public String upstreamTopic(final String topic) {
+ if (isOriginalTopicHeartbeats(topic)) {
+ return heartbeatTopicReplicationPolicy.upstreamTopic(topic);
+ } else {
+ return topic;
+ }
+ }
+
+ @Override
+ public String originalTopic(final String topic) {
+ if (isOriginalTopicHeartbeats(topic)) {
+ return HEARTBEATS_TOPIC;
+ } else {
+ return topic;
+ }
+ }
+
+ @Override
+ public boolean canTrackSource(final String topic) {
+ return isOriginalTopicHeartbeats(topic);
+ }
+
+ private boolean isOriginalTopicHeartbeats(final String topic) {
+ return HEARTBEATS_TOPIC.equals(heartbeatTopicReplicationPolicy.originalTopic(topic));
+ }
+}
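
Reviewer note: a quick usage sketch of the new policy, distilled from the unit tests below; IdentityPolicyDemo is a hypothetical class name, not part of the patch.

```java
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;

import java.util.Collections;

public class IdentityPolicyDemo {
    public static void main(String[] args) {
        IdentityReplicationPolicy policy = new IdentityReplicationPolicy();

        // Regular topics keep their names; only heartbeats gets the source-cluster prefix.
        System.out.println(policy.formatRemoteTopic("source1", "topic1"));     // topic1
        System.out.println(policy.formatRemoteTopic("source1", "heartbeats")); // source1.heartbeats

        // Consequently, the source cluster is only recoverable for heartbeats.
        System.out.println(policy.topicSource("source1.topic1"));     // null
        System.out.println(policy.topicSource("source1.heartbeats")); // source1

        // The heartbeats sub-policy honors DefaultReplicationPolicy's configuration.
        policy.configure(Collections.singletonMap("replication.policy.separator", "__"));
        System.out.println(policy.formatRemoteTopic("source1", "heartbeats")); // source1__heartbeats
    }
}
```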
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 91a40a638cbf6..2c1409698bbff 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -203,9 +203,11 @@ List<TopicPartition> findSourceTopicPartitions()
List<TopicPartition> findTargetTopicPartitions()
throws InterruptedException, ExecutionException {
Set<String> topics = listTopics(targetAdminClient).stream()
- .filter(t -> sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
- .filter(t -> !t.equals(config.checkpointsTopic()))
- .collect(Collectors.toSet());
+ // Allow replication policies that can't track back to the source of a topic (like IdentityReplicationPolicy)
+ // to consider all topics as target topics.
+ .filter(t -> !replicationPolicy.canTrackSource(t)
+ || sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
+ .collect(Collectors.toSet());
return describeTopics(targetAdminClient, topics).stream()
.flatMap(MirrorSourceConnector::expandTopicDescription)
.collect(Collectors.toList());
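
Reviewer note: the changed predicate is easier to evaluate in isolation. A standalone sketch, assuming only the ReplicationPolicy methods shown in this patch (targetTopics and TargetTopicFilterSketch are hypothetical names):

```java
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
import org.apache.kafka.connect.mirror.ReplicationPolicy;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class TargetTopicFilterSketch {
    // Keep a topic if the policy cannot track its source at all,
    // or if the tracked source matches the expected source cluster.
    static Set<String> targetTopics(ReplicationPolicy policy, String source, Set<String> topics) {
        return topics.stream()
                .filter(t -> !policy.canTrackSource(t) || source.equals(policy.topicSource(t)))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> topics = new HashSet<>(Arrays.asList("topic1", "source.topic1", "other.topic1"));
        // DefaultReplicationPolicy: only topics whose prefix names the source cluster survive.
        System.out.println(targetTopics(new DefaultReplicationPolicy(), "source", topics)); // [source.topic1]
        // IdentityReplicationPolicy cannot track sources of non-heartbeats topics, so all three survive.
        System.out.println(targetTopics(new IdentityReplicationPolicy(), "source", topics));
    }
}
```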
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicyTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicyTest.java
new file mode 100644
index 0000000000000..b969638d12c6b
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicyTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class IdentityReplicationPolicyTest {
+ @Test
+ public void testFormatRemoteTopic() {
+ final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
+ assertEquals("aaa", identityReplicationPolicy.formatRemoteTopic("source1", "aaa"));
+ assertEquals("source1.heartbeats", identityReplicationPolicy.formatRemoteTopic("source1", "heartbeats"));
+ assertEquals("source2.source1.heartbeats", identityReplicationPolicy.formatRemoteTopic("source2", "source1.heartbeats"));
+
+ identityReplicationPolicy.configure(Collections.singletonMap("replication.policy.separator", "__"));
+ assertEquals("aaa", identityReplicationPolicy.formatRemoteTopic("source1", "aaa"));
+ assertEquals("source1__heartbeats", identityReplicationPolicy.formatRemoteTopic("source1", "heartbeats"));
+ }
+
+ @Test
+ public void testTopicSource() {
+ final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
+ assertNull(identityReplicationPolicy.topicSource("source1.aaa"));
+ assertNull(identityReplicationPolicy.topicSource("heartbeats"));
+ assertEquals("source1", identityReplicationPolicy.topicSource("source1.heartbeats"));
+ assertEquals("source2", identityReplicationPolicy.topicSource("source2.source1.heartbeats"));
+ }
+
+ @Test
+ public void testUpstreamTopic() {
+ final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
+ assertEquals("aaa", identityReplicationPolicy.upstreamTopic("aaa"));
+ assertEquals("source1.aaa", identityReplicationPolicy.upstreamTopic("source1.aaa"));
+ assertEquals("heartbeats", identityReplicationPolicy.upstreamTopic("source1.heartbeats"));
+ }
+
+ @Test
+ public void testOriginalTopic() {
+ final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
+ assertEquals("aaa", identityReplicationPolicy.originalTopic("aaa"));
+ assertEquals("source1.aaa", identityReplicationPolicy.originalTopic("source1.aaa"));
+ assertEquals("source2.source1.aaa", identityReplicationPolicy.originalTopic("source2.source1.aaa"));
+ assertEquals("heartbeats", identityReplicationPolicy.originalTopic("heartbeats"));
+ assertEquals("heartbeats", identityReplicationPolicy.originalTopic("source2.source1.heartbeats"));
+ }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index abd314bb71c9e..0b973de430802 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -32,7 +32,7 @@
public class MirrorCheckpointTaskTest {
@Test
- public void testDownstreamTopicRenaming() {
+ public void testDownstreamTopicRenamingWithDefaultReplicationPolicy() {
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
assertEquals(new TopicPartition("source1.topic3", 4),
@@ -43,6 +43,18 @@ public void testDownstreamTopicRenaming() {
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
}
+ @Test
+ public void testDownstreamTopicNotRenamingWithIdentityReplicationPolicy() {
+ MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+ new IdentityReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
+ assertEquals(new TopicPartition("topic3", 4),
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
+ assertEquals(new TopicPartition("target2.topic3", 5),
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)));
+ assertEquals(new TopicPartition("source6.topic7", 8),
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
+ }
+
@Test
public void testCheckpoint() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
@@ -70,6 +82,33 @@ public void testCheckpoint() {
assertEquals(234L, sourceRecord2.timestamp().longValue());
}
+ @Test
+ public void testCheckpointIdentityReplicationPolicy() {
+ OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+ MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+ new IdentityReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
+ offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
+ offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
+ Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
+ new OffsetAndMetadata(10, null));
+ SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
+ assertEquals(new TopicPartition("topic1", 2), checkpoint1.topicPartition());
+ assertEquals("group9", checkpoint1.consumerGroupId());
+ assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()));
+ assertEquals(10, checkpoint1.upstreamOffset());
+ assertEquals(11, checkpoint1.downstreamOffset());
+ assertEquals(123L, sourceRecord1.timestamp().longValue());
+ Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6),
+ new OffsetAndMetadata(12, null));
+ SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
+ assertEquals(new TopicPartition("target2.topic5", 6), checkpoint2.topicPartition());
+ assertEquals("group11", checkpoint2.consumerGroupId());
+ assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()));
+ assertEquals(12, checkpoint2.upstreamOffset());
+ assertEquals(13, checkpoint2.downstreamOffset());
+ assertEquals(234L, sourceRecord2.timestamp().longValue());
+ }
+
@Test
public void testSyncOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
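
Reviewer note: the downstream offsets asserted in both checkpoint tests are consistent with a simple linear translation from the nearest offset sync. A back-of-the-envelope sketch, assuming that linear rule (the fixtures above imply it, but the actual translation lives in OffsetSyncStore):

```java
public class OffsetTranslationSketch {
    // Downstream offset implied by an (upstream, downstream) sync pair,
    // assuming offsets advance in lockstep after the sync point.
    static long translate(long syncUpstream, long syncDownstream, long upstreamOffset) {
        return syncDownstream + (upstreamOffset - syncUpstream);
    }

    public static void main(String[] args) {
        // sync(topic1-2, 3L, 4L); checkpoint at upstream offset 10 -> downstream 11
        System.out.println(translate(3, 4, 10));
        // sync(target2.topic5-6, 7L, 8L); checkpoint at upstream offset 12 -> downstream 13
        System.out.println(translate(7, 8, 12));
    }
}
```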
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorClientIdentityReplicationPolicyTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorClientIdentityReplicationPolicyTest.java
new file mode 100644
index 0000000000000..573d78e54aeb2
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorClientIdentityReplicationPolicyTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.common.Configurable;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorClientIdentityReplicationPolicyTest {
+
+ private static class FakeMirrorClient extends MirrorClient {
+
+ List<String> topics;
+
+ FakeMirrorClient(List<String> topics) {
+ super(null, new IdentityReplicationPolicy(), null);
+ this.topics = topics;
+ }
+
+ FakeMirrorClient() {
+ this(Collections.emptyList());
+ }
+
+ @Override
+ protected Set<String> listTopics() {
+ return new HashSet<>(topics);
+ }
+ }
+
+ @Test
+ public void testIsHeartbeatTopic() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient();
+ assertTrue(client.isHeartbeatTopic("heartbeats"));
+ assertTrue(client.isHeartbeatTopic("source1.heartbeats"));
+ assertTrue(client.isHeartbeatTopic("source2.source1.heartbeats"));
+ assertFalse(client.isHeartbeatTopic("heartbeats!"));
+ assertFalse(client.isHeartbeatTopic("!heartbeats"));
+ assertFalse(client.isHeartbeatTopic("source1heartbeats"));
+ assertFalse(client.isHeartbeatTopic("source1-heartbeats"));
+ }
+
+ @Test
+ public void testIsCheckpointTopic() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient();
+ assertTrue(client.isCheckpointTopic("source1.checkpoints.internal"));
+ assertFalse(client.isCheckpointTopic("checkpoints.internal"));
+ assertFalse(client.isCheckpointTopic("checkpoints-internal"));
+ assertFalse(client.isCheckpointTopic("checkpoints.internal!"));
+ assertFalse(client.isCheckpointTopic("!checkpoints.internal"));
+ assertFalse(client.isCheckpointTopic("source1checkpointsinternal"));
+ }
+
+ @Test
+ public void countHopsForTopicTest() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient();
+
+ // We can count hops only for heartbeats topics.
+ assertEquals(-1, client.countHopsForTopic("topic", "source"));
+ assertEquals(-1, client.countHopsForTopic("source", "source"));
+ assertEquals(-1, client.countHopsForTopic("sourcetopic", "source"));
+ assertEquals(-1, client.countHopsForTopic("source1.topic", "source2"));
+ assertEquals(-1, client.countHopsForTopic("source1.topic", "source1"));
+ assertEquals(-1, client.countHopsForTopic("source2.source1.topic", "source2"));
+ assertEquals(-1, client.countHopsForTopic("source2.source1.topic", "source1"));
+ assertEquals(-1, client.countHopsForTopic("source3.source2.source1.topic", "source1"));
+ assertEquals(-1, client.countHopsForTopic("source3.source2.source1.topic", "source4"));
+
+ assertEquals(-1, client.countHopsForTopic("heartbeats", "source"));
+ assertEquals(1, client.countHopsForTopic("source1.heartbeats", "source1"));
+ assertEquals(1, client.countHopsForTopic("source2.source1.heartbeats", "source2"));
+ assertEquals(2, client.countHopsForTopic("source2.source1.heartbeats", "source1"));
+ assertEquals(3, client.countHopsForTopic("source3.source2.source1.heartbeats", "source1"));
+ assertEquals(-1, client.countHopsForTopic("source3.source2.source1.heartbeats", "source4"));
+ }
+
+ @Test
+ public void heartbeatTopicsTest() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+ "source1.heartbeats", "source2.source1.heartbeats", "source3.heartbeats"));
+ Set<String> heartbeatTopics = client.heartbeatTopics();
+ assertEquals(heartbeatTopics, new HashSet<>(Arrays.asList("heartbeats", "source1.heartbeats",
+ "source2.source1.heartbeats", "source3.heartbeats")));
+ }
+
+ @Test
+ public void checkpointsTopicsTest() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "checkpoints.internal",
+ "source1.checkpoints.internal", "source2.source1.checkpoints.internal", "source3.checkpoints.internal"));
+ Set<String> checkpointTopics = client.checkpointTopics();
+ assertEquals(new HashSet<>(Arrays.asList("source1.checkpoints.internal",
+ "source2.source1.checkpoints.internal", "source3.checkpoints.internal")), checkpointTopics);
+ }
+
+ @Test
+ public void replicationHopsTest() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+ "source1.heartbeats", "source1.source2.heartbeats", "source3.heartbeats", "source4.topic1"));
+ assertEquals(1, client.replicationHops("source1"));
+ assertEquals(2, client.replicationHops("source2"));
+ assertEquals(1, client.replicationHops("source3"));
+ assertEquals(-1, client.replicationHops("source4")); // can't count hops for non-heartbeats topic
+ assertEquals(-1, client.replicationHops("source5"));
+ }
+
+ @Test
+ public void upstreamClustersTest() throws InterruptedException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+ "source1.heartbeats", "source1.source2.heartbeats", "source3.source4.source5.heartbeats", "source6.topic1"));
+ Set<String> sources = client.upstreamClusters();
+ assertTrue(sources.contains("source1"));
+ assertTrue(sources.contains("source2"));
+ assertTrue(sources.contains("source3"));
+ assertTrue(sources.contains("source4"));
+ assertTrue(sources.contains("source5"));
+ assertFalse(sources.contains("source6")); // non-heartbeats topic can't indicate upstream cluster
+ assertFalse(sources.contains("sourceX"));
+ assertFalse(sources.contains(""));
+ assertFalse(sources.contains(null));
+ }
+
+ @Test
+ public void remoteTopicsTest() throws InterruptedException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
+ "source1.topic4", "source1.source2.topic5", "source3.source4.source5.topic6",
+ "source1.heartbeats", "source1.source2.heartbeats", "source3.source4.source5.heartbeats"));
+ Set<String> remoteTopics = client.remoteTopics();
+
+ // Only heartbeats topics are recognized as remote.
+ assertFalse(remoteTopics.contains("topic1"));
+ assertFalse(remoteTopics.contains("topic2"));
+ assertFalse(remoteTopics.contains("topic3"));
+ assertFalse(remoteTopics.contains("source1.topic4"));
+ assertFalse(remoteTopics.contains("source1.source2.topic5"));
+ assertFalse(remoteTopics.contains("source3.source4.source5.topic6"));
+
+ assertTrue(remoteTopics.contains("source1.heartbeats"));
+ assertTrue(remoteTopics.contains("source1.source2.heartbeats"));
+ assertTrue(remoteTopics.contains("source3.source4.source5.heartbeats"));
+ }
+
+ @Test
+ public void remoteTopicsSeparatorTest() throws InterruptedException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
+ "source1__topic4", "source1__source2__topic5", "source3__source4__source5__topic6",
+ "source1__heartbeats", "source1__source2__heartbeats", "source3__source4__source5__heartbeats"));
+ ((Configurable) client.replicationPolicy()).configure(
+ Collections.singletonMap("replication.policy.separator", "__"));
+ Set<String> remoteTopics = client.remoteTopics();
+
+ // Only heartbeats topics are recognized as remote.
+ assertFalse(remoteTopics.contains("topic1"));
+ assertFalse(remoteTopics.contains("topic2"));
+ assertFalse(remoteTopics.contains("topic3"));
+ assertFalse(remoteTopics.contains("source1__topic4"));
+ assertFalse(remoteTopics.contains("source1__source2__topic5"));
+ assertFalse(remoteTopics.contains("source3__source4__source5__topic6"));
+
+ assertTrue(remoteTopics.contains("source1__heartbeats"));
+ assertTrue(remoteTopics.contains("source1__source2__heartbeats"));
+ assertTrue(remoteTopics.contains("source3__source4__source5__heartbeats"));
+ }
+
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorDefaultReplicationPolicyTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorDefaultReplicationPolicyTest.java
new file mode 100644
index 0000000000000..7776494dcd0ec
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorDefaultReplicationPolicyTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorSourceConnectorDefaultReplicationPolicyTest extends MirrorSourceConnectorTest {
+ @Override
+ protected ReplicationPolicy replicationPolicy() {
+ return new DefaultReplicationPolicy();
+ }
+
+ @Test
+ public void testNoCycles() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ replicationPolicy(), x -> true, x -> true);
+ assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
+ assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
+ assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
+ assertTrue(connector.shouldReplicateTopic("topic1"), "should allow anything else");
+ assertTrue(connector.shouldReplicateTopic("source.topic1"), "should allow anything else");
+ }
+
+ @Test
+ public void testAclTransformation() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ replicationPolicy(), x -> true, x -> true);
+ AclBinding allowAllAclBinding = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
+ AclBinding processedAllowAllAclBinding = connector.targetAclBinding(allowAllAclBinding);
+ String expectedRemoteTopicName = "source" + DefaultReplicationPolicy.SEPARATOR_DEFAULT
+ + allowAllAclBinding.pattern().name();
+ assertEquals(expectedRemoteTopicName, processedAllowAllAclBinding.pattern().name(), "should change topic name");
+ assertEquals(processedAllowAllAclBinding.entry().operation(), AclOperation.READ, "should change ALL to READ");
+ assertEquals(processedAllowAllAclBinding.entry().permissionType(), AclPermissionType.ALLOW, "should not change ALLOW");
+
+ AclBinding denyAllAclBinding = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY));
+ AclBinding processedDenyAllAclBinding = connector.targetAclBinding(denyAllAclBinding);
+ assertEquals(processedDenyAllAclBinding.entry().operation(), AclOperation.ALL, "should not change ALL");
+ assertEquals(processedDenyAllAclBinding.entry().permissionType(), AclPermissionType.DENY, "should not change DENY");
+ }
+
+ @Test
+ public void testRefreshTopicPartitions() throws Exception {
+ testRefreshTopicPartitions("source.topic");
+ }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorIdentityReplicationPolicyTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorIdentityReplicationPolicyTest.java
new file mode 100644
index 0000000000000..f2a2044ee7ebe
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorIdentityReplicationPolicyTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MirrorSourceConnectorIdentityReplicationPolicyTest extends MirrorSourceConnectorTest {
+
+ @Override
+ protected ReplicationPolicy replicationPolicy() {
+ return new IdentityReplicationPolicy();
+ }
+
+ @Test
+ public void testCycles() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ replicationPolicy(), x -> true, x -> true);
+ // IdentityReplicationPolicy can prevent cycles only for heartbeats topics.
+ assertTrue(connector.shouldReplicateTopic("target.topic1"), "should allow cycles");
+ assertTrue(connector.shouldReplicateTopic("target.source.topic1"), "should allow cycles");
+ assertTrue(connector.shouldReplicateTopic("source.target.topic1"), "should allow cycles");
+ assertTrue(connector.shouldReplicateTopic("topic1"), "should allow anything else");
+ assertTrue(connector.shouldReplicateTopic("source.topic1"), "should allow anything else");
+
+ assertFalse(connector.shouldReplicateTopic("target.heartbeats"), "should not allow cycles for heartbeats");
+ assertFalse(connector.shouldReplicateTopic("target.source.heartbeats"), "should not allow cycles for heartbeats");
+ assertFalse(connector.shouldReplicateTopic("source.target.heartbeats"), "should not allow cycles for heartbeats");
+ assertTrue(connector.shouldReplicateTopic("heartbeats"), "should allow anything else for heartbeats");
+ assertTrue(connector.shouldReplicateTopic("source.heartbeats"), "should allow anything else for heartbeats");
+ }
+
+ @Test
+ public void testAclTransformation() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ replicationPolicy(), x -> true, x -> true);
+ AclBinding allowAllAclBinding = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
+ AclBinding processedAllowAllAclBinding = connector.targetAclBinding(allowAllAclBinding);
+ String expectedRemoteTopicName = allowAllAclBinding.pattern().name();
+ assertEquals(expectedRemoteTopicName, processedAllowAllAclBinding.pattern().name(),
+ "should not change topic name");
+ assertEquals(AclOperation.READ, processedAllowAllAclBinding.entry().operation(),
+ "should change ALL to READ");
+ assertEquals(AclPermissionType.ALLOW, processedAllowAllAclBinding.entry().permissionType(),
+ "should not change ALLOW");
+
+ AclBinding denyAllAclBinding = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY));
+ AclBinding processedDenyAllAclBinding = connector.targetAclBinding(denyAllAclBinding);
+ assertEquals(AclOperation.ALL, processedDenyAllAclBinding.entry().operation(),
+ "should not change ALL");
+ assertEquals(AclPermissionType.DENY, processedDenyAllAclBinding.entry().permissionType(),
+ "should not change DENY");
+ }
+
+ @Test
+ public void testRefreshTopicPartitions() throws Exception {
+ testRefreshTopicPartitions("topic");
+ }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 42d7951cd60fc..e910ebe8bc437 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -52,12 +52,12 @@
import java.util.List;
import java.util.Map;
-public class MirrorSourceConnectorTest {
+abstract class MirrorSourceConnectorTest {
@Test
public void testReplicatesHeartbeatsByDefault() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
+ replicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
assertTrue(connector.shouldReplicateTopic("heartbeats"), "should replicate heartbeats");
assertTrue(connector.shouldReplicateTopic("us-west.heartbeats"), "should replicate upstream heartbeats");
}
@@ -70,21 +70,10 @@ public void testReplicatesHeartbeatsDespiteFilter() {
assertTrue(connector.shouldReplicateTopic("us-west.heartbeats"), "should replicate upstream heartbeats");
}
- @Test
- public void testNoCycles() {
- MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
- assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
- assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
- assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
- assertTrue(connector.shouldReplicateTopic("topic1"), "should allow anything else");
- assertTrue(connector.shouldReplicateTopic("source.topic1"), "should allow anything else");
- }
-
@Test
public void testAclFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
+ replicationPolicy(), x -> true, x -> true);
assertFalse(connector.shouldReplicateAcl(
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
@@ -92,33 +81,11 @@ public void testAclFiltering() {
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
}
-
- @Test
- public void testAclTransformation() {
- MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
- AclBinding allowAllAclBinding = new AclBinding(
- new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
- AclBinding processedAllowAllAclBinding = connector.targetAclBinding(allowAllAclBinding);
- String expectedRemoteTopicName = "source" + DefaultReplicationPolicy.SEPARATOR_DEFAULT
- + allowAllAclBinding.pattern().name();
- assertEquals(expectedRemoteTopicName, processedAllowAllAclBinding.pattern().name(), "should change topic name");
- assertEquals(processedAllowAllAclBinding.entry().operation(), AclOperation.READ, "should change ALL to READ");
- assertEquals(processedAllowAllAclBinding.entry().permissionType(), AclPermissionType.ALLOW, "should not change ALLOW");
-
- AclBinding denyAllAclBinding = new AclBinding(
- new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY));
- AclBinding processedDenyAllAclBinding = connector.targetAclBinding(denyAllAclBinding);
- assertEquals(processedDenyAllAclBinding.entry().operation(), AclOperation.ALL, "should not change ALL");
- assertEquals(processedDenyAllAclBinding.entry().permissionType(), AclPermissionType.DENY, "should not change DENY");
- }
@Test
public void testConfigPropertyFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
+ replicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
ArrayList<ConfigEntry> entries = new ArrayList<>();
entries.add(new ConfigEntry("name-1", "value-1"));
entries.add(new ConfigEntry("min.insync.replicas", "2"));
@@ -176,10 +143,9 @@ public void testMirrorSourceConnectorTaskConfig() {
assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get(TASK_TOPIC_PARTITIONS));
}
- @Test
- public void testRefreshTopicPartitions() throws Exception {
+ public void testRefreshTopicPartitions(String newTopic) throws Exception {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
+ replicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);
@@ -199,18 +165,18 @@ public void testRefreshTopicPartitions() throws Exception {
connector.refreshTopicPartitions();
Map<String, Long> expectedPartitionCounts = new HashMap<>();
- expectedPartitionCounts.put("source.topic", 1L);
+ expectedPartitionCounts.put(newTopic, 1L);
Map<String, String> configMap = MirrorSourceConnector.configToMap(topicConfig);
assertEquals(2, configMap.size());
Map<String, NewTopic> expectedNewTopics = new HashMap<>();
- expectedNewTopics.put("source.topic", new NewTopic("source.topic", 1, (short) 0).configs(configMap));
+ expectedNewTopics.put(newTopic, new NewTopic(newTopic, 1, (short) 0).configs(configMap));
verify(connector, times(2)).computeAndCreateTopicPartitions();
verify(connector, times(2)).createNewTopics(eq(expectedNewTopics));
verify(connector, times(0)).createNewPartitions(any());
- List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
+ List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition(newTopic, 0));
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
connector.refreshTopicPartitions();
@@ -251,4 +217,6 @@ public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception {
connector.refreshTopicPartitions();
verify(connector, times(1)).computeAndCreateTopicPartitions();
}
+
+ protected abstract ReplicationPolicy replicationPolicy();
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIdentityReplicationPolicyIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIdentityReplicationPolicyIntegrationTest.java
new file mode 100644
index 0000000000000..43abad3f53022
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIdentityReplicationPolicyIntegrationTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
+import org.apache.kafka.connect.mirror.MirrorClient;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Tag;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}.
+ *
+ * MM2 is configured with active/passive replication between two Kafka clusters with {@link IdentityReplicationPolicy}.
+ * Tests validate that records sent to the primary cluster arrive at the backup cluster. Then, a consumer group is
+ * migrated from the primary cluster to the backup cluster. Tests validate that consumer offsets
+ * are translated and replicated from the primary cluster to the backup cluster during this failover.
+ */
+@Tag("integration")
+public class MirrorConnectorsIdentityReplicationPolicyIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
+ @BeforeEach
+ public void startClusters() throws Exception {
+ super.startClusters(new HashMap<String, String>() {{
+ put("replication.policy.class", IdentityReplicationPolicy.class.getName());
+ put("topics", "test-topic-.*");
+ put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
+ put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
+ }});
+ }
+
+ @Test
+ public void testReplication() throws Exception {
+ produceMessages(primary, "test-topic-1");
+ String consumerGroupName = "consumer-group-testReplication";
+ Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+ put("group.id", consumerGroupName);
+ put("auto.offset.reset", "latest");
+ }};
+ // warm up consumers before starting the connectors so we don't need to wait for discovery
+ warmUpConsumer(consumerProps);
+
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+ waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
+
+ MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
+ MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+
+ // make sure the topic is auto-created in the other cluster
+ waitForTopicCreated(primary, "test-topic-1");
+ waitForTopicCreated(backup, "test-topic-1");
+ assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
+ "topic config was not synced");
+
+ assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
+ "Records were not produced to primary cluster.");
+ assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
+ "Records were not replicated to backup cluster.");
+
+ assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
+ "Heartbeats were not emitted to primary cluster.");
+ assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
+ "Heartbeats were not emitted to backup cluster.");
+ assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0,
+ "Heartbeats were not replicated downstream to backup cluster.");
+ assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
+ "Heartbeats were not replicated downstream to primary cluster.");
+
+ assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
+ assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
+ assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
+ "Checkpoints were not emitted downstream to backup cluster.");
+
+ Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
+ Duration.ofMillis(CHECKPOINT_DURATION_MS));
+
+ assertTrue(backupOffsets.containsKey(
+ new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+
+ // Failover consumer group to backup cluster.
+ try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
+ primaryConsumer.assign(backupOffsets.keySet());
+ backupOffsets.forEach(primaryConsumer::seek);
+ primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ primaryConsumer.commitAsync();
+
+ assertTrue(primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedover to zero offset.");
+ assertTrue(primaryConsumer.position(
+ new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
+ }
+
+ primaryClient.close();
+ backupClient.close();
+
+ // create more matching topics
+ primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+
+ // make sure the topic is auto-created in the other cluster
+ waitForTopicCreated(backup, "test-topic-2");
+
+ // only produce messages to the first partition
+ produceMessages(primary, "test-topic-2", 1);
+
+ // expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
+ assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
+ "Records were not produced to primary cluster.");
+ assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
+ "New topic was not replicated to backup cluster.");
+ }
+
+ @Test
+ public void testReplicationWithEmptyPartition() throws Exception {
+ String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
+ Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
+ // create topic
+ String topic = "test-topic-with-empty-partition";
+ primary.kafka().createTopic(topic, NUM_PARTITIONS);
+
+ // produce to all test-topic-empty's partitions, except the last partition
+ produceMessages(primary, topic, NUM_PARTITIONS - 1);
+
+ // consume before starting the connectors so we don't need to wait for discovery
+ int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
+ waitForConsumingAllRecords(primaryConsumer, expectedRecords);
+ }
+
+ // one way replication from primary to backup
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
+ mm2Config = new MirrorMakerConfig(mm2Props);
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+ // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
+ Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+
+ // note that with IdentityReplicationPolicy, topics on the backup are NOT renamed to PRIMARY_CLUSTER_ALIAS + "." + topic
+ String backupTopic = topic;
+
+ // consume all records from backup cluster
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
+ backupTopic)) {
+ waitForConsumingAllRecords(backupConsumer, expectedRecords);
+ }
+
+ try (Admin backupClient = backup.kafka().createAdminClient()) {
+ // retrieve the consumer group offset from backup cluster
+ Map<TopicPartition, OffsetAndMetadata> remoteOffsets =
+ backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
+
+ // pinpoint the offset of the last partition which does not receive records
+ OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1));
+ // offset of the last partition should exist, but its value should be 0
+ assertNotNull(offset, "Offset of last partition was not replicated");
+ assertEquals(0, offset.offset(), "Offset of last partition is not zero");
+ }
+ }
+
+ @Test
+ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
+ produceMessages(primary, "test-topic-1");
+ String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
+ Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+ put("group.id", consumerGroupName);
+ put("auto.offset.reset", "earliest");
+ }};
+ // create consumers before starting the connectors so we don't need to wait for discovery
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps,
+ "test-topic-1")) {
+ // we need to wait for consuming all the records for MM2 replicating the expected offsets
+ waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+ }
+
+ // enable automated consumer group offset sync
+ mm2Props.put("sync.group.offsets.enabled", "true");
+ mm2Props.put("sync.group.offsets.interval.seconds", "1");
+ // one way replication from primary to backup
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
+
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+ // make sure the topic is created in the other cluster
+ waitForTopicCreated(primary, "backup.test-topic-1");
+ waitForTopicCreated(backup, "test-topic-1");
+ // create a consumer at backup cluster with same consumer group Id to consume 1 topic
+ Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+ consumerProps, "test-topic-1");
+
+ waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
+ consumerGroupName, NUM_RECORDS_PRODUCED);
+
+ ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+
+ // the size of consumer record should be zero, because the offsets of the same consumer group
+ // have been automatically synchronized from primary to backup by the background job, so no
+ // more records to consume from the replicated topic by the same consumer group at backup cluster
+ assertEquals(0, records.count(), "consumer record size is not zero");
+
+ // now create a new topic in primary cluster
+ primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+ // make sure the topic is created in backup cluster
+ waitForTopicCreated(backup, "test-topic-2");
+
+ // produce some records to the new topic in primary cluster
+ produceMessages(primary, "test-topic-2");
+
+ // create a consumer at primary cluster to consume the new topic
+ try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+ "group.id", "consumer-group-1"), "test-topic-2")) {
+ // we need to wait for consuming all the records for MM2 replicating the expected offsets
+ waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
+ }
+
+ // create a consumer at backup cluster with same consumer group Id to consume old and new topic
+ backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+ "group.id", consumerGroupName), "test-topic-1", "test-topic-2");
+
+ waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
+ consumerGroupName, NUM_RECORDS_PRODUCED);
+
+ records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+ assertEquals(0, records.count(), "consumer record size is not zero");
+ backupConsumer.close();
+ }
+}
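
Reviewer note: pulling the pieces of this test together, the MM2 setup for identity replication reduces to the sketch below; the aliases and topic pattern come from the test, and IdentityMm2ConfigSketch is illustrative rather than a reference configuration.

```java
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;

import java.util.HashMap;
import java.util.Map;

public class IdentityMm2ConfigSketch {
    public static void main(String[] args) {
        Map<String, String> mm2 = new HashMap<>();
        mm2.put("clusters", "primary, backup");
        mm2.put("replication.policy.class", IdentityReplicationPolicy.class.getName());
        mm2.put("topics", "test-topic-.*");
        // Identity naming cannot detect cycles, so replication stays one-way.
        mm2.put("primary->backup.enabled", "true");
        mm2.put("backup->primary.enabled", "false");
        mm2.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```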
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 841eedc33c6e6..c7d308aa3381d 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -77,29 +77,29 @@
public abstract class MirrorConnectorsIntegrationBaseTest {
private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
- private static final int NUM_RECORDS_PER_PARTITION = 10;
- private static final int NUM_PARTITIONS = 10;
- private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
- private static final int RECORD_TRANSFER_DURATION_MS = 30_000;
- private static final int CHECKPOINT_DURATION_MS = 20_000;
+ protected static final int NUM_RECORDS_PER_PARTITION = 10;
+ protected static final int NUM_PARTITIONS = 10;
+ protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
+ protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+ protected static final int CHECKPOINT_DURATION_MS = 20_000;
private static final int RECORD_CONSUME_DURATION_MS = 20_000;
private static final int OFFSET_SYNC_DURATION_MS = 30_000;
private static final int TOPIC_SYNC_DURATION_MS = 60_000;
private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
private static final int NUM_WORKERS = 3;
- private static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500);
+ protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500);
protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
protected static final String BACKUP_CLUSTER_ALIAS = "backup";
- private static final List<Class<? extends Connector>> CONNECTOR_LIST =
+ protected static final List<Class<? extends Connector>> CONNECTOR_LIST =
Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
private volatile boolean shuttingDown;
protected Map<String, String> mm2Props = new HashMap<>();
- private MirrorMakerConfig mm2Config;
- private EmbeddedConnectCluster primary;
- private EmbeddedConnectCluster backup;
+ protected MirrorMakerConfig mm2Config;
+ protected EmbeddedConnectCluster primary;
+ protected EmbeddedConnectCluster backup;
- private Exit.Procedure exitProcedure;
+ protected Exit.Procedure exitProcedure;
private Exit.Procedure haltProcedure;
protected Properties primaryBrokerProps = new Properties();
@@ -109,6 +109,14 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
@BeforeEach
public void startClusters() throws Exception {
+ startClusters(new HashMap<String, String>() {{
+ put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
+ put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
+ put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
+ }});
+ }
+
+ public void startClusters(Map<String, String> additionalMM2Config) throws Exception {
shuttingDown = false;
exitProcedure = (code, message) -> {
if (shuttingDown) {
@@ -139,10 +147,11 @@ public void startClusters() throws Exception {
primaryBrokerProps.put("auto.create.topics.enable", "false");
backupBrokerProps.put("auto.create.topics.enable", "false");
-
+
mm2Props.putAll(basicMM2Config());
+ mm2Props.putAll(additionalMM2Config);
- mm2Config = new MirrorMakerConfig(mm2Props);
+ mm2Config = new MirrorMakerConfig(mm2Props);
primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
backupWorkerProps.putAll(mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
@@ -171,7 +180,7 @@ public void startClusters() throws Exception {
waitForTopicCreated(primary, "mm2-status.backup.internal");
waitForTopicCreated(primary, "mm2-offsets.backup.internal");
waitForTopicCreated(primary, "mm2-configs.backup.internal");
-
+
backup.start();
backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not start in time.");
@@ -373,9 +382,11 @@ public void testReplicationWithEmptyPartition() throws Exception {
// sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
+ String backupTopic = PRIMARY_CLUSTER_ALIAS + "." + topic;
+
// consume all records from backup cluster
- try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
- PRIMARY_CLUSTER_ALIAS + "." + topic)) {
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
+ backupTopic)) {
waitForConsumingAllRecords(backupConsumer, expectedRecords);
}
@@ -385,7 +396,7 @@ public void testReplicationWithEmptyPartition() throws Exception {
backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
// pinpoint the offset of the last partition which does not receive records
- OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(PRIMARY_CLUSTER_ALIAS + "." + topic, NUM_PARTITIONS - 1));
+ OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1));
// offset of the last partition should exist, but its value should be 0
assertNotNull(offset, "Offset of last partition was not replicated");
assertEquals(0, offset.offset(), "Offset of last partition is not zero");
@@ -465,7 +476,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
/*
* launch the connectors on kafka connect cluster and check if they are running
*/
- private static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
+ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
List<Class<? extends Connector>> connectorClasses, MirrorMakerConfig mm2Config,
String primary, String backup) throws InterruptedException {
for (Class<? extends Connector> connector : connectorClasses) {
@@ -485,7 +496,7 @@ private static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connect
/*
* wait for the topic created on the cluster
*/
- private static void waitForTopicCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
+ protected static void waitForTopicCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
try (final Admin adminClient = cluster.kafka().createAdminClient()) {
waitForCondition(() -> adminClient.listTopics().names().get().contains(topicName), TOPIC_SYNC_DURATION_MS,
"Topic: " + topicName + " didn't get created on cluster: " + cluster.getName()
@@ -507,7 +518,7 @@ private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Excepti
/*
* retrieve the config value based on the input cluster, topic and config name
*/
- private static String getTopicConfig(EmbeddedKafkaCluster cluster, String topic, String configName) throws Exception {
+ protected static String getTopicConfig(EmbeddedKafkaCluster cluster, String topic, String configName) throws Exception {
try (Admin client = cluster.createAdminClient()) {
Collection<ConfigResource> cr = Collections.singleton(
new ConfigResource(ConfigResource.Type.TOPIC, topic));
@@ -542,7 +553,7 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
* given consumer group, topics and expected number of records, make sure the consumer group
* offsets are eventually synced to the expected offset numbers
*/
- private static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
+ protected static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
Consumer<byte[], byte[]> consumer, List<String> topics, String consumerGroupId, int numRecords)
throws InterruptedException {
try (Admin adminClient = connect.kafka().createAdminClient()) {
@@ -572,7 +583,7 @@ private static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster co
/*
* make sure the consumer to consume expected number of records
*/
- private static void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer, int numExpectedRecords)
+ protected static void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer, int numExpectedRecords)
throws InterruptedException {
final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
waitForCondition(() -> {
@@ -585,14 +596,11 @@ private static void waitForConsumingAllRecords(Consumer consumer, int
/*
* MM2 config to use in integration tests
*/
- private static Map<String, String> basicMM2Config() {
+ protected static Map<String, String> basicMM2Config() {
Map<String, String> mm2Props = new HashMap<>();
mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
mm2Props.put("max.tasks", "10");
- mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
mm2Props.put("groups", "consumer-group-.*");
- mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
- mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
mm2Props.put("sync.topic.acls.enabled", "false");
mm2Props.put("emit.checkpoints.interval.seconds", "1");
mm2Props.put("emit.heartbeats.interval.seconds", "1");
@@ -630,7 +638,7 @@ private void createTopics() {
/*
* Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
*/
- private void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
+ protected void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
dummyConsumer.commitSync();