Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class WorkerSinkTask implements WorkerTask {
private WorkerSinkTaskThread workThread;
private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskContext context;
private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;

public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
Converter keyConverter, Converter valueConverter, Time time) {
Expand All @@ -75,11 +76,17 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConf

@Override
public void start(Properties props) {
consumer = createConsumer(props);
consumer = createConsumer();
context = new WorkerSinkTaskContext(consumer);

// Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
// to work with. Any rewinding will be handled immediately when polling starts.
String topicsStr = props.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatException("Sink tasks require a list of topics.");
String[] topics = topicsStr.split(",");
log.debug("Task {} subscribing to topics {}", id, topics);
consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
consumer.poll(0);

task.initialize(context);
Expand All @@ -92,25 +99,25 @@ public void start(Properties props) {
@Override
public void stop() {
// Offset commit is handled upon exit in work thread
task.stop();
if (workThread != null)
workThread.startGracefulShutdown();
consumer.wakeup();
}

@Override
public boolean awaitStop(long timeoutMs) {
boolean success = true;
if (workThread != null) {
try {
boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
if (!success)
workThread.forceShutdown();
return success;
} catch (InterruptedException e) {
return false;
success = false;
}
}
return true;
task.stop();
return success;
}

@Override
Expand Down Expand Up @@ -143,34 +150,40 @@ public void poll(long timeoutMs) {
* Starts an offset commit by flushing outstanding messages from the task and then starting
* the write commit. This should only be invoked by the WorkerSinkTaskThread.
**/
public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
public void commitOffsets(boolean sync, final int seqno) {
log.info("{} Committing offsets", this);
HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
final HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
long pos = consumer.position(tp);
offsets.put(tp, new OffsetAndMetadata(pos));
log.trace("{} committing {} offset {}", id, tp, pos);
}
// We only don't flush the task in one case: when shutting down, the task has already been
// stopped and all data should have already been flushed
if (flush) {
try {
task.flush(offsets);
} catch (Throwable t) {
log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t);
workThread.onCommitCompleted(t, seqno);
return;

try {
task.flush(offsets);
} catch (Throwable t) {
log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t);
log.error("Rewinding offsets to last committed offsets");
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
consumer.seek(entry.getKey(), entry.getValue().offset());
}
workThread.onCommitCompleted(t, seqno);
return;
}

if (sync) {
try {
consumer.commitSync(offsets);
lastCommittedOffsets = offsets;
} catch (KafkaException e) {
workThread.onCommitCompleted(e, seqno);
}
} else {
OffsetCommitCallback cb = new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
lastCommittedOffsets = offsets;
workThread.onCommitCompleted(error, seqno);
}
};
Expand All @@ -186,16 +199,11 @@ public WorkerConfig workerConfig() {
return workerConfig;
}

private KafkaConsumer<byte[], byte[]> createConsumer(Properties taskProps) {
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatException("Sink tasks require a list of topics.");
String[] topics = topicsStr.split(",");

private KafkaConsumer<byte[], byte[]> createConsumer() {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
Properties props = workerConfig.unusedProperties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.toString());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Expand All @@ -210,9 +218,6 @@ private KafkaConsumer<byte[], byte[]> createConsumer(Properties taskProps) {
throw new CopycatException("Failed to create consumer", t);
}

log.debug("Task {} subscribing to topics {}", id, topics);
newConsumer.subscribe(Arrays.asList(topics), new HandleRebalance());

return newConsumer;
}

Expand Down Expand Up @@ -264,12 +269,23 @@ private void rewind() {
private class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
task.onPartitionsAssigned(partitions);
lastCommittedOffsets = new HashMap<>();
for (TopicPartition tp : partitions) {
long pos = consumer.position(tp);
lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
log.trace("{} assigned topic partition {} with offset {}", id, tp, pos);
}
// Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
// task start. Since this callback gets invoked during that initial setup before we've started the task, we
// need to guard against invoking the user's callback method during that period.
if (workThread != null)
task.onPartitionsAssigned(partitions);
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
task.onPartitionsRevoked(partitions);
commitOffsets(true, -1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void execute() {
iteration();
}
// Make sure any uncommitted data has committed
task.commitOffsets(task.time().milliseconds(), true, -1, false);
task.commitOffsets(true, -1);
}

public void iteration() {
Expand All @@ -67,7 +67,7 @@ public void iteration() {
commitSeqno += 1;
commitStarted = now;
}
task.commitOffsets(now, false, commitSeqno, true);
task.commitOffsets(false, commitSeqno);
nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
}

Expand Down
Loading