diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index b18a9cfbaddc5..9962e377fed5f 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -72,7 +72,7 @@ do
CLASSPATH=$CLASSPATH:$dir/*
done
-for cc_pkg in "api" "runtime" "file" "json"
+for cc_pkg in "api" "runtime" "file" "json" "tools"
do
for file in $base_dir/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
do
diff --git a/build.gradle b/build.gradle
index 7f21a00b9b4ba..70fdbcd8d0899 100644
--- a/build.gradle
+++ b/build.gradle
@@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}
}
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools']
def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
@@ -350,6 +350,8 @@ project(':core') {
from(project(':connect:json').configurations.runtime) { into("libs/") }
from(project(':connect:file').jar) { into("libs/") }
from(project(':connect:file').configurations.runtime) { into("libs/") }
+ from(project(':connect:tools').jar) { into("libs/") }
+ from(project(':connect:tools').configurations.runtime) { into("libs/") }
}
jar {
@@ -887,3 +889,64 @@ project(':connect:file') {
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
+
+project(':connect:tools') {
+ apply plugin: 'checkstyle'
+ archivesBaseName = "connect-tools"
+
+ dependencies {
+ compile project(':connect:api')
+ compile "$slf4japi"
+ compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
+
+ testCompile "$junit"
+ testCompile "$easymock"
+ testCompile "$powermock"
+ testCompile "$powermock_easymock"
+ testRuntime "$slf4jlog4j"
+ }
+
+ task testJar(type: Jar) {
+ classifier = 'test'
+ from sourceSets.test.output
+ }
+
+ test {
+ testLogging {
+ events "passed", "skipped", "failed"
+ exceptionFormat = 'full'
+ }
+ }
+
+ javadoc {
+ include "**/org/apache/kafka/connect/*"
+ }
+
+ tasks.create(name: "copyDependantLibs", type: Copy) {
+ from (configurations.testRuntime) {
+ include('slf4j-log4j12*')
+ }
+ from (configurations.runtime) {
+ exclude('kafka-clients*')
+ exclude('connect-*')
+ }
+ into "$buildDir/dependant-libs"
+ }
+
+ jar {
+ dependsOn copyDependantLibs
+ }
+
+ artifacts {
+ archives testJar
+ }
+
+ configurations {
+ archives.extendsFrom(testCompile)
+ }
+
+ checkstyle {
+ configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ }
+ test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 908fd351d792c..16a370092f3e0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -205,6 +205,10 @@
+    <subpackage name="tools">
+      <allow pkg="org.apache.kafka.connect" />
+      <allow pkg="com.fasterxml.jackson" />
+    </subpackage>
diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
similarity index 78%
rename from tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
rename to clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
index d8deb220066b4..1c63ffb08c58c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.tools;
+package org.apache.kafka.common.utils;
/**
@@ -46,6 +46,7 @@ public class ThroughputThrottler {
long sleepDeficitNs = 0;
long targetThroughput = -1;
long startMs;
+ private boolean wakeup = false;
/**
* @param targetThroughput Can be messages/sec or bytes/sec
@@ -83,7 +84,11 @@ public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
public void throttle() {
if (targetThroughput == 0) {
try {
- Thread.sleep(Long.MAX_VALUE);
+ synchronized (this) {
+ while (!wakeup) {
+ this.wait();
+ }
+ }
} catch (InterruptedException e) {
// do nothing
}
@@ -96,12 +101,21 @@ public void throttle() {
// If enough sleep deficit has accumulated, sleep a little
if (sleepDeficitNs >= MIN_SLEEP_NS) {
- long sleepMs = sleepDeficitNs / 1000000;
- long sleepNs = sleepDeficitNs - sleepMs * 1000000;
-
long sleepStartNs = System.nanoTime();
+ long currentTimeNs = sleepStartNs;
try {
- Thread.sleep(sleepMs, (int) sleepNs);
+ synchronized (this) {
+ long elapsed = currentTimeNs - sleepStartNs;
+ long remaining = sleepDeficitNs - elapsed;
+ while (!wakeup && remaining > 0) {
+ long sleepMs = remaining / 1000000;
+ long sleepNs = remaining - sleepMs * 1000000;
+ this.wait(sleepMs, (int) sleepNs);
+ elapsed = System.nanoTime() - sleepStartNs;
+ remaining = sleepDeficitNs - elapsed;
+ }
+ wakeup = false;
+ }
sleepDeficitNs = 0;
} catch (InterruptedException e) {
// If sleep is cut short, reduce deficit by the amount of
@@ -113,5 +127,15 @@ public void throttle() {
}
}
}
+
+ /**
+ * Wakeup the throttler if its sleeping.
+ */
+ public void wakeup() {
+ synchronized (this) {
+ wakeup = true;
+ this.notifyAll();
+ }
+ }
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
index 1890062480758..b2b29bf70868e 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
@@ -56,6 +56,11 @@ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
}
+ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+ String topic, Schema keySchema, Object key, Schema valueSchema, Object value) {
+ this(sourcePartition, sourceOffset, topic, null, keySchema, key, valueSchema, value);
+ }
+
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition,
Schema keySchema, Object key, Schema valueSchema, Object value) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index e0a3e04b639f0..686e56434e00e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -230,6 +230,13 @@ public WorkerConfig workerConfig() {
return workerConfig;
}
+ @Override
+ public String toString() {
+ return "WorkerSinkTask{" +
+ "id=" + id +
+ '}';
+ }
+
private KafkaConsumer<byte[], byte[]> createConsumer() {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
index e776f083e0ba9..b65efa86da951 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
@@ -80,7 +80,7 @@ public void iteration() {
long commitTimeout = commitStarted + task.workerConfig().getLong(
WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
if (committing && now >= commitTimeout) {
- log.warn("Commit of {} offsets timed out", this);
+ log.warn("Commit of {} offsets timed out", task);
commitFailures++;
committing = false;
}
@@ -98,11 +98,11 @@ public void onCommitCompleted(Throwable error, long seqno) {
seqno, commitSeqno);
} else {
if (error != null) {
- log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
+ log.error("Commit of {} offsets threw an unexpected exception: ", task, error);
commitFailures++;
} else {
log.debug("Finished {} offset commit successfully in {} ms",
- this, task.time().milliseconds() - commitStarted);
+ task, task.time().milliseconds() - commitStarted);
commitFailures = 0;
}
committing = false;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6cf1dd716a21c..5d0b7e7bec8f3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -178,6 +178,8 @@ private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record
public boolean commitOffsets() {
long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+ log.debug("{} Committing offsets", this);
+
long started = time.milliseconds();
long timeout = started + commitTimeoutMs;
@@ -259,7 +261,7 @@ public void onCompletion(Throwable error, Void result) {
}
finishSuccessfulFlush();
- log.debug("Finished {} commitOffsets successfully in {} ms",
+ log.info("Finished {} commitOffsets successfully in {} ms",
this, time.milliseconds() - started);
return true;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 96346ad46105d..a544fb00152c2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -133,6 +133,8 @@ public void start(Herder herder) {
}
public void stop() {
+ log.info("Stopping REST server");
+
try {
jettyServer.stop();
jettyServer.join();
@@ -141,6 +143,8 @@ public void stop() {
} finally {
jettyServer.destroy();
}
+
+ log.info("REST server stopped");
}
/**
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index cea436047b99e..c95b7237c2517 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -27,6 +27,8 @@
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
@@ -51,6 +53,8 @@
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class ConnectorsResource {
+ private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
+
// TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
// session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
// we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
@@ -159,7 +163,9 @@ private <T, U> T completeOrForwardRequest(
} catch (ExecutionException e) {
if (e.getCause() instanceof NotLeaderException) {
NotLeaderException notLeaderError = (NotLeaderException) e.getCause();
- return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType));
+ String forwardUrl = RestServer.urlJoin(notLeaderError.leaderUrl(), path);
+ log.debug("Forwarding request to leader: {} {} {}", forwardUrl, method, body);
+ return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType));
}
throw e.getCause();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 3b37076cdcf40..c82645cd936bc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -189,6 +189,7 @@ public void stop() {
* @param callback the callback to invoke once the end of the log has been reached.
*/
public void readToEnd(Callback<Void> callback) {
+ log.trace("Starting read to end log for topic {}", topic);
producer.flush();
synchronized (this) {
readLogEndOffsetCallbacks.add(callback);
@@ -286,6 +287,10 @@ private void readToLogEnd() {
private class WorkThread extends Thread {
+ public WorkThread() {
+ super("KafkaBasedLog Work Thread - " + topic);
+ }
+
@Override
public void run() {
try {
@@ -300,6 +305,7 @@ public void run() {
if (numCallbacks > 0) {
try {
readToLogEnd();
+ log.trace("Finished read to end log for topic {}", topic);
} catch (WakeupException e) {
// Either received another get() call and need to retry reading to end of log or stop() was
// called. Both are handled by restarting this loop.
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
new file mode 100644
index 0000000000000..0ab64fdfa2602
--- /dev/null
+++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @see VerifiableSinkTask
+ */
+public class VerifiableSinkConnector extends SourceConnector {
+ private Map config;
+
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.config = props;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return VerifiableSinkTask.class;
+ }
+
+ @Override
+ public List