diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java index c6cd12f67eb5a..7c03cda9f3715 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Map; +import java.util.Properties; /** * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In @@ -46,6 +47,13 @@ public void initialize(SinkTaskContext context) { this.context = context; } + /** + * Start the Task. This should handle any configuration parsing and one-time setup of the task. + * @param props initial configuration + */ + @Override + public abstract void start(Properties props); + /** * Put the records in the sink. Usually this should send the records to the sink asynchronously * and immediately return. @@ -84,4 +92,12 @@ public void onPartitionsAssigned(Collection partitions) { */ public void onPartitionsRevoked(Collection partitions) { } + + /** + * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other + * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset + * commit has completed. Implementations of this method should only need to perform final cleanup operations, such + * as closing network connections to the sink system. + */ + public abstract void stop(); } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java index 1e1da345979b2..30cbf16f155c8 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java @@ -21,6 +21,7 @@ import org.apache.kafka.copycat.connector.Task; import java.util.List; +import java.util.Properties; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. @@ -37,6 +38,13 @@ public void initialize(SourceTaskContext context) { this.context = context; } + /** + * Start the Task. This should handle any configuration parsing and one-time setup of the task. + * @param props initial configuration + */ + @Override + public abstract void start(Properties props); + /** * Poll this SourceTask for new records. This method should block if no data is currently * available. @@ -59,4 +67,16 @@ public void initialize(SourceTaskContext context) { public void commit() throws InterruptedException { // This space intentionally left blank. } + + /** + * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop + * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has + * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and + * {@link #commit()}. + * + * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method + * could set a flag that will force {@link #poll()} to exit immediately and invoke + * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests. + */ + public abstract void stop(); } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index 55a67c0079826..dc5173049c7b2 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -60,8 +60,10 @@ class WorkerSinkTask implements WorkerTask { private final Converter keyConverter; private final Converter valueConverter; private WorkerSinkTaskThread workThread; + private Properties taskProps; private KafkaConsumer consumer; private WorkerSinkTaskContext context; + private boolean started; private Map lastCommittedOffsets; public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, @@ -72,26 +74,15 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConf this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.time = time; + this.started = false; } @Override public void start(Properties props) { + taskProps = props; consumer = createConsumer(); context = new WorkerSinkTaskContext(consumer); - // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions - // to work with. Any rewinding will be handled immediately when polling starts. - String topicsStr = props.getProperty(SinkTask.TOPICS_CONFIG); - if (topicsStr == null || topicsStr.isEmpty()) - throw new CopycatException("Sink tasks require a list of topics."); - String[] topics = topicsStr.split(","); - log.debug("Task {} subscribing to topics {}", id, topics); - consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); - consumer.poll(0); - - task.initialize(context); - task.start(props); - workThread = createWorkerThread(); workThread.start(); } @@ -128,6 +119,35 @@ public void close() { consumer.close(); } + /** + * Preforms initial join process for consumer group, ensures we have an assignment, and initializes + starts the + * SinkTask. + * + * @returns true if successful, false if joining the consumer group was interrupted + */ + public boolean joinConsumerGroupAndStart() { + String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG); + if (topicsStr == null || topicsStr.isEmpty()) + throw new CopycatException("Sink tasks require a list of topics."); + String[] topics = topicsStr.split(","); + log.debug("Task {} subscribing to topics {}", id, topics); + consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); + + // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions + // to work with. Any rewinding will be handled immediately when polling starts. + try { + consumer.poll(0); + } catch (WakeupException e) { + log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this); + return false; + } + task.initialize(context); + task.start(taskProps); + log.info("Sink task {} finished initialization and start", this); + started = true; + return true; + } + /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ public void poll(long timeoutMs) { try { @@ -156,7 +176,7 @@ public void commitOffsets(boolean sync, final int seqno) { for (TopicPartition tp : consumer.assignment()) { long pos = consumer.position(tp); offsets.put(tp, new OffsetAndMetadata(pos)); - log.trace("{} committing {} offset {}", id, tp, pos); + log.debug("{} committing {} offset {}", id, tp, pos); } try { @@ -273,12 +293,12 @@ public void onPartitionsAssigned(Collection partitions) { for (TopicPartition tp : partitions) { long pos = consumer.position(tp); lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos)); - log.trace("{} assigned topic partition {} with offset {}", id, tp, pos); + log.debug("{} assigned topic partition {} with offset {}", id, tp, pos); } // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon // task start. Since this callback gets invoked during that initial setup before we've started the task, we // need to guard against invoking the user's callback method during that period. - if (workThread != null) + if (started) task.onPartitionsAssigned(partitions); } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java index 486407d3c37a9..ab3f1feb34524 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java @@ -50,9 +50,14 @@ public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, @Override public void execute() { + // Try to join and start. If we're interrupted before this completes, bail. + if (!task.joinConsumerGroupAndStart()) + return; + while (getRunning()) { iteration(); } + // Make sure any uncommitted data has committed task.commitOffsets(true, -1); } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java index 97409331d47a3..1f96c78604104 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java @@ -46,14 +46,14 @@ class WorkerSourceTask implements WorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class); - private ConnectorTaskId id; - private SourceTask task; + private final ConnectorTaskId id; + private final SourceTask task; private final Converter keyConverter; private final Converter valueConverter; private KafkaProducer producer; private WorkerSourceTaskThread workThread; - private OffsetStorageReader offsetReader; - private OffsetStorageWriter offsetWriter; + private final OffsetStorageReader offsetReader; + private final OffsetStorageWriter offsetWriter; private final WorkerConfig workerConfig; private final Time time; @@ -86,15 +86,12 @@ public WorkerSourceTask(ConnectorTaskId id, SourceTask task, @Override public void start(Properties props) { - task.initialize(new WorkerSourceTaskContext(offsetReader)); - task.start(props); - workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id); + workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props); workThread.start(); } @Override public void stop() { - task.stop(); if (workThread != null) workThread.startGracefulShutdown(); } @@ -111,7 +108,6 @@ public boolean awaitStop(long timeoutMs) { success = false; } } - commitOffsets(); return success; } @@ -277,13 +273,31 @@ private void finishSuccessfulFlush() { private class WorkerSourceTaskThread extends ShutdownableThread { - public WorkerSourceTaskThread(String name) { + private Properties workerProps; + private boolean finishedStart; + private boolean startedShutdownBeforeStartCompleted; + + public WorkerSourceTaskThread(String name, Properties workerProps) { super(name); + this.workerProps = workerProps; + this.finishedStart = false; + this.startedShutdownBeforeStartCompleted = false; } @Override public void execute() { try { + task.initialize(new WorkerSourceTaskContext(offsetReader)); + task.start(workerProps); + log.info("Source task {} finished initialization and start", this); + synchronized (this) { + if (startedShutdownBeforeStartCompleted) { + task.stop(); + return; + } + finishedStart = true; + } + while (getRunning()) { List records = task.poll(); if (records == null) @@ -293,6 +307,19 @@ public void execute() { } catch (InterruptedException e) { // Ignore and allow to exit. } + + commitOffsets(); + } + + @Override + public void startGracefulShutdown() { + super.startGracefulShutdown(); + synchronized (this) { + if (finishedStart) + task.stop(); + else + startedShutdownBeforeStartCompleted = true; + } } } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index 28e9e2e603945..177f7a668bdf9 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -136,6 +136,7 @@ public void testPollsInBackground() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); for (int i = 0; i < 10; i++) { workerThread.iteration(); } @@ -202,6 +203,7 @@ public void testCommit() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // First iteration gets one record workerThread.iteration(); // Second triggers commit, gets a second offset @@ -236,6 +238,7 @@ public void testCommitTaskFlushFailure() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // Second iteration triggers commit workerThread.iteration(); workerThread.iteration(); @@ -267,6 +270,7 @@ public void testCommitTaskSuccessAndFlushFailure() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // Second iteration triggers first commit, third iteration triggers second (failing) commit workerThread.iteration(); workerThread.iteration(); @@ -292,6 +296,7 @@ public void testCommitConsumerFailure() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // Second iteration triggers commit workerThread.iteration(); workerThread.iteration(); @@ -318,6 +323,7 @@ public void testCommitTimeout() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't // trigger another commit workerThread.iteration(); @@ -393,6 +399,7 @@ public Object answer() throws Throwable { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); workerThread.iteration(); workerThread.iteration(); workerThread.iteration(); @@ -436,6 +443,7 @@ public Object answer() throws Throwable { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); workerThread.iteration(); workerThread.iteration(); workerTask.stop(); @@ -448,7 +456,17 @@ public Object answer() throws Throwable { private void expectInitializeTask() throws Exception { PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); + workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"}, + workerTask, "mock-worker-thread", time, + workerConfig); + PowerMock.expectPrivate(workerTask, "createWorkerThread") + .andReturn(workerThread); + workerThread.start(); + PowerMock.expectLastCall(); + consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); + PowerMock.expectLastCall(); + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer>() { @Override public ConsumerRecords answer() throws Throwable { @@ -464,14 +482,6 @@ public ConsumerRecords answer() throws Throwable { PowerMock.expectLastCall(); sinkTask.start(TASK_PROPS); PowerMock.expectLastCall(); - - workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"}, - workerTask, "mock-worker-thread", time, - workerConfig); - PowerMock.expectPrivate(workerTask, "createWorkerThread") - .andReturn(workerThread); - workerThread.start(); - PowerMock.expectLastCall(); } private void expectStopTask(final long expectedMessages) throws Exception { diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java index 566391d217499..452c5cbea0f7d 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig; import org.apache.kafka.copycat.source.SourceRecord; @@ -208,6 +209,34 @@ public void testSendRecordsConvertsData() throws Exception { PowerMock.verifyAll(); } + @Test + public void testSlowTaskStart() throws Exception { + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(EMPTY_TASK_PROPS); + EasyMock.expectLastCall().andAnswer(new IAnswer() { + @Override + public Object answer() throws Throwable { + Utils.sleep(100); + return null; + } + }); + sourceTask.stop(); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.start(EMPTY_TASK_PROPS); + // Stopping immediately while the other thread has work to do should result in no polling, no offset commits, + // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it + // cannot be invoked immediately in the thread trying to stop the task. + workerTask.stop(); + assertEquals(true, workerTask.awaitStop(1000)); + + PowerMock.verifyAll(); + } private CountDownLatch expectPolls(int count) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(count);