Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.Collection;
import java.util.Map;
import java.util.Properties;

/**
* SinkTask is a Task takes records loaded from Kafka and sends them to another system. In
Expand All @@ -46,6 +47,13 @@ public void initialize(SinkTaskContext context) {
this.context = context;
}

/**
 * Start the Task. This should handle any configuration parsing and one-time setup of the task.
 *
 * @param props initial configuration properties for this task, as provided by the framework
 */
@Override
public abstract void start(Properties props);

/**
* Put the records in the sink. Usually this should send the records to the sink asynchronously
* and immediately return.
Expand Down Expand Up @@ -84,4 +92,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
*/
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}

/**
 * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only after outstanding calls to other
 * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset
 * commit has completed. Implementations of this method should only need to perform final cleanup operations, such
 * as closing network connections to the sink system.
 */
public abstract void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.copycat.connector.Task;

import java.util.List;
import java.util.Properties;

/**
* SourceTask is a Task that pulls records from another system for storage in Kafka.
Expand All @@ -37,6 +38,13 @@ public void initialize(SourceTaskContext context) {
this.context = context;
}

/**
 * Start the Task. This should handle any configuration parsing and one-time setup of the task.
 *
 * @param props initial configuration properties for this task, as provided by the framework
 */
@Override
public abstract void start(Properties props);

/**
* Poll this SourceTask for new records. This method should block if no data is currently
* available.
Expand All @@ -59,4 +67,16 @@ public void initialize(SourceTaskContext context) {
public void commit() throws InterruptedException {
// This space intentionally left blank.
}

/**
 * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
 * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
 * fully stopped when this method returns. Note that this method may be invoked from a different thread than
 * {@link #poll()} and {@link #commit()}, so implementations must be safe to call concurrently with those methods.
 *
 * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
 * could set a flag that will force {@link #poll()} to exit immediately and invoke
 * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests.
 */
public abstract void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ class WorkerSinkTask implements WorkerTask {
private final Converter keyConverter;
private final Converter valueConverter;
private WorkerSinkTaskThread workThread;
private Properties taskProps;
private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskContext context;
private boolean started;
private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;

public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
Expand All @@ -72,26 +74,15 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConf
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.time = time;
this.started = false;
}

@Override
public void start(Properties props) {
taskProps = props;
consumer = createConsumer();
context = new WorkerSinkTaskContext(consumer);

// Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
// to work with. Any rewinding will be handled immediately when polling starts.
String topicsStr = props.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatException("Sink tasks require a list of topics.");
String[] topics = topicsStr.split(",");
log.debug("Task {} subscribing to topics {}", id, topics);
consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
consumer.poll(0);

task.initialize(context);
task.start(props);

workThread = createWorkerThread();
workThread.start();
}
Expand Down Expand Up @@ -128,6 +119,35 @@ public void close() {
consumer.close();
}

/**
 * Performs the initial join of the consumer group, ensures we have an assignment of partitions, and then
 * initializes and starts the SinkTask.
 *
 * @return true if successful, false if joining the consumer group was interrupted
 */
public boolean joinConsumerGroupAndStart() {
    // The topic list is required configuration for every sink task; fail fast if it is missing.
    String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
    if (topicsStr == null || topicsStr.isEmpty())
        throw new CopycatException("Sink tasks require a list of topics.");
    String[] topics = topicsStr.split(",");
    log.debug("Task {} subscribing to topics {}", id, topics);
    consumer.subscribe(Arrays.asList(topics), new HandleRebalance());

    // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
    // to work with. Any rewinding will be handled immediately when polling starts.
    try {
        consumer.poll(0);
    } catch (WakeupException e) {
        // The consumer was woken up before the join completed (i.e. the task is being stopped), so skip
        // initialization and report failure so the caller can bail out.
        log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this);
        return false;
    }
    task.initialize(context);
    task.start(taskProps);
    log.info("Sink task {} finished initialization and start", this);
    // Rebalance callbacks check this flag so the user's task is only notified once it has been started.
    started = true;
    return true;
}

