connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
@@ -27,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
@@ -41,7 +47,8 @@ public class MirrorCheckpointTask extends SourceTask {

private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class);

private AdminClient sourceAdminClient;
private Admin sourceAdminClient;
private Admin targetAdminClient;
private String sourceClusterAlias;
private String targetClusterAlias;
private String checkpointsTopic;
@@ -53,16 +60,22 @@ public class MirrorCheckpointTask extends SourceTask {
private OffsetSyncStore offsetSyncStore;
private boolean stopping;
private MirrorMetrics metrics;

private Scheduler scheduler;
private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
private Map<String, List<Checkpoint>> checkpointsPerConsumerGroup;
public MirrorCheckpointTask() {}

// for testing
MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore) {
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
Map<String, List<Checkpoint>> checkpointsPerConsumerGroup) {
this.sourceClusterAlias = sourceClusterAlias;
this.targetClusterAlias = targetClusterAlias;
this.replicationPolicy = replicationPolicy;
this.offsetSyncStore = offsetSyncStore;
this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
}

@Override
@@ -79,7 +92,15 @@ public void start(Map<String, String> props) {
pollTimeout = config.consumerPollTimeout();
offsetSyncStore = new OffsetSyncStore(config);
sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
targetAdminClient = AdminClient.create(config.targetAdminConfig());
metrics = config.metrics();
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
}
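Note: the refresh task is scheduled with scheduleRepeating() while the sync task uses scheduleRepeatingDelayed(), presumably so the target-side group offsets are fetched at least once before the first sync attempt runs.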

@Override
@@ -93,7 +114,9 @@ public void stop() {
stopping = true;
Utils.closeQuietly(offsetSyncStore, "offset sync store");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
Utils.closeQuietly(metrics, "metrics");
Utils.closeQuietly(scheduler, "scheduler");
Member: We should also close targetAdminClient

Contributor Author: done

log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - start);
}

@@ -111,7 +134,7 @@ public List<SourceRecord> poll() throws InterruptedException {
}
List<SourceRecord> records = new ArrayList<>();
for (String group : consumerGroups) {
records.addAll(checkpointsForGroup(group));
records.addAll(sourceRecordsForGroup(group));
}
if (records.isEmpty()) {
// WorkerSourceTask expects non-zero batches or null
@@ -125,13 +148,13 @@ public List<SourceRecord> poll() throws InterruptedException {
}
}

private List<SourceRecord> checkpointsForGroup(String group) throws InterruptedException {

private List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
try {
long timestamp = System.currentTimeMillis();
return listConsumerGroupOffsets(group).entrySet().stream()
.filter(x -> shouldCheckpointTopic(x.getKey().topic()))
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
.filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately
List<Checkpoint> checkpoints = checkpointsForGroup(group);
checkpointsPerConsumerGroup.put(group, checkpoints);
return checkpoints.stream()
.map(x -> checkpointRecord(x, timestamp))
.collect(Collectors.toList());
} catch (ExecutionException e) {
@@ -140,6 +163,14 @@ private List<SourceRecord> checkpointsForGroup(String group) throws InterruptedException
}
}

private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
return listConsumerGroupOffsets(group).entrySet().stream()
.filter(x -> shouldCheckpointTopic(x.getKey().topic()))
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
.filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately
.collect(Collectors.toList());
}

private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
throws InterruptedException, ExecutionException {
if (stopping) {
@@ -188,4 +219,101 @@ public void commitRecord(SourceRecord record) {
Checkpoint.unwrapGroup(record.sourcePartition()),
System.currentTimeMillis() - record.timestamp());
}

private void refreshIdleConsumerGroupOffset() {
Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
.describeConsumerGroups(consumerGroups).describedGroups();
Member: It looks like we describe all groups just to get groups in the EMPTY state. Can we use the new listGroups() method introduced in KIP-518 to only get groups in that specific state?

Member: On second thought, using describeConsumerGroups() may be more predictable in terms of work to do, as you describe only the groups assigned to this task.

Contributor Author: Good to know about that KIP; I will keep using describeConsumerGroups() here then.
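For reference, a minimal sketch of the KIP-518 alternative discussed above, assuming a Kafka 2.6+ Admin client (this PR keeps describeConsumerGroups() instead):

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.common.ConsumerGroupState;

// List only the groups that are already EMPTY on the target cluster,
// instead of describing every group assigned to this task.
static Set<String> listEmptyGroups(Admin targetAdminClient)
        throws InterruptedException, ExecutionException {
    return targetAdminClient
            .listConsumerGroups(new ListConsumerGroupsOptions()
                    .inStates(Collections.singleton(ConsumerGroupState.EMPTY)))
            .all().get().stream()
            .map(ConsumerGroupListing::groupId)
            .collect(Collectors.toSet());
}

Note that listConsumerGroups() returns every group on the cluster, not just the ones assigned to this task, which is why describeConsumerGroups() was judged more predictable here.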

for (String group : consumerGroups) {
try {
ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
// sync the offset to the target cluster only if the state of the current consumer group is:
// (1) idle: the consumer at the target is not actively consuming the mirrored topic
// (2) dead: the consumer group was recently created at the source and has never existed at the target
if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
.partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
}
// a new consumer upstream has state "DEAD" and will be identified during the offset sync-up
} catch (InterruptedException | ExecutionException e) {
log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
}
}
}

Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();

// first, sync offsets for the idle consumers at target
for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : getConvertedUpstreamOffset().entrySet()) {
String consumerGroupId = group.getKey();
// for each idle consumer at target, read the checkpoints (converted upstream offset)
// from the pre-populated map
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = group.getValue();

Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset = idleConsumerGroupsOffset.get(consumerGroupId);
if (targetConsumerOffset == null) {
// this is a new consumer, just sync the offset to target
syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
offsetToSyncAll.put(consumerGroupId, convertedUpstreamOffset);
continue;
}

for (Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {

TopicPartition topicPartition = convertedEntry.getKey();
OffsetAndMetadata convertedOffset = convertedUpstreamOffset.get(topicPartition);
if (!targetConsumerOffset.containsKey(topicPartition)) {
// if it is a new topicPartition from upstream, just sync the offset to target
offsetToSync.put(topicPartition, convertedOffset);
continue;
}

// if translated offset from upstream is smaller than the current consumer offset
// in the target, skip updating the offset for that partition
long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
if (latestDownstreamOffset >= convertedOffset.offset()) {
log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
continue;
}
offsetToSync.put(topicPartition, convertedOffset);
}

if (offsetToSync.size() == 0) {
log.trace("skip syncing the offset for consumer group: {}", consumerGroupId);
continue;
}
syncGroupOffset(consumerGroupId, offsetToSync);

offsetToSyncAll.put(consumerGroupId, offsetToSync);
}
idleConsumerGroupsOffset.clear();
return offsetToSyncAll;
}

void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
if (targetAdminClient != null) {
targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync);
log.trace("sync-ed the offset for consumer group: {} with {} number of offset entries",
consumerGroupId, offsetToSync.size());
}
}
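Note: alterConsumerGroupOffsets() (added by KIP-396 in Kafka 2.5) commits the offsets directly through the Admin client, so no consumer needs to join the group on the target cluster; the broker is expected to reject the commit if the group still has active members, which is another reason only EMPTY groups are synced.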

Map<String, Map<TopicPartition, OffsetAndMetadata>> getConvertedUpstreamOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();

for (Entry<String, List<Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
String consumerId = entry.getKey();
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
for (Checkpoint checkpoint : entry.getValue()) {
convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
result.put(consumerId, convertedUpstreamOffset);
}
return result;
}
}
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -23,6 +23,8 @@
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;

import java.util.Map;
import java.util.HashMap;
@@ -67,6 +69,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
protected static final String SYNC_TOPIC_ACLS = "sync.topic.acls";
protected static final String EMIT_HEARTBEATS = "emit.heartbeats";
protected static final String EMIT_CHECKPOINTS = "emit.checkpoints";
protected static final String SYNC_GROUP_OFFSETS = "sync.group.offsets";

public static final String ENABLED = "enabled";
private static final String ENABLED_DOC = "Whether to replicate source->target.";
@@ -169,6 +172,14 @@ public class MirrorConnectorConfig extends AbstractConfig {
private static final String EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC = "Frequency of checkpoints.";
public static final long EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT = 60;


public static final String SYNC_GROUP_OFFSETS_ENABLED = SYNC_GROUP_OFFSETS + ENABLED_SUFFIX;
private static final String SYNC_GROUP_OFFSETS_ENABLED_DOC = "Whether to periodically write the translated offsets to the __consumer_offsets topic in the target cluster, as long as no active consumers in that group are connected to the target cluster.";
public static final boolean SYNC_GROUP_OFFSETS_ENABLED_DEFAULT = false;
public static final String SYNC_GROUP_OFFSETS_INTERVAL_SECONDS = SYNC_GROUP_OFFSETS + INTERVAL_SECONDS_SUFFIX;
private static final String SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC = "Frequency of consumer group offset sync.";
public static final long SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT = 60;
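For illustration, a minimal sketch of enabling this feature in an mm2.properties file (the cluster aliases "source" and "target" are placeholders):

source->target.sync.group.offsets.enabled = true
source->target.sync.group.offsets.interval.seconds = 60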

public static final String TOPIC_FILTER_CLASS = "topic.filter.class";
private static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. Selects topics to replicate.";
public static final Class<?> TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
@@ -229,8 +240,8 @@ Map<String, Object> sourceConsumerConfig() {
props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}

@@ -394,6 +405,15 @@ ConfigPropertyFilter configPropertyFilter() {
return getConfiguredInstance(CONFIG_PROPERTY_FILTER_CLASS, ConfigPropertyFilter.class);
}

Duration syncGroupOffsetsInterval() {
if (getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
return Duration.ofSeconds(getLong(SYNC_GROUP_OFFSETS_INTERVAL_SECONDS));
} else {
// negative interval to disable
Contributor: 👍

return Duration.ofMillis(-1);
}
}
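Note: the negative interval assumes the Scheduler skips scheduling for sub-zero periods, which effectively disables the periodic group offset sync when sync.group.offsets.enabled is false.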

protected static final ConfigDef CONNECTOR_CONFIG_DEF = ConnectorConfig.configDef()
.define(
ENABLED,
@@ -544,6 +564,18 @@ ConfigPropertyFilter configPropertyFilter() {
EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC)
.define(
SYNC_GROUP_OFFSETS_ENABLED,
ConfigDef.Type.BOOLEAN,
SYNC_GROUP_OFFSETS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_GROUP_OFFSETS_ENABLED_DOC)
.define(
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS,
ConfigDef.Type.LONG,
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC)
.define(
REPLICATION_POLICY_CLASS,
ConfigDef.Type.CLASS,
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -16,6 +16,11 @@
*/
package org.apache.kafka.connect.mirror;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.connect.source.SourceRecord;
@@ -29,7 +34,7 @@ public class MirrorCheckpointTaskTest {
@Test
public void testDownstreamTopicRenaming() {
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null);
new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
assertEquals(new TopicPartition("source1.topic3", 4),
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
assertEquals(new TopicPartition("topic3", 5),
@@ -42,7 +47,7 @@ public void testDownstreamTopicRenaming() {
public void testCheckpoint() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore);
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
@@ -64,4 +69,56 @@ public void testCheckpoint() {
assertEquals(13, checkpoint2.downstreamOffset());
assertEquals(234L, sourceRecord2.timestamp().longValue());
}

@Test
public void testSyncOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();

String consumer1 = "consumer1";
String consumer2 = "consumer2";

String topic1 = "topic1";
String topic2 = "topic2";

// 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1
Map<TopicPartition, OffsetAndMetadata> c1t1 = new HashMap<>();
// 't1p0' denotes topic1, partition 0
TopicPartition t1p0 = new TopicPartition(topic1, 0);

c1t1.put(t1p0, new OffsetAndMetadata(100));

Map<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<>();
TopicPartition t2p0 = new TopicPartition(topic2, 0);

c2t2.put(t2p0, new OffsetAndMetadata(50));

idleConsumerGroupsOffset.put(consumer1, c1t1);
idleConsumerGroupsOffset.put(consumer2, c2t2);

// 'cpC1T1P0' denotes 'checkpoint' of topic1, partition 0 for consumer1
Checkpoint cpC1T1P0 = new Checkpoint(consumer1, new TopicPartition(topic1, 0), 200, 101, "metadata");

// 'cpC2T2P0' denotes 'checkpoint' of topic2, partition 0 for consumer2
Checkpoint cpC2T2P0 = new Checkpoint(consumer2, new TopicPartition(topic2, 0), 100, 51, "metadata");

// 'checkpointListC1' denotes 'checkpoint' list for consumer1
List<Checkpoint> checkpointListC1 = new ArrayList<>();
checkpointListC1.add(cpC1T1P0);

// 'checkpointListC2' denotes 'checkpoint' list for consumer2
List<Checkpoint> checkpointListC2 = new ArrayList<>();
checkpointListC2.add(cpC2T2P0);

checkpointsPerConsumerGroup.put(consumer1, checkpointListC1);
checkpointsPerConsumerGroup.put(consumer2, checkpointListC2);

MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);

Map<String, Map<TopicPartition, OffsetAndMetadata>> output = mirrorCheckpointTask.syncGroupOffset();

assertEquals(101, output.get(consumer1).get(t1p0).offset());
assertEquals(51, output.get(consumer2).get(t2p0).offset());
}
}