From 4fc35c2fd6f358b9d234bf9338a7ef342c87af83 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Wed, 4 Nov 2015 17:04:56 -0800
Subject: [PATCH 1/2] KAFKA-2748: Ensure sink tasks commit offsets upon
 rebalance and rewind if the SinkTask flush fails.

Also fix the incorrect consumer group ID setting which was giving each task
its own group instead of one for the entire sink connector.
---
 .../kafka/copycat/runtime/WorkerSinkTask.java      |  75 ++++++----
 .../copycat/runtime/WorkerSinkTaskThread.java      |   4 +-
 .../copycat/runtime/WorkerSinkTaskTest.java        | 131 +++++++++++++-----
 3 files changed, 141 insertions(+), 69 deletions(-)

diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 439a1f5f0cab6..476ed7f45c35f 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -62,6 +62,7 @@ class WorkerSinkTask implements WorkerTask {
     private WorkerSinkTaskThread workThread;
     private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskContext context;
+    private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
 
     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
                           Converter keyConverter, Converter valueConverter, Time time) {
@@ -75,11 +76,17 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConf
 
     @Override
     public void start(Properties props) {
-        consumer = createConsumer(props);
+        consumer = createConsumer();
         context = new WorkerSinkTaskContext(consumer);
 
         // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
         // to work with. Any rewinding will be handled immediately when polling starts.
+        String topicsStr = props.getProperty(SinkTask.TOPICS_CONFIG);
+        if (topicsStr == null || topicsStr.isEmpty())
+            throw new CopycatException("Sink tasks require a list of topics.");
+        String[] topics = topicsStr.split(",");
+        log.debug("Task {} subscribing to topics {}", id, topics);
+        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
         consumer.poll(0);
 
         task.initialize(context);
@@ -92,7 +99,6 @@ public void start(Properties props) {
     @Override
     public void stop() {
         // Offset commit is handled upon exit in work thread
-        task.stop();
         if (workThread != null)
             workThread.startGracefulShutdown();
         consumer.wakeup();
@@ -100,17 +106,18 @@ public void stop() {
 
     @Override
     public boolean awaitStop(long timeoutMs) {
+        boolean success = true;
         if (workThread != null) {
             try {
-                boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
+                success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
                 if (!success)
                     workThread.forceShutdown();
-                return success;
             } catch (InterruptedException e) {
-                return false;
+                success = false;
             }
         }
-        return true;
+        task.stop();
+        return success;
     }
 
     @Override
@@ -142,28 +149,34 @@ public void poll(long timeoutMs) {
     /**
      * Starts an offset commit by flushing outstanding messages from the task and then starting
      * the write commit. This should only be invoked by the WorkerSinkTaskThread.
+     * @returns true if synchronous and successful or asynchronous, false if synchronous and failed
      **/
-    public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
+    public void commitOffsets(boolean sync, final int seqno) {
         log.info("{} Committing offsets", this);
-        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        final HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition tp : consumer.assignment()) {
-            offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
+            long pos = consumer.position(tp);
+            offsets.put(tp, new OffsetAndMetadata(pos));
+            log.trace("{} committing {} offset {}", id, tp, pos);
         }
-        // We only don't flush the task in one case: when shutting down, the task has already been
-        // stopped and all data should have already been flushed
-        if (flush) {
-            try {
-                task.flush(offsets);
-            } catch (Throwable t) {
-                log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t);
-                workThread.onCommitCompleted(t, seqno);
-                return;
+
+        try {
+            task.flush(offsets);
+        } catch (Throwable t) {
+            log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t);
+            log.error("Rewinding offsets to last committed offsets");
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
+                log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
+                consumer.seek(entry.getKey(), entry.getValue().offset());
             }
+            workThread.onCommitCompleted(t, seqno);
+            return;
         }
 
         if (sync) {
             try {
                 consumer.commitSync(offsets);
+                lastCommittedOffsets = offsets;
             } catch (KafkaException e) {
                 workThread.onCommitCompleted(e, seqno);
             }
@@ -171,6 +184,7 @@ public void commitOffsets(long now, boolean sync, final int seqno, boolean flush
             OffsetCommitCallback cb = new OffsetCommitCallback() {
                 @Override
                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
+                    lastCommittedOffsets = offsets;
                     workThread.onCommitCompleted(error, seqno);
                 }
             };
@@ -186,16 +200,11 @@ public WorkerConfig workerConfig() {
         return workerConfig;
     }
 
-    private KafkaConsumer<byte[], byte[]> createConsumer(Properties taskProps) {
-        String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
-        if (topicsStr == null || topicsStr.isEmpty())
-            throw new CopycatException("Sink tasks require a list of topics.");
-        String[] topics = topicsStr.split(",");
-
+    private KafkaConsumer<byte[], byte[]> createConsumer() {
         // Include any unknown worker configs so consumer configs can be set globally on the worker
         // and through to the task
         Properties props = workerConfig.unusedProperties();
 
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.toString());
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector());
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                 Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -210,9 +219,6 @@ private KafkaConsumer<byte[], byte[]> createConsumer(Properties taskProps) {
             throw new CopycatException("Failed to create consumer", t);
         }
 
-        log.debug("Task {} subscribing to topics {}", id, topics);
-        newConsumer.subscribe(Arrays.asList(topics), new HandleRebalance());
-
         return newConsumer;
     }
 
@@ -264,12 +270,23 @@ private void rewind() {
     private class HandleRebalance implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            task.onPartitionsAssigned(partitions);
+            lastCommittedOffsets = new HashMap<>();
+            for (TopicPartition tp : partitions) {
+                long pos = consumer.position(tp);
+                lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
+                log.trace("{} assigned topic partition {} with offset {}", id, tp, pos);
+            }
+            // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
+            // task start. Since this callback gets invoked during that initial setup before we've started the task, we
+            // need to guard against invoking the user's callback method during that period.
+            if (workThread != null)
+                task.onPartitionsAssigned(partitions);
         }
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             task.onPartitionsRevoked(partitions);
+            commitOffsets(true, -1);
         }
     }
 }
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
index 0e28c9703ae7b..486407d3c37a9 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
@@ -54,7 +54,7 @@ public void execute() {
             iteration();
         }
         // Make sure any uncommitted data has committed
-        task.commitOffsets(task.time().milliseconds(), true, -1, false);
+        task.commitOffsets(true, -1);
     }
 
     public void iteration() {
@@ -67,7 +67,7 @@ public void iteration() {
             commitSeqno += 1;
             commitStarted = now;
         }
-        task.commitOffsets(now, false, commitSeqno, true);
+        task.commitOffsets(false, commitSeqno);
 
         nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
     }
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index acc11792695ec..28e9e2e603945 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.runtime;
 
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -28,6 +29,7 @@
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.storage.Converter;
@@ -55,6 +57,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -83,6 +86,11 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
     private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
 
+    private static final Properties TASK_PROPS = new Properties();
+    static {
+        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+    }
+
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private Time time;
     @Mock private SinkTask sinkTask;
@@ -94,6 +102,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private WorkerSinkTask workerTask;
     @Mock private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskThread workerThread;
+    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
 
     private long recordsReturned;
@@ -119,20 +128,19 @@ public void setup() {
 
     @Test
     public void testPollsInBackground() throws Exception {
-        Properties taskProps = new Properties();
-
-        expectInitializeTask(taskProps);
+        expectInitializeTask();
         Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
         expectStopTask(10L);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
 
         PowerMock.replayAll();
 
-        workerTask.start(taskProps);
+        workerTask.start(TASK_PROPS);
         for (int i = 0; i < 10; i++) {
             workerThread.iteration();
         }
         workerTask.stop();
-        // No need for awaitStop since the thread is mocked
+        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         // Verify contents match expected values, i.e. that they were translated properly. With max
@@ -183,18 +191,17 @@ public void testDeliverConvertsData() throws Exception {
 
     @Test
     public void testCommit() throws Exception {
-        Properties taskProps = new Properties();
-
-        expectInitializeTask(taskProps);
+        expectInitializeTask();
         // Make each poll() take the offset commit interval
         Capture<Collection<SinkRecord>> capturedRecords
                 = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
         expectOffsetFlush(1L, null, null, 0, true);
         expectStopTask(2);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
 
         PowerMock.replayAll();
 
-        workerTask.start(taskProps);
+        workerTask.start(TASK_PROPS);
         // First iteration gets one record
         workerThread.iteration();
         // Second triggers commit, gets a second offset
         workerThread.iteration();
@@ -202,6 +209,7 @@
         // Commit finishes synchronously for testing so we can check this immediately
         assertEquals(0, workerThread.commitFailures());
         workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         assertEquals(2, capturedRecords.getValues().size());
@@ -211,41 +219,79 @@
 
     @Test
     public void testCommitTaskFlushFailure() throws Exception {
-        Properties taskProps = new Properties();
-
-        expectInitializeTask(taskProps);
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectInitializeTask();
+        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
         expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
+        // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization
+        // for all topic partitions
+        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
+        PowerMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
+        PowerMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
+        PowerMock.expectLastCall();
         expectStopTask(2);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
 
         PowerMock.replayAll();
 
-        workerTask.start(taskProps);
+        workerTask.start(TASK_PROPS);
         // Second iteration triggers commit
         workerThread.iteration();
         workerThread.iteration();
         assertEquals(1, workerThread.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
         workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testCommitConsumerFailure() throws Exception {
-        Properties taskProps = new Properties();
+    public void testCommitTaskSuccessAndFlushFailure() throws Exception {
+        // Validate that we rewind to the correct offsets
+
+        expectInitializeTask();
+        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetFlush(1L, null, null, 0, true);
+        expectOffsetFlush(2L, new RuntimeException(), null, 0, true);
+        // Should rewind to last known good positions, which in this case will be the offsets last committed. This test
+        // isn't quite accurate since we started with assigning 3 topic partitions and then only committed one, but what
+        // is important here is that we roll back to the last committed values.
+        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
+        PowerMock.expectLastCall();
+        expectStopTask(2);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        // Second iteration triggers first commit, third iteration triggers second (failing) commit
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        assertEquals(1, workerThread.commitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
 
-        expectInitializeTask(taskProps);
+    @Test
+    public void testCommitConsumerFailure() throws Exception {
+        expectInitializeTask();
         Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
         expectOffsetFlush(1L, null, new Exception(), 0, true);
         expectStopTask(2);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
 
         PowerMock.replayAll();
 
-        workerTask.start(taskProps);
+        workerTask.start(TASK_PROPS);
         // Second iteration triggers commit
         workerThread.iteration();
         workerThread.iteration();
@@ -253,6 +299,7 @@ public void testCommitConsumerFailure() throws Exception {
         assertEquals(1, workerThread.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
         workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -260,18 +307,17 @@ public void testCommitConsumerFailure() throws Exception {
 
     @Test
     public void testCommitTimeout() throws Exception {
-        Properties taskProps = new Properties();
-
-        expectInitializeTask(taskProps);
+        expectInitializeTask();
         // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
         Capture<Collection<SinkRecord>> capturedRecords
                 = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
         expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
         expectStopTask(4);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
 
         PowerMock.replayAll();
 
-        workerTask.start(taskProps);
+        workerTask.start(TASK_PROPS);
         // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
         // trigger another commit
         workerThread.iteration();
         workerThread.iteration();
@@ -282,6 +328,7 @@
         assertEquals(1, workerThread.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
         workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -291,10 +338,7 @@ public void testCommitTimeout() throws Exception {
     public void testAssignmentPauseResume() throws Exception {
         // Just validate that the calls are passed through to the consumer, and that where appropriate errors are
         // converted
-
-        Properties taskProps = new Properties();
-
-        expectInitializeTask(taskProps);
+        expectInitializeTask();
 
         expectOnePoll().andAnswer(new IAnswer<Object>() {
             @Override
@@ -344,14 +388,16 @@ public Object answer() throws Throwable {
         PowerMock.expectLastCall();
 
         expectStopTask(0);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
 
         PowerMock.replayAll();
 
-        workerTask.start(taskProps);
+        workerTask.start(TASK_PROPS);
         workerThread.iteration();
         workerThread.iteration();
         workerThread.iteration();
         workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -359,8 +405,7 @@ public Object answer() throws Throwable {
 
     @Test
     public void testRewind() throws Exception {
-        Properties taskProps = new Properties();
-        expectInitializeTask(taskProps);
+        expectInitializeTask();
 
         final long startOffset = 40L;
         final Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -386,31 +431,41 @@ public Object answer() throws Throwable {
         });
 
         expectStopTask(3);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true);
 
         PowerMock.replayAll();
 
-        workerTask.start(taskProps);
+        workerTask.start(TASK_PROPS);
         workerThread.iteration();
         workerThread.iteration();
         workerTask.stop();
-        // No need for awaitStop since the thread is mocked
+        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
     }
 
-    private void expectInitializeTask(Properties taskProps) throws Exception {
-        PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
-                .andReturn(consumer);
+    private void expectInitializeTask() throws Exception {
+        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.empty());
+        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+            @Override
+            public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
+                return ConsumerRecords.empty();
+            }
+        });
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
 
         sinkTask.initialize(EasyMock.capture(sinkTaskContext));
         PowerMock.expectLastCall();
-        sinkTask.start(taskProps);
+        sinkTask.start(TASK_PROPS);
         PowerMock.expectLastCall();
 
-        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
+        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
                 workerTask, "mock-worker-thread", time, workerConfig);
 
         PowerMock.expectPrivate(workerTask, "createWorkerThread")

From d5f545f0a8e2dd7b5476d208168dbab6a885ebd7 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Thu, 5 Nov 2015 10:06:11 -0800
Subject: [PATCH 2/2] Fix outdated comment.
---
 .../java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 476ed7f45c35f..55a67c0079826 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -149,7 +149,6 @@ public void poll(long timeoutMs) {
     /**
      * Starts an offset commit by flushing outstanding messages from the task and then starting
      * the write commit. This should only be invoked by the WorkerSinkTaskThread.
-     * @returns true if synchronous and successful or asynchronous, false if synchronous and failed
      **/
     public void commitOffsets(boolean sync, final int seqno) {
         log.info("{} Committing offsets", this);
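Note (not part of the patch series above): the core behavior the patch adds to WorkerSinkTask -- remember the last committed offsets per partition, commit synchronously when partitions are revoked, and seek back to the last committed offsets when the sink's flush fails -- can be summarized in a small standalone sketch. The class and the `flush()` placeholder below are illustrative only and are not names from the patch; only the standard consumer APIs (position, seek, assignment, commitSync, ConsumerRebalanceListener) are real.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Simplified sketch of the commit/rewind pattern from the patch; not the patch's code.
 */
class RewindingCommitter implements ConsumerRebalanceListener {
    private final Consumer<byte[], byte[]> consumer;
    private Map<TopicPartition, OffsetAndMetadata> lastCommitted = new HashMap<>();

    RewindingCommitter(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Treat the positions handed out on assignment as the initial "last committed" state.
        lastCommitted = new HashMap<>();
        for (TopicPartition tp : partitions)
            lastCommitted.put(tp, new OffsetAndMetadata(consumer.position(tp)));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit before partitions move to another group member so no progress is lost.
        commit();
    }

    void commit() {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : consumer.assignment())
            offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        try {
            flush(offsets);                    // stand-in for SinkTask.flush()
            consumer.commitSync(offsets);
            lastCommitted = offsets;
        } catch (RuntimeException e) {
            // Flush failed: rewind so the uncommitted records are re-delivered on the next poll.
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommitted.entrySet())
                consumer.seek(entry.getKey(), entry.getValue().offset());
        }
    }

    void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Placeholder for the sink-specific flush; throws if the data cannot be persisted.
    }
}
```

The group ID half of the fix is simpler: deriving `group.id` from the connector name rather than the task ID puts all tasks of one sink connector into a single consumer group, which is what lets the broker balance the subscribed partitions across them.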