/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
public void poll(long timeoutMs) {
try {
Expand Down Expand Up @@ -156,7 +176,7 @@ public void commitOffsets(boolean sync, final int seqno) {
for (TopicPartition tp : consumer.assignment()) {
long pos = consumer.position(tp);
offsets.put(tp, new OffsetAndMetadata(pos));
log.trace("{} committing {} offset {}", id, tp, pos);
log.debug("{} committing {} offset {}", id, tp, pos);
}

try {
Expand Down Expand Up @@ -273,12 +293,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition tp : partitions) {
long pos = consumer.position(tp);
lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
log.trace("{} assigned topic partition {} with offset {}", id, tp, pos);
log.debug("{} assigned topic partition {} with offset {}", id, tp, pos);
}
// Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
// task start. Since this callback gets invoked during that initial setup before we've started the task, we
// need to guard against invoking the user's callback method during that period.
if (workThread != null)
if (started)
task.onPartitionsAssigned(partitions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time,

@Override
public void execute() {
    // Try to join the consumer group and start the task. If we're interrupted (woken up) before this
    // completes, bail out without running any iterations.
    if (!task.joinConsumerGroupAndStart())
        return;

    // Main work loop: runs until a shutdown is requested.
    while (getRunning()) {
        iteration();
    }

    // Make sure any uncommitted data has been committed before the thread exits (synchronous, final commit).
    task.commitOffsets(true, -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@
class WorkerSourceTask implements WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);

private ConnectorTaskId id;
private SourceTask task;
private final ConnectorTaskId id;
private final SourceTask task;
private final Converter keyConverter;
private final Converter valueConverter;
private KafkaProducer<byte[], byte[]> producer;
private WorkerSourceTaskThread workThread;
private OffsetStorageReader offsetReader;
private OffsetStorageWriter offsetWriter;
private final OffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
private final WorkerConfig workerConfig;
private final Time time;

Expand Down Expand Up @@ -86,15 +86,12 @@ public WorkerSourceTask(ConnectorTaskId id, SourceTask task,

@Override
public void start(Properties props) {
task.initialize(new WorkerSourceTaskContext(offsetReader));
task.start(props);
workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id);
workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props);
workThread.start();
}

@Override
public void stop() {
task.stop();
if (workThread != null)
workThread.startGracefulShutdown();
}
Expand All @@ -111,7 +108,6 @@ public boolean awaitStop(long timeoutMs) {
success = false;
}
}
commitOffsets();
return success;
}

Expand Down Expand Up @@ -277,13 +273,31 @@ private void finishSuccessfulFlush() {


private class WorkerSourceTaskThread extends ShutdownableThread {
public WorkerSourceTaskThread(String name) {
private Properties workerProps;
private boolean finishedStart;
private boolean startedShutdownBeforeStartCompleted;

public WorkerSourceTaskThread(String name, Properties workerProps) {
super(name);
this.workerProps = workerProps;
this.finishedStart = false;
this.startedShutdownBeforeStartCompleted = false;
}

@Override
public void execute() {
try {
task.initialize(new WorkerSourceTaskContext(offsetReader));
task.start(workerProps);
log.info("Source task {} finished initialization and start", this);
synchronized (this) {
if (startedShutdownBeforeStartCompleted) {
task.stop();
return;
}
finishedStart = true;
}

while (getRunning()) {
List<SourceRecord> records = task.poll();
if (records == null)
Expand All @@ -293,6 +307,19 @@ public void execute() {
} catch (InterruptedException e) {
// Ignore and allow to exit.
}

commitOffsets();
}

@Override
public void startGracefulShutdown() {
super.startGracefulShutdown();
synchronized (this) {
if (finishedStart)
task.stop();
else
startedShutdownBeforeStartCompleted = true;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void testPollsInBackground() throws Exception {
PowerMock.replayAll();

workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
for (int i = 0; i < 10; i++) {
workerThread.iteration();
}
Expand Down Expand Up @@ -202,6 +203,7 @@ public void testCommit() throws Exception {
PowerMock.replayAll();

workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
// First iteration gets one record
workerThread.iteration();
// Second triggers commit, gets a second offset
Expand Down Expand Up @@ -236,6 +238,7 @@ public void testCommitTaskFlushFailure() throws Exception {
PowerMock.replayAll();

workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
// Second iteration triggers commit
workerThread.iteration();
workerThread.iteration();
Expand Down Expand Up @@ -267,6 +270,7 @@ public void testCommitTaskSuccessAndFlushFailure() throws Exception {
PowerMock.replayAll();

workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
// Second iteration triggers first commit, third iteration triggers second (failing) commit
workerThread.iteration();
workerThread.iteration();
Expand All @@ -292,6 +296,7 @@ public void testCommitConsumerFailure() throws Exception {
PowerMock.replayAll();

workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
// Second iteration triggers commit
workerThread.iteration();
workerThread.iteration();
Expand All @@ -318,6 +323,7 @@ public void testCommitTimeout() throws Exception {
PowerMock.replayAll();

workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
// Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
// trigger another commit
workerThread.iteration();
Expand Down Expand Up @@ -393,6 +399,7 @@ public Object answer() throws Throwable {
PowerMock.replayAll();

workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
workerThread.iteration();
workerThread.iteration();
workerThread.iteration();
Expand Down Expand Up @@ -436,6 +443,7 @@ public Object answer() throws Throwable {
PowerMock.replayAll();

workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
workerThread.iteration();
workerThread.iteration();
workerTask.stop();
Expand All @@ -448,7 +456,17 @@ public Object answer() throws Throwable {
private void expectInitializeTask() throws Exception {
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);

workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
workerTask, "mock-worker-thread", time,
workerConfig);
PowerMock.expectPrivate(workerTask, "createWorkerThread")
.andReturn(workerThread);
workerThread.start();
PowerMock.expectLastCall();

consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();

EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
Expand All @@ -464,14 +482,6 @@ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
PowerMock.expectLastCall();
sinkTask.start(TASK_PROPS);
PowerMock.expectLastCall();

workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
workerTask, "mock-worker-thread", time,
workerConfig);
PowerMock.expectPrivate(workerTask, "createWorkerThread")
.andReturn(workerThread);
workerThread.start();
PowerMock.expectLastCall();
}

private void expectStopTask(final long expectedMessages) throws Exception {
Expand Down
Loading