From b9b8350c69e0181edb8e3f09660824fe0e4f8f45 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 2 Nov 2015 21:25:17 -0800 Subject: [PATCH 1/9] KAFKA-2752: Add VerifiableSource/Sink connectors and rolling bounce Copycat system tests. --- bin/kafka-run-class.sh | 2 +- build.gradle | 61 +++++++ checkstyle/import-control.xml | 4 + .../common/utils}/ThroughputThrottler.java | 34 +++- .../kafka/copycat/source/SourceRecord.java | 5 + .../kafka/copycat/runtime/WorkerSinkTask.java | 7 + .../copycat/runtime/WorkerSinkTaskThread.java | 6 +- .../copycat/runtime/WorkerSourceTask.java | 4 +- .../copycat/runtime/rest/RestServer.java | 4 + .../rest/resources/ConnectorsResource.java | 8 +- .../kafka/copycat/util/KafkaBasedLog.java | 6 + .../tools/VerifiableSinkConnector.java | 59 +++++++ .../copycat/tools/VerifiableSinkTask.java | 105 ++++++++++++ .../tools/VerifiableSourceConnector.java | 59 +++++++ .../copycat/tools/VerifiableSourceTask.java | 124 +++++++++++++++ settings.gradle | 2 +- tests/kafkatest/services/copycat.py | 149 +++++++++++++++--- tests/kafkatest/services/kafka/kafka.py | 7 +- .../services/kafka/templates/log4j.properties | 87 ++++++++++ .../templates/copycat_log4j.properties | 30 ++++ .../services/templates/tools_log4j.properties | 3 +- .../tests/copycat_distributed_test.py | 121 ++++++++++++-- .../kafka/tools/ProducerPerformance.java | 1 + .../kafka/tools/VerifiableProducer.java | 1 + 24 files changed, 830 insertions(+), 59 deletions(-) rename {tools/src/main/java/org/apache/kafka/tools => clients/src/main/java/org/apache/kafka/common/utils}/ThroughputThrottler.java (80%) create mode 100644 copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSinkConnector.java create mode 100644 copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSinkTask.java create mode 100644 copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSourceConnector.java create mode 100644 copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSourceTask.java create mode 100644 tests/kafkatest/services/kafka/templates/log4j.properties create mode 100644 tests/kafkatest/services/templates/copycat_log4j.properties diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index cfddae0f72415..c2b5f1bba389b 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -72,7 +72,7 @@ do CLASSPATH=$CLASSPATH:$dir/* done -for cc_pkg in "api" "runtime" "file" "json" +for cc_pkg in "api" "runtime" "file" "json" "tools" do for file in $base_dir/copycat/${cc_pkg}/build/libs/copycat-${cc_pkg}*.jar; do diff --git a/build.gradle b/build.gradle index c82ece977f556..ce6a7ae34476a 100644 --- a/build.gradle +++ b/build.gradle @@ -900,3 +900,64 @@ project(':copycat:file') { } test.dependsOn('checkstyleMain', 'checkstyleTest') } + +project(':copycat:tools') { + apply plugin: 'checkstyle' + archivesBaseName = "copycat-tools" + + dependencies { + compile project(':copycat:api') + compile "$slf4japi" + compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" + + testCompile "$junit" + testCompile "$easymock" + testCompile "$powermock" + testCompile "$powermock_easymock" + testRuntime "$slf4jlog4j" + } + + task testJar(type: Jar) { + classifier = 'test' + from sourceSets.test.output + } + + test { + testLogging { + events "passed", "skipped", "failed" + exceptionFormat = 'full' + } + } + + javadoc { + include "**/org/apache/kafka/copycat/*" + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + from (configurations.testRuntime) { + 
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('copycat-*')
+    }
+    into "$buildDir/dependant-libs"
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom(testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 75027f537a1be..4351d82206c3f 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -204,6 +204,10 @@
+
+
+
+
diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
similarity index 80%
rename from tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
rename to clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
index d8deb220066b4..7239e03102474 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.tools;
+package org.apache.kafka.common.utils;
 
 /**
@@ -46,6 +46,7 @@ public class ThroughputThrottler {
     long sleepDeficitNs = 0;
     long targetThroughput = -1;
     long startMs;
+    private boolean wakeup = false;
 
     /**
      * @param targetThroughput Can be messages/sec or bytes/sec
@@ -83,7 +84,11 @@ public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
     public void throttle() {
         if (targetThroughput == 0) {
             try {
-                Thread.sleep(Long.MAX_VALUE);
+                synchronized (this) {
+                    while (!wakeup) {
+                        this.wait();
+                    }
+                }
             } catch (InterruptedException e) {
                 // do nothing
             }
@@ -96,12 +101,19 @@ public void throttle() {
 
         // If enough sleep deficit has accumulated, sleep a little
         if (sleepDeficitNs >= MIN_SLEEP_NS) {
-            long sleepMs = sleepDeficitNs / 1000000;
-            long sleepNs = sleepDeficitNs - sleepMs * 1000000;
             long sleepStartNs = System.nanoTime();
+            long currentTimeNs = sleepStartNs;
             try {
-                Thread.sleep(sleepMs, (int) sleepNs);
+                synchronized (this) {
+                    long elapsed = currentTimeNs - sleepStartNs;
+                    long remaining = sleepDeficitNs - elapsed;
+                    while (!wakeup && remaining > 0) {
+                        long sleepMs = remaining / 1000000;
+                        long sleepNs = remaining - sleepMs * 1000000;
+                        this.wait(sleepMs, (int) sleepNs);
+                    }
+                    wakeup = false;
+                }
                 sleepDeficitNs = 0;
             } catch (InterruptedException e) {
                 // If sleep is cut short, reduce deficit by the amount of
@@ -113,5 +125,15 @@ public void throttle() {
             }
         }
     }
+
+    /**
+     * Wake up the throttler if it is sleeping.
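+     * Intended to be called from another thread (for example a task's stop() method) so a caller blocked
+     * in throttle() can return promptly instead of sleeping out its full deficit.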
+ */ + public void wakeup() { + synchronized (this) { + wakeup = true; + this.notifyAll(); + } + } } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java index 7f54c10c7de56..4b7f525eaea0c 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java @@ -56,6 +56,11 @@ public SourceRecord(Map sourcePartition, Map sourceOffset, this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value); } + public SourceRecord(Map sourcePartition, Map sourceOffset, + String topic, Schema keySchema, Object key, Schema valueSchema, Object value) { + this(sourcePartition, sourceOffset, topic, null, keySchema, key, valueSchema, value); + } + public SourceRecord(Map sourcePartition, Map sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value) { diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index 55a67c0079826..1e6a3a7209076 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -199,6 +199,13 @@ public WorkerConfig workerConfig() { return workerConfig; } + @Override + public String toString() { + return "WorkerSinkTask{" + + "id=" + id + + '}'; + } + private KafkaConsumer createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java index 486407d3c37a9..dc9cf8d7b58fb 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java @@ -75,7 +75,7 @@ public void iteration() { long commitTimeout = commitStarted + task.workerConfig().getLong( WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); if (committing && now >= commitTimeout) { - log.warn("Commit of {} offsets timed out", this); + log.warn("Commit of {} offsets timed out", task); commitFailures++; committing = false; } @@ -93,11 +93,11 @@ public void onCommitCompleted(Throwable error, long seqno) { seqno, commitSeqno); } else { if (error != null) { - log.error("Commit of {} offsets threw an unexpected exception: ", this, error); + log.error("Commit of {} offsets threw an unexpected exception: ", task, error); commitFailures++; } else { log.debug("Finished {} offset commit successfully in {} ms", - this, task.time().milliseconds() - commitStarted); + task, task.time().milliseconds() - commitStarted); commitFailures = 0; } committing = false; diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java index 97409331d47a3..4bee04c8b1f76 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java @@ -174,6 +174,8 @@ private synchronized void recordSent(final 
ProducerRecord record public boolean commitOffsets() { long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + log.debug("{} Committing offsets", this); + long started = time.milliseconds(); long timeout = started + commitTimeoutMs; @@ -255,7 +257,7 @@ public void onCompletion(Throwable error, Void result) { } finishSuccessfulFlush(); - log.debug("Finished {} commitOffsets successfully in {} ms", + log.info("Finished {} commitOffsets successfully in {} ms", this, time.milliseconds() - started); return true; } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java index 5da747d668c60..4e01eec2ce2ab 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java @@ -133,6 +133,8 @@ public void start(Herder herder) { } public void stop() { + log.info("Stopping REST server"); + try { jettyServer.stop(); jettyServer.join(); @@ -141,6 +143,8 @@ public void stop() { } finally { jettyServer.destroy(); } + + log.info("REST server stopped"); } /** diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java index 823155e0bc6d0..33334f6a77c31 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResource.java @@ -27,6 +27,8 @@ import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo; import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException; import org.apache.kafka.copycat.util.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.ServletContext; import javax.ws.rs.Consumes; @@ -51,6 +53,8 @@ @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public class ConnectorsResource { + private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class); + // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but // we need to consider all possible scenarios this could fail. 
It might be ok to fail with a timeout in rare cases, @@ -159,7 +163,9 @@ private T completeOrForwardRequest( } catch (ExecutionException e) { if (e.getCause() instanceof NotLeaderException) { NotLeaderException notLeaderError = (NotLeaderException) e.getCause(); - return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType)); + String forwardUrl = RestServer.urlJoin(notLeaderError.leaderUrl(), path); + log.debug("Forwarding request to leader: {} {} {}", forwardUrl, method, body); + return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType)); } throw e.getCause(); diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java index f5e72d33d096e..7fc9da0ffbb3a 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java @@ -189,6 +189,7 @@ public void stop() { * @param callback the callback to invoke once the end of the log has been reached. */ public void readToEnd(Callback callback) { + log.trace("Starting read to end log for topic {}", topic); producer.flush(); synchronized (this) { readLogEndOffsetCallbacks.add(callback); @@ -286,6 +287,10 @@ private void readToLogEnd() { private class WorkThread extends Thread { + public WorkThread() { + super("KafkaBasedLog Work Thread - " + topic); + } + @Override public void run() { try { @@ -300,6 +305,7 @@ public void run() { if (numCallbacks > 0) { try { readToLogEnd(); + log.trace("Finished read to end log for topic {}", topic); } catch (WakeupException e) { // Either received another get() call and need to retry reading to end of log or stop() was // called. Both are handled by restarting this loop. diff --git a/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSinkConnector.java b/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSinkConnector.java new file mode 100644 index 0000000000000..8136e80241e9e --- /dev/null +++ b/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSinkConnector.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.tools; + +import org.apache.kafka.copycat.connector.Task; +import org.apache.kafka.copycat.source.SourceConnector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * @see VerifiableSinkTask + */ +public class VerifiableSinkConnector extends SourceConnector { + private Properties config; + + @Override + public void start(Properties props) { + this.config = props; + } + + @Override + public Class taskClass() { + return VerifiableSinkTask.class; + } + + @Override + public List taskConfigs(int maxTasks) { + ArrayList configs = new ArrayList<>(); + for (Integer i = 0; i < maxTasks; i++) { + Properties props = new Properties(); + for (String propName : config.stringPropertyNames()) + props.setProperty(propName, config.getProperty(propName)); + props.setProperty(VerifiableSourceTask.ID_CONFIG, i.toString()); + configs.add(props); + } + return configs; + } + + @Override + public void stop() { + } +} diff --git a/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSinkTask.java b/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSinkTask.java new file mode 100644 index 0000000000000..0e4fbbf321c99 --- /dev/null +++ b/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSinkTask.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.sink.SinkRecord; +import org.apache.kafka.copycat.sink.SinkTask; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Counterpart to {@link VerifiableSourceTask} that consumes records and logs information about each to stdout. This + * allows validation of processing of messages by sink tasks on distributed workers even in the face of worker restarts + * and failures. This task relies on the offset management provided by the Copycat framework and therefore can detect + * bugs in its implementation. 
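+ *
+ * Each record received in put() is printed as a JSON line to stdout and buffered; when the framework calls
+ * flush(), the buffered records are re-emitted with a "flushed": true field, so tests can distinguish records
+ * that were merely consumed from records whose offsets were committed.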
+ */ +public class VerifiableSinkTask extends SinkTask { + public static final String NAME_CONFIG = "name"; + public static final String ID_CONFIG = "id"; + + private static final ObjectMapper JSON_SERDE = new ObjectMapper(); + + private String name; // Connector name + private int id; // Task ID + + private ArrayList> unflushed = new ArrayList<>(); + + @Override + public void start(Properties props) { + try { + name = props.getProperty(NAME_CONFIG); + id = Integer.parseInt(props.getProperty(ID_CONFIG)); + } catch (NumberFormatException e) { + throw new CopycatException("Invalid VerifiableSourceTask configuration", e); + } + } + + @Override + public void put(Collection records) { + long nowMs = System.currentTimeMillis(); + for (SinkRecord record : records) { + Map data = new HashMap<>(); + data.put("name", name); + data.put("task", record.key()); // VerifiableSourceTask's input task (source partition) + data.put("sinkTask", id); + data.put("topic", record.topic()); + data.put("time_ms", nowMs); + data.put("seqno", record.value()); + data.put("offset", record.kafkaOffset()); + String dataJson; + try { + dataJson = JSON_SERDE.writeValueAsString(data); + } catch (JsonProcessingException e) { + dataJson = "Bad data can't be written as json: " + e.getMessage(); + } + System.out.println(dataJson); + unflushed.add(data); + } + } + + @Override + public void flush(Map offsets) { + long nowMs = System.currentTimeMillis(); + for (Map data : unflushed) { + data.put("time_ms", nowMs); + data.put("flushed", true); + String dataJson; + try { + dataJson = JSON_SERDE.writeValueAsString(data); + } catch (JsonProcessingException e) { + dataJson = "Bad data can't be written as json: " + e.getMessage(); + } + System.out.println(dataJson); + } + unflushed.clear(); + } + + @Override + public void stop() { + + } +} diff --git a/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSourceConnector.java b/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSourceConnector.java new file mode 100644 index 0000000000000..2afcecf8ad7a8 --- /dev/null +++ b/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSourceConnector.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.tools; + +import org.apache.kafka.copycat.connector.Task; +import org.apache.kafka.copycat.source.SourceConnector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * @see VerifiableSourceTask + */ +public class VerifiableSourceConnector extends SourceConnector { + private Properties config; + + @Override + public void start(Properties props) { + this.config = props; + } + + @Override + public Class taskClass() { + return VerifiableSourceTask.class; + } + + @Override + public List taskConfigs(int maxTasks) { + ArrayList configs = new ArrayList<>(); + for (Integer i = 0; i < maxTasks; i++) { + Properties props = new Properties(); + for (String propName : config.stringPropertyNames()) + props.setProperty(propName, config.getProperty(propName)); + props.setProperty(VerifiableSourceTask.ID_CONFIG, i.toString()); + configs.add(props); + } + return configs; + } + + @Override + public void stop() { + } +} diff --git a/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSourceTask.java b/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSourceTask.java new file mode 100644 index 0000000000000..0a96d3e1d3f8f --- /dev/null +++ b/copycat/tools/src/main/java/org/apache/kafka/copycat/tools/VerifiableSourceTask.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.utils.ThroughputThrottler; +import org.apache.kafka.copycat.data.Schema; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.source.SourceRecord; +import org.apache.kafka.copycat.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The + * tasks print metadata in the form of JSON to stdout for each message generated, making externally visible which + * messages have been sent. Each message is also assigned a unique, increasing seqno that is passed to Copycat; when + * tasks are started on new nodes, this seqno is used to resume where the task previously left off, allowing for + * testing of distributed Copycat. + * + * If logging is left enabled, log output on stdout can be easily ignored by checking whether a given line is valid JSON. 
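+ *
+ * A record line looks like the following (field values here are illustrative):
+ * {"name": "verifiable-source", "task": 0, "topic": "verifiable", "time_ms": 1446765000000, "seqno": 42}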
+ */ +public class VerifiableSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(VerifiableSourceTask.class); + + public static final String NAME_CONFIG = "name"; + public static final String ID_CONFIG = "id"; + public static final String TOPIC_CONFIG = "topic"; + public static final String THROUGHPUT_CONFIG = "throughput"; + + private static final String ID_FIELD = "id"; + private static final String SEQNO_FIELD = "seqno"; + + private static final ObjectMapper JSON_SERDE = new ObjectMapper(); + + private String name; // Connector name + private int id; // Task ID + private String topic; + private Map partition; + private long startingSeqno; + private long seqno; + private ThroughputThrottler throttler; + + @Override + public void start(Properties props) { + final long throughput; + try { + name = props.getProperty(NAME_CONFIG); + id = Integer.parseInt(props.getProperty(ID_CONFIG)); + topic = props.getProperty(TOPIC_CONFIG); + throughput = Long.parseLong(props.getProperty(THROUGHPUT_CONFIG)); + } catch (NumberFormatException e) { + throw new CopycatException("Invalid VerifiableSourceTask configuration", e); + } + + partition = Collections.singletonMap(ID_FIELD, id); + Map previousOffset = this.context.offsetStorageReader().offset(partition); + if (previousOffset != null) + seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1; + else + seqno = 0; + startingSeqno = seqno; + throttler = new ThroughputThrottler(throughput, System.currentTimeMillis()); + + log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno); + } + + @Override + public List poll() throws InterruptedException { + long sendStartMs = System.currentTimeMillis(); + if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs)) + throttler.throttle(); + + long nowMs = System.currentTimeMillis(); + + Map data = new HashMap<>(); + data.put("name", name); + data.put("task", id); + data.put("topic", this.topic); + data.put("time_ms", nowMs); + data.put("seqno", seqno); + String dataJson; + try { + dataJson = JSON_SERDE.writeValueAsString(data); + } catch (JsonProcessingException e) { + dataJson = "Bad data can't be written as json: " + e.getMessage(); + } + System.out.println(dataJson); + + Map ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno); + SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno); + List result = Arrays.asList(srcRecord); + seqno++; + return result; + } + + @Override + public void stop() { + throttler.wakeup(); + } +} diff --git a/settings.gradle b/settings.gradle index 357305ba23e8c..a71a9d5e79da7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,4 +15,4 @@ apply from: file('scala.gradle') include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'streams', 'log4j-appender', - 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file' \ No newline at end of file + 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file', 'copycat:tools' \ No newline at end of file diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py index 5ad9dd5c08c53..e06a1f521a83e 100644 --- a/tests/kafkatest/services/copycat.py +++ b/tests/kafkatest/services/copycat.py @@ -18,14 +18,30 @@ from ducktape.errors import DucktapeError from kafkatest.services.kafka.directory import kafka_dir -import signal, random, requests +import signal, random, requests, os.path, json 
class CopycatServiceBase(Service): """Base class for Copycat services providing some common settings and functionality""" + PERSISTENT_ROOT = "/mnt/copycat" + CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "copycat.properties") + # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately + # so they can be used for other output, e.g. verifiable source & sink. + LOG_FILE = os.path.join(PERSISTENT_ROOT, "copycat.log") + STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "copycat.stdout") + STDERR_FILE = os.path.join(PERSISTENT_ROOT, "copycat.stderr") + LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "copycat-log4j.properties") + PID_FILE = os.path.join(PERSISTENT_ROOT, "copycat.pid") + logs = { - "kafka_log": { - "path": "/mnt/copycat.log", + "copycat_log": { + "path": LOG_FILE, + "collect_default": True}, + "copycat_stdout": { + "path": STDOUT_FILE, + "collect_default": False}, + "copycat_stderr": { + "path": STDERR_FILE, "collect_default": True}, } @@ -37,7 +53,7 @@ def __init__(self, context, num_nodes, kafka, files): def pids(self, node): """Return process ids for Copycat processes.""" try: - return [pid for pid in node.account.ssh_capture("cat /mnt/copycat.pid", callback=int)] + return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)] except: return [] @@ -52,19 +68,22 @@ def set_configs(self, config_template_func, connector_config_templates=None): self.connector_config_templates = connector_config_templates def stop_node(self, node, clean_shutdown=True): + self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Copycat on " + str(node.account)) pids = self.pids(node) sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL for pid in pids: - node.account.signal(pid, sig, allow_fail=False) - for pid in pids: - wait_until(lambda: not node.account.alive(pid), timeout_sec=10, err_msg="Copycat standalone process took too long to exit") + node.account.signal(pid, sig, allow_fail=True) + if clean_shutdown: + for pid in pids: + wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Copycat process on " + str(node.account) + " took too long to exit") - node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False) + node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False) def restart(self): # We don't want to do any clean up here, just restart the process. for node in self.nodes: + self.logger.info("Restarting Copycat on " + str(node.account)) self.stop_node(node) self.start_node(node) @@ -72,13 +91,12 @@ def clean_node(self, node): if len(self.pids(node)) > 0: self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." 
% (self.__class__.__name__, node.account)) - for pid in self.pids(node): - node.account.signal(pid, signal.SIGKILL, allow_fail=False) + self.stop_node(node, clean_shutdown=False) - node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties " + " ".join(self.config_filenames() + self.files), allow_fail=False) + node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False) def config_filenames(self): - return ["/mnt/copycat-connector-" + str(idx) + ".properties" for idx, template in enumerate(self.connector_config_templates or [])] + return [os.path.join(self.PERSISTENT_ROOT, "copycat-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])] def list_connectors(self, node=None): @@ -112,6 +130,7 @@ def _rest(self, path, body=None, node=None, method="GET"): meth = getattr(requests, method.lower()) url = self._base_url(node) + path + self.logger.debug("Copycat REST request: %s %s %s %s", node.account.hostname, url, method, body) resp = meth(url, json=body) self.logger.debug("%s %s response: %d", url, method, resp.status_code) if resp.status_code > 400: @@ -137,19 +156,23 @@ def node(self): return self.nodes[0] def start_node(self, node): - node.account.create_file("/mnt/copycat.properties", self.config_template_func(node)) + node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + + node.account.create_file(self.CONFIG_FILE, self.config_template_func(node)) + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE)) remote_connector_configs = [] for idx, template in enumerate(self.connector_config_templates): - target_file = "/mnt/copycat-connector-" + str(idx) + ".properties" + target_file = os.path.join(self.PERSISTENT_ROOT, "copycat-connector-" + str(idx) + ".properties") node.account.create_file(target_file, template) remote_connector_configs.append(target_file) - self.logger.info("Starting Copycat standalone process") - with node.account.monitor_log("/mnt/copycat.log") as monitor: - node.account.ssh("/opt/%s/bin/copycat-standalone.sh /mnt/copycat.properties " % kafka_dir(node) + + self.logger.info("Starting Copycat standalone process on " + str(node.account)) + with node.account.monitor_log(self.LOG_FILE) as monitor: + node.account.ssh("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + + "/opt/%s/bin/copycat-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) + " ".join(remote_connector_configs) + - " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid") - monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup") + (" & echo $! 
>&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE))) + monitor.wait_until('Copycat started', timeout_sec=15, err_msg="Never saw message indicating Copycat finished startup on " + str(node.account)) if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") @@ -164,16 +187,20 @@ def __init__(self, context, num_nodes, kafka, files, offsets_topic="copycat-offs self.configs_topic = configs_topic def start_node(self, node): - node.account.create_file("/mnt/copycat.properties", self.config_template_func(node)) + node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + + node.account.create_file(self.CONFIG_FILE, self.config_template_func(node)) + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE)) if self.connector_config_templates: raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API") - self.logger.info("Starting Copycat distributed process") - with node.account.monitor_log("/mnt/copycat.log") as monitor: - cmd = "/opt/%s/bin/copycat-distributed.sh /mnt/copycat.properties " % kafka_dir(node) - cmd += " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid" + self.logger.info("Starting Copycat distributed process on " + str(node.account)) + with node.account.monitor_log(self.LOG_FILE) as monitor: + cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + cmd += "/opt/%s/bin/copycat-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE) + cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE) node.account.ssh(cmd) - monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup") + monitor.wait_until('Copycat started', timeout_sec=15, err_msg="Never saw message indicating Copycat finished startup on " + str(node.account)) if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") @@ -188,4 +215,74 @@ def __init__(self, status, msg, url): self.url = url def __unicode__(self): - return "Copycat REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message \ No newline at end of file + return "Copycat REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message + + + +class VerifiableConnector(object): + def messages(self): + """ + Collect and parse the logs from Copycat nodes. Return a list containing all parsed JSON messages generated by + this source. + """ + self.logger.info("Collecting messages from log of %s %s", type(self).__name__, self.name) + records = [] + for node in self.cc.nodes: + for line in node.account.ssh_capture('cat ' + self.cc.STDOUT_FILE): + try: + data = json.loads(line) + except ValueError: + continue + # Filter to only ones matching our name to support multiple verifiable producers + if data['name'] != self.name: continue + data['node'] = node + records.append(data) + return records + + def stop(self): + self.logger.info("Destroying connector %s %s", type(self).__name__, self.name) + self.cc.delete_connector(self.name) + +class VerifiableSource(VerifiableConnector): + """ + Helper class for running a verifiable source connector on a Copycat cluster and analyzing the output. 
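+    Each task emits one JSON line per generated record to the worker's stdout; messages() collects and parses
+    those lines from every Copycat node so a test can reason about exactly which records were produced.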
+ """ + + def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", throughput=1000): + self.cc = cc + self.logger = self.cc.logger + self.name = name + self.tasks = tasks + self.topic = topic + self.throughput = throughput + + def start(self): + self.logger.info("Creating connector VerifiableSourceConnector %s", self.name) + self.cc.create_connector({ + 'name': self.name, + 'connector.class': 'org.apache.kafka.copycat.tools.VerifiableSourceConnector', + 'tasks.max': self.tasks, + 'topic': self.topic, + 'throughput': self.throughput + }) + +class VerifiableSink(VerifiableConnector): + """ + Helper class for running a verifiable sink connector on a Copycat cluster and analyzing the output. + """ + + def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]): + self.cc = cc + self.logger = self.cc.logger + self.name = name + self.tasks = tasks + self.topics = topics + + def start(self): + self.logger.info("Creating connector VerifiableSinkConnector %s", self.name) + self.cc.create_connector({ + 'name': self.name, + 'connector.class': 'org.apache.kafka.copycat.tools.VerifiableSinkConnector', + 'tasks.max': self.tasks, + 'topics': ",".join(self.topics) + }) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 6778b5a881e40..3c0f264f8c2cb 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -29,10 +29,13 @@ import signal import subprocess import time - +import os.path class KafkaService(JmxMixin, Service): + PERSISTENT_ROOT = "/mnt" + LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties") + logs = { "kafka_log": { "path": "/mnt/kafka.log", @@ -104,6 +107,7 @@ def prop_file(self, node): def start_cmd(self, node): cmd = "export JMX_PORT=%d; " % self.jmx_port + cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; " cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &" @@ -114,6 +118,7 @@ def start_node(self, node): self.logger.info("kafka.properties:") self.logger.info(prop_file) node.account.create_file("/mnt/kafka.properties", prop_file) + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('log4j.properties')) self.security_config.setup_node(node) diff --git a/tests/kafkatest/services/kafka/templates/log4j.properties b/tests/kafkatest/services/kafka/templates/log4j.properties new file mode 100644 index 0000000000000..bf816e76d2f2f --- /dev/null +++ b/tests/kafkatest/services/kafka/templates/log4j.properties @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
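+
+# Rendered by the system tests' KafkaService; start_cmd exports KAFKA_LOG4J_OPTS pointing at this file,
+# and ${kafka.logs.dir} resolves to the LOG_DIR (/mnt/kafka-operational-logs/) exported alongside it.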
+ +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log +log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +#log4j.logger.kafka.perf=DEBUG, kafkaAppender +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=INFO, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender +log4j.additivity.kafka.log.LogCleaner=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false + +#Change this to debug to get the actual audit log for authorizer. 
+log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender +log4j.additivity.kafka.authorizer.logger=false + diff --git a/tests/kafkatest/services/templates/copycat_log4j.properties b/tests/kafkatest/services/templates/copycat_log4j.properties new file mode 100644 index 0000000000000..574b8a75ebba4 --- /dev/null +++ b/tests/kafkatest/services/templates/copycat_log4j.properties @@ -0,0 +1,30 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +# Define the root logger with appender file +log4j.rootLogger = {{ log_level|default("INFO") }}, FILE + +log4j.appender.FILE=org.apache.log4j.FileAppender +log4j.appender.FILE.File={{ log_file }} +log4j.appender.FILE.ImmediateFlush=true +log4j.appender.FILE.Threshold=debug +log4j.appender.FILE.Append=true +log4j.appender.FILE.layout=org.apache.log4j.PatternLayout +log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)% + +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.I0Itec.zkclient=ERROR diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties index ce30d527abc2e..2bad8b2e6db85 100644 --- a/tests/kafkatest/services/templates/tools_log4j.properties +++ b/tests/kafkatest/services/templates/tools_log4j.properties @@ -20,7 +20,6 @@ log4j.appender.FILE=org.apache.log4j.FileAppender log4j.appender.FILE.File={{ log_file }} log4j.appender.FILE.ImmediateFlush=true log4j.appender.FILE.Threshold=debug -# Set the append to false, overwrite -log4j.appender.FILE.Append=false +log4j.appender.FILE.Append=true log4j.appender.FILE.layout=org.apache.log4j.PatternLayout log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n \ No newline at end of file diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py index 10aa5e200f011..787c180063e3b 100644 --- a/tests/kafkatest/tests/copycat_distributed_test.py +++ b/tests/kafkatest/tests/copycat_distributed_test.py @@ -14,11 +14,12 @@ # limitations under the License. from kafkatest.tests.kafka_test import KafkaTest -from kafkatest.services.copycat import CopycatDistributedService +from kafkatest.services.copycat import CopycatDistributedService, VerifiableSource, VerifiableSink from ducktape.utils.util import wait_until -import hashlib, subprocess, json, itertools +import hashlib, subprocess, json, itertools, time +from collections import Counter -class CopycatDistributedFileTest(KafkaTest): +class CopycatDistributedTest(KafkaTest): """ Simple test of Copycat in distributed mode, producing data from files on on Copycat cluster and consuming it on another, validating the total output is identical to the input. 
@@ -41,18 +42,20 @@ class CopycatDistributedFileTest(KafkaTest): SCHEMA = { "type": "string", "optional": False } def __init__(self, test_context): - super(CopycatDistributedFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ + super(CopycatDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ 'test' : { 'partitions': 1, 'replication-factor': 1 } }) - self.cc = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE]) + self.cc = CopycatDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE]) + self.key_converter = "org.apache.kafka.copycat.json.JsonConverter" + self.value_converter = "org.apache.kafka.copycat.json.JsonConverter" + self.schemas = True - def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True): - assert converter != None, "converter type must be set" - # Template parameters - self.key_converter = converter - self.value_converter = converter - self.schemas = schemas + def test_file_source_and_sink(self): + """ + Tests that a basic file connector works across clean rolling bounces. This validates that the connector is + correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes. + """ self.cc.set_configs(lambda node: self.render("copycat-distributed.properties", node=node)) @@ -68,7 +71,7 @@ def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.Jso # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile for node in self.cc.nodes: node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE) - wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") + wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") # Restarting both should result in them picking up where they left off, # only processing new data. @@ -76,19 +79,103 @@ def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.Jso for node in self.cc.nodes: node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE) - wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file") + wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file") - def validate_output(self, input): + + def test_clean_bounce(self): + """ + Validates that source and sink tasks that run continuously and produce a predictable sequence of messages + run correctly and deliver messages exactly once when Copycat workers undergo clean rolling bounces. 
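+
+        Each worker is bounced in turn while a VerifiableSource and VerifiableSink run continuously; afterwards
+        the per-task sequence numbers are checked for gaps (lost messages) and duplicates (redelivered messages).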
+ """ + num_tasks = 3 + + self.cc.set_configs(lambda node: self.render("copycat-distributed.properties", node=node)) + self.cc.start() + + self.source = VerifiableSource(self.cc, tasks=num_tasks) + self.source.start() + self.sink = VerifiableSink(self.cc, tasks=num_tasks) + self.sink.start() + + for _ in range(1): + for node in self.cc.nodes: + started = time.time() + self.logger.info("Cleanly bouncing Copycat on " + str(node.account)) + self.cc.stop_node(node) + with node.account.monitor_log(self.cc.LOG_FILE) as monitor: + self.cc.start_node(node) + monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90, + err_msg="Copycat worker didn't successfully join group and start work") + self.logger.info("Bounced Copycat on %s and rejoined in %f seconds", node.account, time.time() - started) + + self.source.stop() + self.sink.stop() + self.cc.stop() + + # Validate at least once delivery of everything that was reported as written since we should have flushed and + # cleanly exited. Currently this only tests at least once delivery because the sink task may not have consumed + # all the messages generated by the source task. This needs to be done per-task since seqnos are not unique across + # tasks. + src_msgs = self.source.messages() + sink_msgs = self.sink.messages() + success = True + errors = [] + for task in range(num_tasks): + src_seqnos = [msg['seqno'] for msg in src_msgs if msg['task'] == task] + # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean + # bouncing should commit on rebalance. + src_seqno_max = max(src_seqnos) + src_seqno_counts = Counter(src_seqnos) + missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos))) + duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1]) + + if missing_src_seqnos: + self.logger.error("Missing source sequence numbers for task " + str(task)) + errors.append("Found missing source sequence numbers for task %d: %s" % (task, missing_src_seqnos)) + success = False + if duplicate_src_seqnos: + self.logger.error("Duplicate source sequence numbers for task " + str(task)) + errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos)) + success = False + + sink_seqnos = [msg['seqno'] for msg in sink_msgs if msg['task'] == task and 'flushed' in msg] + # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because + # clean bouncing should commit on rebalance. 
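+            # A gap below the max seqno would mean a record reported as flushed was lost; a duplicate would
+            # mean a flushed record was redelivered after a bounce.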
+            sink_seqno_max = max(sink_seqnos)
+            sink_seqno_counts = Counter(sink_seqnos)
+            missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos)))
+            duplicate_sink_seqnos = sorted([seqno for seqno, count in sink_seqno_counts.iteritems() if count > 1])
+
+            if missing_sink_seqnos:
+                self.logger.error("Missing sink sequence numbers for task " + str(task))
+                errors.append("Found missing sink sequence numbers for task %d: %s" % (task, missing_sink_seqnos))
+                success = False
+            if duplicate_sink_seqnos:
+                self.logger.error("Duplicate sink sequence numbers for task " + str(task))
+                errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos))
+                success = False
+
+
+            if sink_seqno_max > src_seqno_max:
+                self.logger.error("Found sink sequence number greater than any generated source sequence number for task %d: %d > %d", task, sink_seqno_max, src_seqno_max)
+                errors.append("Found sink sequence number greater than any generated source sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max))
+                success = False
+
+        if not success:
+            self.mark_for_collect(self.cc)
+        assert success, "Found validation errors:\n" + "\n ".join(errors)
+
+
+    def _validate_file_output(self, input):
         input_set = set(input)
         # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
         # Between the first and second rounds, we might even end up with half the data on each node.
         output_set = set(itertools.chain(*[
-            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
+            [line.strip() for line in self._file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
         ]))
         return input_set == output_set
 
-
-    def file_contents(self, node, file):
+    def _file_contents(self, node, file):
         try:
             # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
             # immediately
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 3a068626c5c6a..2a7f7b18e128c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -24,6 +24,7 @@ import net.sourceforge.argparse4j.inf.Namespace;
 
 import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.utils.ThroughputThrottler;
 
 public class ProducerPerformance {
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 0cd90c0bbf57a..e8bd33025e1a0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -41,6 +41,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.ThroughputThrottler;
 
 /**
  * Primarily intended for use with system testing, this producer prints metadata

From 0fe6789754d2e203ce3d5b7f13404d051a8a79c0 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Thu, 5 Nov 2015 14:10:46 -0800
Subject: [PATCH 2/9] Address review comments.
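
The key functional change is in ThroughputThrottler.throttle(): the timed wait loop
computed the remaining deficit once before waiting, so any wakeup of the monitor simply
restarted a full-length wait. The loop now recomputes the elapsed time and remaining
deficit after each wait() and exits once the deficit is paid down or wakeup() is called.
On the test side, the file source/sink timeouts drop from 120s to 70s, each worker is
bounced three times instead of once, and validation now fails if fewer than 1000
messages were processed by the source or the sink.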
---
 .../apache/kafka/common/utils/ThroughputThrottler.java |  2 ++
 tests/kafkatest/tests/copycat_distributed_test.py       | 10 +++++++---
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
index 7239e03102474..1c63ffb08c58c 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
@@ -111,6 +111,8 @@ public void throttle() {
                         long sleepMs = remaining / 1000000;
                         long sleepNs = remaining - sleepMs * 1000000;
                         this.wait(sleepMs, (int) sleepNs);
+                        elapsed = System.nanoTime() - sleepStartNs;
+                        remaining = sleepDeficitNs - elapsed;
                     }
                     wakeup = false;
                 }
diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py
index 787c180063e3b..41d05493997fb 100644
--- a/tests/kafkatest/tests/copycat_distributed_test.py
+++ b/tests/kafkatest/tests/copycat_distributed_test.py
@@ -71,7 +71,7 @@ def test_file_source_and_sink(self):
         # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile
         for node in self.cc.nodes:
             node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=70, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
 
         # Restarting both should result in them picking up where they left off,
         # only processing new data.
@@ -79,7 +79,7 @@ def test_file_source_and_sink(self):
 
         for node in self.cc.nodes:
             node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file")
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
 
 
     def test_clean_bounce(self):
@@ -97,7 +97,7 @@ def test_clean_bounce(self):
         self.sink = VerifiableSink(self.cc, tasks=num_tasks)
         self.sink.start()
 
-        for _ in range(1):
+        for _ in range(3):
             for node in self.cc.nodes:
                 started = time.time()
                 self.logger.info("Cleanly bouncing Copycat on " + str(node.account))
@@ -161,6 +161,10 @@ def test_clean_bounce(self):
                 errors.append("Found sink sequence number greater than any generated source sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max))
                 success = False
 
+            if src_seqno_max < 1000 or sink_seqno_max < 1000:
+                errors.append("Not enough messages were processed: source:%d sink:%d" % (src_seqno_max, sink_seqno_max))
+                success = False
+
         if not success:
             self.mark_for_collect(self.cc)
         assert success, "Found validation errors:\n" + "\n ".join(errors)

From 00016d8bfff859c17f58b3c4dd6add379e80a473 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Thu, 5 Nov 2015 18:12:20 -0800
Subject: [PATCH 3/9] Add debugging support by collecting a dump of the topic
 in case of failure and logging a bit more info during the test run.
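
The debugging aid that matters here is the topic dump collected on failure:
the test re-reads the verification topic with ConsoleConsumer and
print.key=true so that missing or duplicate seqnos can be traced back to
concrete records. For reference, a rough standalone equivalent using the Java
consumer; the broker address, topic name, and String deserializers below are
assumptions for illustration only:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class TopicDump {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "debug-topic-dump");
            props.put("auto.offset.reset", "earliest"); // read the topic from the start
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test-topic")); // assumed topic name
                // Print key and value, tab separated, until the topic goes
                // quiet -- mirroring consumer_timeout_ms=1000 in the test.
                ConsumerRecords<String, String> records;
                while (!(records = consumer.poll(1000)).isEmpty()) {
                    for (ConsumerRecord<String, String> record : records)
                        System.out.println(record.key() + "\t" + record.value());
                }
            }
        }
    }

Note the first poll can return empty while the group is still rebalancing; a
production-quality dump would poll against a deadline instead.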
--- tests/kafkatest/services/console_consumer.py | 9 +++++++-- tests/kafkatest/services/copycat.py | 1 + tests/kafkatest/tests/copycat_distributed_test.py | 7 +++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 18021d97f99a6..84d358d5a053d 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -100,7 +100,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService): } def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None, - from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]): + from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", + print_key=False, jmx_object_names=None, jmx_attributes=[]): """ Args: context: standard context @@ -114,6 +115,7 @@ def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message successively consumed messages exceeds this timeout. Setting this and waiting for the consumer to stop is a pretty good way to consume all messages in a topic. + print_key if True, print each message's key in addition to its value """ JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes) BackgroundThreadService.__init__(self, context, num_nodes) @@ -131,7 +133,7 @@ def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} self.client_id = client_id - + self.print_key = print_key def prop_file(self, node): """Return a string which can be used to create a configuration file appropriate for the given node.""" @@ -184,6 +186,9 @@ def start_cmd(self, node): if node.version > LATEST_0_8_2: cmd += " --timeout-ms %s" % self.consumer_timeout_ms + if self.print_key: + cmd += " --property print.key=true" + cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py index e06a1f521a83e..d94dcf1c4ec6b 100644 --- a/tests/kafkatest/services/copycat.py +++ b/tests/kafkatest/services/copycat.py @@ -232,6 +232,7 @@ def messages(self): try: data = json.loads(line) except ValueError: + self.logger.debug("Ignoring unparseable line: %s", line) continue # Filter to only ones matching our name to support multiple verifiable producers if data['name'] != self.name: continue diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py index 41d05493997fb..72acf9b578188 100644 --- a/tests/kafkatest/tests/copycat_distributed_test.py +++ b/tests/kafkatest/tests/copycat_distributed_test.py @@ -15,6 +15,7 @@ from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.copycat import CopycatDistributedService, VerifiableSource, VerifiableSink +from kafkatest.services.console_consumer import ConsoleConsumer from ducktape.utils.util import wait_until import hashlib, subprocess, json, itertools, time from collections import Counter @@ -125,6 +126,7 @@ def test_clean_bounce(self): # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean # bouncing should commit on rebalance. 
src_seqno_max = max(src_seqnos) + self.logger.debug("Max source seqno: %d", src_seqno_max) src_seqno_counts = Counter(src_seqnos) missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos))) duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1]) @@ -142,6 +144,7 @@ def test_clean_bounce(self): # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because # clean bouncing should commit on rebalance. sink_seqno_max = max(sink_seqnos) + self.logger.debug("Max sink seqno: %d", sink_seqno_max) sink_seqno_counts = Counter(sink_seqnos) missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos))) duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1]) @@ -167,6 +170,10 @@ def test_clean_bounce(self): if not success: self.mark_for_collect(self.cc) + # Also collect the data in the topic to aid in debugging + consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True) + consumer_validator.run() + self.mark_for_collect(consumer_validator, "consumer_stdout") assert success, "Found validation errors:\n" + "\n ".join(errors) From 7a176e71e610785f17fd22eaaf290288c9cbcdef Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Thu, 5 Nov 2015 20:28:40 -0800 Subject: [PATCH 4/9] Increase log level to DEBUG for Copycat services since this level will be required to debug the distributed test failures. --- tests/kafkatest/tests/copycat_distributed_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py index 72acf9b578188..eba81eb25e105 100644 --- a/tests/kafkatest/tests/copycat_distributed_test.py +++ b/tests/kafkatest/tests/copycat_distributed_test.py @@ -48,6 +48,7 @@ def __init__(self, test_context): }) self.cc = CopycatDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE]) + self.cc.log_level = "DEBUG" self.key_converter = "org.apache.kafka.copycat.json.JsonConverter" self.value_converter = "org.apache.kafka.copycat.json.JsonConverter" self.schemas = True From dd9d41e5d2553cf15d5237e888f15bb611f61b67 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Fri, 6 Nov 2015 08:45:21 -0800 Subject: [PATCH 5/9] KAFKA-2713: Run task start and stop methods in worker threads so they execute in parallel and cannot block the herder thread. 
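
The heart of this patch is a small handshake that makes start() and stop()
safe to race once task startup moves onto the worker thread: task.stop() must
run exactly once whether shutdown arrives before, during, or after a slow
task.start(). If start() already finished, the caller of
startGracefulShutdown() stops the task directly; if shutdown wins the race, it
sets a flag and the worker thread performs the stop itself as soon as start()
returns. A distilled sketch of the handshake (Task and the worker plumbing are
simplified stand-ins, not the actual Copycat classes):

    interface Task {
        void start();
        void stop();
        void poll() throws InterruptedException;
    }

    class WorkerThreadSketch extends Thread {
        private final Task task;
        private volatile boolean running = true;
        private boolean finishedStart = false;
        private boolean shutdownBeforeStartCompleted = false;

        WorkerThreadSketch(Task task) {
            this.task = task;
        }

        @Override
        public void run() {
            try {
                task.start(); // may be slow; no longer blocks the herder thread
                synchronized (this) {
                    if (shutdownBeforeStartCompleted) {
                        task.stop(); // shutdown raced in mid-start: stop here, skip polling
                        return;
                    }
                    finishedStart = true;
                }
                while (running)
                    task.poll();
            } catch (InterruptedException e) {
                // allow the worker thread to exit
            }
        }

        /** Called from the herder thread; never blocks on task.start(). */
        public void startGracefulShutdown() {
            running = false;
            synchronized (this) {
                if (finishedStart)
                    task.stop(); // start completed, safe to stop directly
                else
                    shutdownBeforeStartCompleted = true; // worker thread will stop it
            }
        }
    }

Both paths check and flip the flags under the same monitor, so exactly one of
the two sites ever invokes task.stop().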
--- .../apache/kafka/copycat/sink/SinkTask.java | 16 ++++++ .../kafka/copycat/source/SourceTask.java | 20 +++++++ .../kafka/copycat/runtime/WorkerSinkTask.java | 52 +++++++++++++------ .../copycat/runtime/WorkerSinkTaskThread.java | 5 ++ .../copycat/runtime/WorkerSourceTask.java | 47 +++++++++++++---- .../copycat/runtime/WorkerSinkTaskTest.java | 26 +++++++--- .../copycat/runtime/WorkerSourceTaskTest.java | 29 +++++++++++ 7 files changed, 161 insertions(+), 34 deletions(-) diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java index c6cd12f67eb5a..7c03cda9f3715 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Map; +import java.util.Properties; /** * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In @@ -46,6 +47,13 @@ public void initialize(SinkTaskContext context) { this.context = context; } + /** + * Start the Task. This should handle any configuration parsing and one-time setup of the task. + * @param props initial configuration + */ + @Override + public abstract void start(Properties props); + /** * Put the records in the sink. Usually this should send the records to the sink asynchronously * and immediately return. @@ -84,4 +92,12 @@ public void onPartitionsAssigned(Collection partitions) { */ public void onPartitionsRevoked(Collection partitions) { } + + /** + * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other + * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset + * commit has completed. Implementations of this method should only need to perform final cleanup operations, such + * as closing network connections to the sink system. + */ + public abstract void stop(); } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java index 1e1da345979b2..30cbf16f155c8 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java @@ -21,6 +21,7 @@ import org.apache.kafka.copycat.connector.Task; import java.util.List; +import java.util.Properties; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. @@ -37,6 +38,13 @@ public void initialize(SourceTaskContext context) { this.context = context; } + /** + * Start the Task. This should handle any configuration parsing and one-time setup of the task. + * @param props initial configuration + */ + @Override + public abstract void start(Properties props); + /** * Poll this SourceTask for new records. This method should block if no data is currently * available. @@ -59,4 +67,16 @@ public void initialize(SourceTaskContext context) { public void commit() throws InterruptedException { // This space intentionally left blank. } + + /** + * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop + * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has + * fully stopped. 
Note that this method may be invoked from a different thread than {@link #poll()} and
+     * {@link #commit()}.
+     *
+     * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
+     * could set a flag that will force {@link #poll()} to exit immediately and invoke
+     * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests.
+     */
+    public abstract void stop();
 }
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 1e6a3a7209076..daa82faf28a5d 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -60,8 +60,10 @@ class WorkerSinkTask implements WorkerTask {
     private final Converter keyConverter;
     private final Converter valueConverter;
     private WorkerSinkTaskThread workThread;
+    private Properties taskProps;
     private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskContext context;
+    private boolean started;
     private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
 
     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
@@ -72,26 +74,15 @@ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
         this.time = time;
+        this.started = false;
     }
 
     @Override
     public void start(Properties props) {
+        taskProps = props;
         consumer = createConsumer();
         context = new WorkerSinkTaskContext(consumer);
 
-        // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
-        // to work with. Any rewinding will be handled immediately when polling starts.
-        String topicsStr = props.getProperty(SinkTask.TOPICS_CONFIG);
-        if (topicsStr == null || topicsStr.isEmpty())
-            throw new CopycatException("Sink tasks require a list of topics.");
-        String[] topics = topicsStr.split(",");
-        log.debug("Task {} subscribing to topics {}", id, topics);
-        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
-        consumer.poll(0);
-
-        task.initialize(context);
-        task.start(props);
-
         workThread = createWorkerThread();
         workThread.start();
     }
@@ -128,6 +119,35 @@ public void close() {
         consumer.close();
     }
 
+    /**
+     * Performs the initial consumer group join, ensures we have a partition assignment, and initializes and starts
+     * the SinkTask.
+     *
+     * @return true if successful, false if joining the consumer group was interrupted
+     */
+    public boolean joinConsumerGroupAndStart() {
+        String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+        if (topicsStr == null || topicsStr.isEmpty())
+            throw new CopycatException("Sink tasks require a list of topics.");
+        String[] topics = topicsStr.split(",");
+        log.debug("Task {} subscribing to topics {}", id, topics);
+        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
+
+        // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
+        // to work with. Any rewinding will be handled immediately when polling starts.
+        try {
+            consumer.poll(0);
+        } catch (WakeupException e) {
+            log.error("Sink task {} was stopped before completing join group. 
Task initialization and start is being skipped", this); + return false; + } + task.initialize(context); + task.start(taskProps); + log.info("Sink task {} finished initialization and start", this); + started = true; + return true; + } + /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ public void poll(long timeoutMs) { try { @@ -156,7 +176,7 @@ public void commitOffsets(boolean sync, final int seqno) { for (TopicPartition tp : consumer.assignment()) { long pos = consumer.position(tp); offsets.put(tp, new OffsetAndMetadata(pos)); - log.trace("{} committing {} offset {}", id, tp, pos); + log.debug("{} committing {} offset {}", id, tp, pos); } try { @@ -280,12 +300,12 @@ public void onPartitionsAssigned(Collection partitions) { for (TopicPartition tp : partitions) { long pos = consumer.position(tp); lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos)); - log.trace("{} assigned topic partition {} with offset {}", id, tp, pos); + log.debug("{} assigned topic partition {} with offset {}", id, tp, pos); } // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon // task start. Since this callback gets invoked during that initial setup before we've started the task, we // need to guard against invoking the user's callback method during that period. - if (workThread != null) + if (started) task.onPartitionsAssigned(partitions); } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java index dc9cf8d7b58fb..1c59646f6ab86 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java @@ -50,9 +50,14 @@ public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, @Override public void execute() { + // Try to join and start. If we're interrupted before this completes, bail. 
+        if (!task.joinConsumerGroupAndStart())
+            return;
+
         while (getRunning()) {
             iteration();
         }
+
+        // Make sure any uncommitted data has been committed
         task.commitOffsets(true, -1);
     }
 
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 4bee04c8b1f76..f4ff76aedba2e 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -46,14 +46,14 @@ class WorkerSourceTask implements WorkerTask {
     private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
 
-    private ConnectorTaskId id;
-    private SourceTask task;
+    private final ConnectorTaskId id;
+    private final SourceTask task;
     private final Converter keyConverter;
     private final Converter valueConverter;
     private KafkaProducer<byte[], byte[]> producer;
     private WorkerSourceTaskThread workThread;
-    private OffsetStorageReader offsetReader;
-    private OffsetStorageWriter offsetWriter;
+    private final OffsetStorageReader offsetReader;
+    private final OffsetStorageWriter offsetWriter;
     private final WorkerConfig workerConfig;
     private final Time time;
 
@@ -86,15 +86,12 @@ public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
 
     @Override
     public void start(Properties props) {
-        task.initialize(new WorkerSourceTaskContext(offsetReader));
-        task.start(props);
-        workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id);
+        workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props);
         workThread.start();
     }
 
     @Override
     public void stop() {
-        task.stop();
         if (workThread != null)
             workThread.startGracefulShutdown();
     }
@@ -111,7 +108,6 @@ public boolean awaitStop(long timeoutMs) {
                 success = false;
             }
         }
-        commitOffsets();
         return success;
     }
 
@@ -279,13 +275,31 @@ private void finishSuccessfulFlush() {
 
     private class WorkerSourceTaskThread extends ShutdownableThread {
-        public WorkerSourceTaskThread(String name) {
+        private Properties workerProps;
+        private boolean finishedStart;
+        private boolean startedShutdownBeforeStartCompleted;
+
+        public WorkerSourceTaskThread(String name, Properties workerProps) {
             super(name);
+            this.workerProps = workerProps;
+            this.finishedStart = false;
+            this.startedShutdownBeforeStartCompleted = false;
         }
 
         @Override
         public void execute() {
             try {
+                task.initialize(new WorkerSourceTaskContext(offsetReader));
+                task.start(workerProps);
+                log.info("Source task {} finished initialization and start", this);
+                synchronized (this) {
+                    if (startedShutdownBeforeStartCompleted) {
+                        task.stop();
+                        return;
+                    }
+                    finishedStart = true;
+                }
+
                 while (getRunning()) {
                     List<SourceRecord> records = task.poll();
                     if (records == null)
@@ -295,6 +309,19 @@ public void execute() {
             } catch (InterruptedException e) {
                 // Ignore and allow to exit.
} + + commitOffsets(); + } + + @Override + public void startGracefulShutdown() { + super.startGracefulShutdown(); + synchronized (this) { + if (finishedStart) + task.stop(); + else + startedShutdownBeforeStartCompleted = true; + } } } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index 28e9e2e603945..177f7a668bdf9 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -136,6 +136,7 @@ public void testPollsInBackground() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); for (int i = 0; i < 10; i++) { workerThread.iteration(); } @@ -202,6 +203,7 @@ public void testCommit() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // First iteration gets one record workerThread.iteration(); // Second triggers commit, gets a second offset @@ -236,6 +238,7 @@ public void testCommitTaskFlushFailure() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // Second iteration triggers commit workerThread.iteration(); workerThread.iteration(); @@ -267,6 +270,7 @@ public void testCommitTaskSuccessAndFlushFailure() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // Second iteration triggers first commit, third iteration triggers second (failing) commit workerThread.iteration(); workerThread.iteration(); @@ -292,6 +296,7 @@ public void testCommitConsumerFailure() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // Second iteration triggers commit workerThread.iteration(); workerThread.iteration(); @@ -318,6 +323,7 @@ public void testCommitTimeout() throws Exception { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't // trigger another commit workerThread.iteration(); @@ -393,6 +399,7 @@ public Object answer() throws Throwable { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); workerThread.iteration(); workerThread.iteration(); workerThread.iteration(); @@ -436,6 +443,7 @@ public Object answer() throws Throwable { PowerMock.replayAll(); workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); workerThread.iteration(); workerThread.iteration(); workerTask.stop(); @@ -448,7 +456,17 @@ public Object answer() throws Throwable { private void expectInitializeTask() throws Exception { PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); + workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"}, + workerTask, "mock-worker-thread", time, + workerConfig); + PowerMock.expectPrivate(workerTask, "createWorkerThread") + .andReturn(workerThread); + workerThread.start(); + PowerMock.expectLastCall(); + consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); + PowerMock.expectLastCall(); + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer>() { @Override public ConsumerRecords answer() throws Throwable { @@ 
-464,14 +482,6 @@ public ConsumerRecords answer() throws Throwable { PowerMock.expectLastCall(); sinkTask.start(TASK_PROPS); PowerMock.expectLastCall(); - - workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"}, - workerTask, "mock-worker-thread", time, - workerConfig); - PowerMock.expectPrivate(workerTask, "createWorkerThread") - .andReturn(workerThread); - workerThread.start(); - PowerMock.expectLastCall(); } private void expectStopTask(final long expectedMessages) throws Exception { diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java index 566391d217499..452c5cbea0f7d 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig; import org.apache.kafka.copycat.source.SourceRecord; @@ -208,6 +209,34 @@ public void testSendRecordsConvertsData() throws Exception { PowerMock.verifyAll(); } + @Test + public void testSlowTaskStart() throws Exception { + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(EMPTY_TASK_PROPS); + EasyMock.expectLastCall().andAnswer(new IAnswer() { + @Override + public Object answer() throws Throwable { + Utils.sleep(100); + return null; + } + }); + sourceTask.stop(); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.start(EMPTY_TASK_PROPS); + // Stopping immediately while the other thread has work to do should result in no polling, no offset commits, + // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it + // cannot be invoked immediately in the thread trying to stop the task. + workerTask.stop(); + assertEquals(true, workerTask.awaitStop(1000)); + + PowerMock.verifyAll(); + } private CountDownLatch expectPolls(int count) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(count); From 551c6407f320c5d55362c3e0c0d00f29bdc4b5fc Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 9 Nov 2015 15:43:37 -0800 Subject: [PATCH 6/9] Fix connect:tools build settings. 
--- build.gradle | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 0f835d563b021..64c0c9be6522c 100644 --- a/build.gradle +++ b/build.gradle @@ -228,7 +228,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) { } } -def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file'] +def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools'] def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {} @@ -348,6 +348,8 @@ project(':core') { from(project(':connect:json').configurations.runtime) { into("libs/") } from(project(':connect:file').jar) { into("libs/") } from(project(':connect:file').configurations.runtime) { into("libs/") } + from(project(':connect:tools').jar) { into("libs/") } + from(project(':connect:tools').configurations.runtime) { into("libs/") } } jar { From 9454aa2deeb01e2dd2f145c60c4a29bcf6d01272 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 10 Nov 2015 13:01:58 -0800 Subject: [PATCH 7/9] Fix leftover copycat reference. --- tests/kafkatest/services/connect.py | 4 ++-- .../{copycat_log4j.properties => connect_log4j.properties} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename tests/kafkatest/services/templates/{copycat_log4j.properties => connect_log4j.properties} (100%) diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index a2bbf8c530e0a..d72d53da53e09 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -159,7 +159,7 @@ def start_node(self, node): node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) node.account.create_file(self.CONFIG_FILE, self.config_template_func(node)) - node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE)) + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE)) remote_connector_configs = [] for idx, template in enumerate(self.connector_config_templates): target_file = os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") @@ -190,7 +190,7 @@ def start_node(self, node): node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) node.account.create_file(self.CONFIG_FILE, self.config_template_func(node)) - node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE)) + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE)) if self.connector_config_templates: raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API") diff --git a/tests/kafkatest/services/templates/copycat_log4j.properties b/tests/kafkatest/services/templates/connect_log4j.properties similarity index 100% rename from tests/kafkatest/services/templates/copycat_log4j.properties rename to tests/kafkatest/services/templates/connect_log4j.properties From 8faf12ab488d3f4111618781c008bef7386f39a2 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 10 Nov 2015 14:07:12 -0800 Subject: [PATCH 8/9] Replace clean_node implementation with more aggressive implementation that kills all connect processes rather than using pid files. 
--- tests/kafkatest/services/connect.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index d72d53da53e09..de593ea851d90 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -88,11 +88,7 @@ def restart(self): self.start_node(node) def clean_node(self, node): - if len(self.pids(node)) > 0: - self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % - (self.__class__.__name__, node.account)) - self.stop_node(node, clean_shutdown=False) - + node.account.kill_process("connect", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False) def config_filenames(self): From cee0ee14733a86112c49eea996f6011289a21d26 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 10 Nov 2015 14:42:14 -0800 Subject: [PATCH 9/9] Fix typo in log4j settings file. --- tests/kafkatest/services/templates/connect_log4j.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/templates/connect_log4j.properties b/tests/kafkatest/services/templates/connect_log4j.properties index 574b8a75ebba4..d62a93d81a83b 100644 --- a/tests/kafkatest/services/templates/connect_log4j.properties +++ b/tests/kafkatest/services/templates/connect_log4j.properties @@ -24,7 +24,7 @@ log4j.appender.FILE.ImmediateFlush=true log4j.appender.FILE.Threshold=debug log4j.appender.FILE.Append=true log4j.appender.FILE.layout=org.apache.log4j.PatternLayout -log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)% +log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n log4j.logger.org.apache.zookeeper=ERROR log4j.logger.org.I0Itec.zkclient=ERROR