diff --git a/.gitignore b/.gitignore
index b3c646b8b35bf..b1439ccb6d8a1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@ dist
*classes
target/
build/
+build_eclipse/
.gradle/
lib_managed/
src_managed/
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000000000..9be5c58d9e9ac
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,54 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+sudo: required
+dist: trusty
+language: java
+
+env:
+ - _DUCKTAPE_OPTIONS="--subset 0 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 1 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 2 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 3 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 4 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 5 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 6 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 7 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 8 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 9 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 10 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 11 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 12 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 13 --subsets 15"
+ - _DUCKTAPE_OPTIONS="--subset 14 --subsets 15"
+
+jdk:
+ - oraclejdk8
+
+before_install:
+ - gradle wrapper
+
+script:
+ - ./gradlew rat
+ - ./gradlew releaseTarGz && /bin/bash ./tests/docker/run_tests.sh
+
+services:
+ - docker
+
+before_cache:
+ - rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
+ - rm -fr $HOME/.gradle/caches/*/plugin-resolution/
+cache:
+ directories:
+ - "$HOME/.m2/repository"
+ - "$HOME/.gradle/caches/"
+ - "$HOME/.gradle/wrapper/"
diff --git a/README.md b/README.md
index c219718c06e29..9c2413bca6838 100644
--- a/README.md
+++ b/README.md
@@ -133,11 +133,26 @@ Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradl
### Determining if any dependencies could be updated ###
./gradlew dependencyUpdates
-### Running checkstyle on the java code ###
+### Running code quality checks ###
+There are two code quality analysis tools that we regularly run, findbugs and checkstyle.
+
+#### Checkstyle
+Checkstyle enforces a consistent coding style in Kafka.
+You can run checkstyle using:
+
./gradlew checkstyleMain checkstyleTest
-This will most commonly be useful for automated builds where the full resources of the host running the build and tests
-may not be dedicated to Kafka's build.
+The checkstyle warnings will be found in `reports/checkstyle/reports/main.html` and `reports/checkstyle/reports/test.html` files in the
+subproject build directories. They are also are printed to the console. The build will fail if Checkstyle fails.
+
+#### Findbugs
+Findbugs uses static analysis to look for bugs in the code.
+You can run findbugs using:
+
+ ./gradlew findbugsMain findbugsTest -x test
+
+The findbugs warnings will be found in `reports/findbugs/main.html` and `reports/findbugs/test.html` files in the subproject build
+directories. Currently, findbugs warnings do not cause the build to fail.
### Common build options ###
diff --git a/build.gradle b/build.gradle
index 417383deee7ef..57beebeed2153 100644
--- a/build.gradle
+++ b/build.gradle
@@ -27,6 +27,7 @@ buildscript {
classpath "org.ajoberstar:grgit:1.7.0"
classpath 'com.github.ben-manes:gradle-versions-plugin:0.13.0'
classpath 'org.scoverage:gradle-scoverage:2.1.0'
+ classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.4'
}
}
@@ -109,7 +110,8 @@ if (new File('.git').exists()) {
'gradlew.bat',
'**/README.md',
'**/id_rsa',
- '**/id_rsa.pub'
+ '**/id_rsa.pub',
+ 'checkstyle/suppressions.xml'
])
}
}
@@ -124,6 +126,7 @@ subprojects {
apply plugin: 'maven'
apply plugin: 'signing'
apply plugin: 'checkstyle'
+ apply plugin: 'findbugs'
sourceCompatibility = 1.7
@@ -272,9 +275,24 @@ subprojects {
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
configProperties = [importControlFile: "$rootDir/checkstyle/import-control.xml"]
+ // version 7.x requires Java 8
+ toolVersion = '6.19'
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
+ findbugs {
+ toolVersion = "3.0.1"
+ excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
+ ignoreFailures = true
+ }
+
+ tasks.withType(FindBugs) {
+ reports {
+ xml.enabled false
+ html.enabled true
+ }
+ }
+
// Ignore core since its a scala project
if (it.path != ':core') {
// NOTE: Gradles Jacoco plugin does not support "offline instrumentation" this means that classes mocked by PowerMock
@@ -805,6 +823,48 @@ project(':streams:examples') {
}
}
+project(':jmh-benchmarks') {
+
+ apply plugin: 'com.github.johnrengelman.shadow'
+
+ shadowJar {
+ baseName = 'kafka-jmh-benchmarks-all'
+ classifier = null
+ version = null
+ }
+
+ dependencies {
+ compile project(':clients')
+ compile project(':streams')
+ compile 'org.openjdk.jmh:jmh-core:1.17.5'
+ compile 'org.openjdk.jmh:jmh-generator-annprocess:1.17.5'
+ compile 'org.openjdk.jmh:jmh-core-benchmarks:1.17.5'
+ }
+
+ jar {
+ manifest {
+ attributes "Main-Class": "org.openjdk.jmh.Main"
+ }
+ }
+
+
+ task jmh(type: JavaExec, dependsOn: [':jmh-benchmarks:clean', ':jmh-benchmarks:shadowJar']) {
+
+ main="-jar"
+
+ doFirst {
+ if (System.getProperty("jmhArgs")) {
+ args System.getProperty("jmhArgs").split(',')
+ }
+ args = [shadowJar.archivePath, *args]
+ }
+ }
+
+ javadoc {
+ enabled = false
+ }
+}
+
project(':log4j-appender') {
archivesBaseName = "kafka-log4j-appender"
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 25d6f2fd07851..9a4a37f4e5bab 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -24,8 +24,8 @@
-
-
+
+
@@ -93,5 +93,41 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6c72e63c1e249..80747e14b528d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -181,6 +181,15 @@
+
+
+
+
+
+
+
+
+
@@ -212,6 +221,10 @@
+
+
+
+
diff --git a/checkstyle/java.header b/checkstyle/java.header
new file mode 100644
index 0000000000000..45fd2d5c2daf7
--- /dev/null
+++ b/checkstyle/java.header
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
new file mode 100644
index 0000000000000..a39695f934b6a
--- /dev/null
+++ b/checkstyle/suppressions.xml
@@ -0,0 +1,229 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 08b8d465a13db..11119646f4a00 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
@@ -74,9 +78,8 @@ public ApiKeys apiKey() {
return requestBuilder.apiKey();
}
- public RequestHeader makeHeader() {
- return new RequestHeader(requestBuilder.apiKey().id,
- requestBuilder.version(), clientId, correlationId);
+ public RequestHeader makeHeader(short version) {
+ return new RequestHeader(apiKey().id, version, clientId, correlationId);
}
public AbstractRequest.Builder> requestBuilder() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
index 2dfbba6467ac8..715eae7f4f4de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 28eb72e1be681..7d19ea40f7c67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 350f5a9f4490b..9bde1a22460f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
@@ -32,7 +36,7 @@ public ClusterConnectionStates(long reconnectBackoffMs) {
* Return true iff we can currently initiate a new connection. This will be the case if we are not
* connected and haven't been connected for at least the minimum reconnection backoff period.
* @param id the connection id to check
- * @param now the current time in MS
+ * @param now the current time in ms
* @return true if we can initiate a new connection
*/
public boolean canConnect(String id, long now) {
@@ -132,6 +136,15 @@ public boolean isReady(String id) {
return state != null && state.state == ConnectionState.READY;
}
+ /**
+ * Return true if the connection has been disconnected
+ * @param id The id of the node to check
+ */
+ public boolean isDisconnected(String id) {
+ NodeConnectionState state = nodeState.get(id);
+ return state != null && state.state == ConnectionState.DISCONNECTED;
+ }
+
/**
* Remove the given node from the tracked connection states. The main difference between this and `disconnected`
* is the impact on `connectionDelay`: it will be 0 after this call whereas `reconnectBackoffMs` will be taken
@@ -151,7 +164,7 @@ public void remove(String id) {
public ConnectionState connectionState(String id) {
return nodeState(id).state;
}
-
+
/**
* Get the state of a given node.
* @param id the connection to fetch the state for
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 944b09d5720df..5006ee2dafae7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -1,16 +1,19 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.kafka.clients;
import org.apache.kafka.common.protocol.SecurityProtocol;
diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
index 18e7e182f559f..62ffada5d7836 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index f4f753e495b6a..a29075df4df4b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
@@ -51,7 +55,7 @@ public void add(NetworkClient.InFlightRequest request) {
private Deque requestQueue(String node) {
Deque reqs = requests.get(node);
if (reqs == null || reqs.isEmpty())
- throw new IllegalStateException("Response from server for which there are no in-flight requests.");
+ throw new IllegalStateException("There are no in-flight requests for node " + node);
return reqs;
}
@@ -92,25 +96,44 @@ public boolean canSendMore(String node) {
}
/**
- * Return the number of inflight requests directed at the given node
+ * Return the number of in-flight requests directed at the given node
* @param node The node
* @return The request count.
*/
- public int inFlightRequestCount(String node) {
+ public int count(String node) {
Deque queue = requests.get(node);
return queue == null ? 0 : queue.size();
}
+ /**
+ * Return true if there is no in-flight request directed at the given node and false otherwise
+ */
+ public boolean isEmpty(String node) {
+ Deque queue = requests.get(node);
+ return queue != null && !queue.isEmpty();
+ }
+
/**
* Count all in-flight requests for all nodes
*/
- public int inFlightRequestCount() {
+ public int count() {
int total = 0;
for (Deque deque : this.requests.values())
total += deque.size();
return total;
}
+ /**
+ * Return true if there is no in-flight request and false otherwise
+ */
+ public boolean isEmpty() {
+ for (Deque deque : this.requests.values()) {
+ if (!deque.isEmpty())
+ return false;
+ }
+ return false;
+ }
+
/**
* Clear out all the in-flight requests for the given node and return them
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index f51d1f5131a0f..83a0009113d78 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
@@ -103,6 +107,11 @@ public interface KafkaClient extends Closeable {
*/
int inFlightRequestCount();
+ /**
+ * Return true if there is at least one in-flight request and false otherwise.
+ */
+ boolean hasInFlightRequests();
+
/**
* Get the total in-flight requests for a particular node
*
@@ -110,6 +119,11 @@ public interface KafkaClient extends Closeable {
*/
int inFlightRequestCount(String nodeId);
+ /**
+ * Return true if there is at least one in-flight request for a particular node and false otherwise.
+ */
+ boolean hasInFlightRequests(String nodeId);
+
/**
* Wake up the client if it is currently blocked waiting for I/O
*/
@@ -135,7 +149,7 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder> request
* @param expectResponse true iff we expect a response
* @param callback the callback to invoke when we get a response
*/
- ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder> requestBuilder,
- long createdTimeMs, boolean expectResponse,
- RequestCompletionHandler callback);
+ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder> requestBuilder, long createdTimeMs,
+ boolean expectResponse, RequestCompletionHandler callback);
+
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index 6486d1528d26b..ed149fb71059d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -1,16 +1,19 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.kafka.clients;
import org.apache.kafka.common.Node;
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 75d48abf247ac..5bfdb642f9c6a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
@@ -195,8 +199,13 @@ public synchronized boolean containsTopic(String topic) {
/**
* Updates the cluster metadata. If topic expiry is enabled, expiry time
* is set for topics if required and expired topics are removed from the metadata.
+ *
+ * @param cluster the cluster containing metadata for topics with valid metadata
+ * @param unavailableTopics topics which are non-existent or have one or more partitions whose
+ * leader is not known
+ * @param now current time in milliseconds
*/
- public synchronized void update(Cluster cluster, long now) {
+ public synchronized void update(Cluster cluster, Set unavailableTopics, long now) {
Objects.requireNonNull(cluster, "cluster should not be null");
this.needUpdate = false;
@@ -219,7 +228,7 @@ else if (expireMs <= now) {
}
for (Listener listener: listeners)
- listener.onMetadataUpdate(cluster);
+ listener.onMetadataUpdate(cluster, unavailableTopics);
String previousClusterId = cluster.clusterResource().clusterId();
@@ -302,7 +311,14 @@ public synchronized void removeListener(Listener listener) {
* MetadataUpdate Listener
*/
public interface Listener {
- void onMetadataUpdate(Cluster cluster);
+ /**
+ * Callback invoked on metadata update.
+ *
+ * @param cluster the cluster containing metadata for topics with valid metadata
+ * @param unavailableTopics topics which are non-existent or have one or more partitions whose
+ * leader is not known
+ */
+ void onMetadataUpdate(Cluster cluster, Set unavailableTopics);
}
private synchronized void requestUpdateForNewTopics() {
@@ -325,7 +341,7 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) {
for (String topic : this.topics.keySet()) {
List partitionInfoList = cluster.partitionsForTopic(topic);
- if (partitionInfoList != null) {
+ if (!partitionInfoList.isEmpty()) {
partitionInfos.addAll(partitionInfoList);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 34bdbf6cbbcfc..55901b5df0144 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -1,16 +1,19 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.kafka.clients;
import org.apache.kafka.common.Node;
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 890bf56d1293e..b6f8b0ed4fc0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
@@ -20,7 +24,6 @@
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
@@ -226,7 +229,7 @@ public long connectionDelay(Node node, long now) {
*/
@Override
public boolean connectionFailed(Node node) {
- return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED);
+ return connectionStates.isDisconnected(node.idString());
}
/**
@@ -280,43 +283,46 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
- AbstractRequest request = null;
AbstractRequest.Builder> builder = clientRequest.requestBuilder();
try {
NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
+ short version;
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
+ version = builder.desiredOrLatestVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending message of type {} to node {}. " +
- "Assuming version {}.", clientRequest.apiKey(), nodeId, builder.version());
+ "Assuming version {}.", clientRequest.apiKey(), nodeId, version);
} else {
- short version = versionInfo.usableVersion(clientRequest.apiKey());
- builder.setVersion(version);
+ version = versionInfo.usableVersion(clientRequest.apiKey());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
- request = builder.build();
+ doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException e) {
// If the version is not supported, skip sending the request over the wire.
// Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} to {}",
clientRequest.toString(), clientRequest.destination(), e);
- ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
+ ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, e, null);
abortedSends.add(clientResponse);
- return;
}
- RequestHeader header = clientRequest.makeHeader();
+ }
+
+ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
+ String nodeId = clientRequest.destination();
+ RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
- int latestClientVersion = ProtoUtils.latestVersion(clientRequest.apiKey().id);
+ int latestClientVersion = clientRequest.apiKey().latestVersion();
if (header.apiVersion() == latestClientVersion) {
- log.trace("Sending {} to node {}.", request, nodeId);
+ log.trace("Sending {} {} to node {}.", clientRequest.apiKey(), request, nodeId);
} else {
- log.debug("Using older server API v{} to send {} to node {}.",
- header.apiVersion(), request, nodeId);
+ log.debug("Using older server API v{} to send {} {} to node {}.",
+ header.apiVersion(), clientRequest.apiKey(), request, nodeId);
}
}
Send send = request.toSend(nodeId, header);
@@ -327,6 +333,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
clientRequest.callback(),
clientRequest.expectResponse(),
isInternalRequest,
+ request,
send,
now);
this.inFlightRequests.add(inFlightRequest);
@@ -379,7 +386,12 @@ public List poll(long timeout, long now) {
*/
@Override
public int inFlightRequestCount() {
- return this.inFlightRequests.inFlightRequestCount();
+ return this.inFlightRequests.count();
+ }
+
+ @Override
+ public boolean hasInFlightRequests() {
+ return !this.inFlightRequests.isEmpty();
}
/**
@@ -387,7 +399,12 @@ public int inFlightRequestCount() {
*/
@Override
public int inFlightRequestCount(String node) {
- return this.inFlightRequests.inFlightRequestCount(node);
+ return this.inFlightRequests.count(node);
+ }
+
+ @Override
+ public boolean hasInFlightRequests(String node) {
+ return this.inFlightRequests.isEmpty(node);
}
/**
@@ -424,7 +441,7 @@ public Node leastLoadedNode(long now) {
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
- int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
+ int currInflight = this.inFlightRequests.count(node.idString());
if (currInflight == 0 && this.connectionStates.isReady(node.idString())) {
// if we find an established connection with no in-flight requests we can stop right away
log.trace("Found least loaded node {} connected with no in-flight requests", node);
@@ -450,9 +467,8 @@ public Node leastLoadedNode(long now) {
public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
// Always expect the response version id to be the same as the request version id
- short apiKey = requestHeader.apiKey();
- short apiVer = requestHeader.apiVersion();
- Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
+ ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+ Struct responseBody = apiKey.responseSchema(requestHeader.apiVersion()).read(responseBuffer);
correlate(requestHeader, responseHeader);
return AbstractResponse.getResponse(apiKey, responseBody);
}
@@ -469,7 +485,7 @@ private void processDisconnection(List responses, String nodeId,
nodeApiVersions.remove(nodeId);
nodesNeedingApiVersionsFetch.remove(nodeId);
for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
- log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
+ log.trace("Cancelled request {} due to node {} being disconnected", request.request, nodeId);
if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
metadataUpdater.handleDisconnection(request.destination);
else
@@ -709,7 +725,7 @@ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long no
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
- this.metadata.update(cluster, now);
+ this.metadata.update(cluster, response.unavailableTopics(), now);
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now);
@@ -795,6 +811,7 @@ static class InFlightRequest {
final String destination;
final RequestCompletionHandler callback;
final boolean expectResponse;
+ final AbstractRequest request;
final boolean isInternalRequest; // used to flag requests which are initiated internally by NetworkClient
final Send send;
final long sendTimeMs;
@@ -806,6 +823,7 @@ public InFlightRequest(RequestHeader header,
RequestCompletionHandler callback,
boolean expectResponse,
boolean isInternalRequest,
+ AbstractRequest request,
Send send,
long sendTimeMs) {
this.header = header;
@@ -813,6 +831,7 @@ public InFlightRequest(RequestHeader header,
this.callback = callback;
this.expectResponse = expectResponse;
this.isInternalRequest = isInternalRequest;
+ this.request = request;
this.send = send;
this.sendTimeMs = sendTimeMs;
this.createdTimeMs = createdTimeMs;
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index aa9c85a5664ce..f216c60d1401b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -1,20 +1,23 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
import org.apache.kafka.common.utils.Utils;
@@ -31,7 +34,6 @@
* An internal class which represents the API versions supported by a particular node.
*/
public class NodeApiVersions {
- private static final Short API_NOT_ON_NODE = null;
private static final short NODE_TOO_OLD = (short) -1;
private static final short NODE_TOO_NEW = (short) -2;
private final Collection nodeApiVersions;
@@ -47,7 +49,7 @@ public class NodeApiVersions {
* @return A new NodeApiVersions object.
*/
public static NodeApiVersions create() {
- return create(Collections.EMPTY_LIST);
+ return create(Collections.emptyList());
}
/**
@@ -68,8 +70,7 @@ public static NodeApiVersions create(Collection overrides) {
}
}
if (!exists) {
- apiVersions.add(new ApiVersion(apiKey.id, ProtoUtils.oldestVersion(apiKey.id),
- ProtoUtils.latestVersion(apiKey.id)));
+ apiVersions.add(new ApiVersion(apiKey));
}
}
return new NodeApiVersions(apiVersions);
@@ -78,16 +79,16 @@ public static NodeApiVersions create(Collection overrides) {
public NodeApiVersions(Collection nodeApiVersions) {
this.nodeApiVersions = nodeApiVersions;
for (ApiVersion nodeApiVersion : nodeApiVersions) {
- int nodeApiKey = nodeApiVersion.apiKey;
// Newer brokers may support ApiKeys we don't know about, ignore them
- if (ApiKeys.hasId(nodeApiKey)) {
- short v = Utils.min(ProtoUtils.latestVersion(nodeApiKey), nodeApiVersion.maxVersion);
+ if (ApiKeys.hasId(nodeApiVersion.apiKey)) {
+ ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey);
+ short v = Utils.min(nodeApiKey.latestVersion(), nodeApiVersion.maxVersion);
if (v < nodeApiVersion.minVersion) {
- usableVersions.put(ApiKeys.forId(nodeApiKey), NODE_TOO_NEW);
- } else if (v < ProtoUtils.oldestVersion(nodeApiKey)) {
- usableVersions.put(ApiKeys.forId(nodeApiKey), NODE_TOO_OLD);
+ usableVersions.put(nodeApiKey, NODE_TOO_NEW);
+ } else if (v < nodeApiKey.oldestVersion()) {
+ usableVersions.put(nodeApiKey, NODE_TOO_OLD);
} else {
- usableVersions.put(ApiKeys.forId(nodeApiKey), v);
+ usableVersions.put(nodeApiKey, v);
}
}
}
@@ -98,14 +99,14 @@ public NodeApiVersions(Collection nodeApiVersions) {
*/
public short usableVersion(ApiKeys apiKey) {
Short usableVersion = usableVersions.get(apiKey);
- if (usableVersion == API_NOT_ON_NODE)
+ if (usableVersion == null)
throw new UnsupportedVersionException("The broker does not support " + apiKey);
else if (usableVersion == NODE_TOO_OLD)
throw new UnsupportedVersionException("The broker is too old to support " + apiKey +
- " version " + ProtoUtils.oldestVersion(apiKey.id));
+ " version " + apiKey.oldestVersion());
else if (usableVersion == NODE_TOO_NEW)
throw new UnsupportedVersionException("The broker is too new to support " + apiKey +
- " version " + ProtoUtils.latestVersion(apiKey.id));
+ " version " + apiKey.latestVersion());
else
return usableVersion;
}
@@ -160,17 +161,17 @@ private String apiVersionToText(ApiVersion apiVersion) {
ApiKeys apiKey = null;
if (ApiKeys.hasId(apiVersion.apiKey)) {
apiKey = ApiKeys.forId(apiVersion.apiKey);
- }
- if (apiKey != null) {
bld.append(apiKey.name).append("(").append(apiKey.id).append("): ");
} else {
- bld.append("UNKNOWN(").append(apiKey.id).append("): ");
+ bld.append("UNKNOWN(").append(apiVersion.apiKey).append("): ");
}
+
if (apiVersion.minVersion == apiVersion.maxVersion) {
bld.append(apiVersion.minVersion);
} else {
bld.append(apiVersion.minVersion).append(" to ").append(apiVersion.maxVersion);
}
+
if (apiKey != null) {
Short usableVersion = usableVersions.get(apiKey);
if (usableVersion == NODE_TOO_OLD)
diff --git a/clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java b/clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
index 6fee4e45986d2..4e08ddf328c3e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
index 5695be83d6cbc..c6006b7ea80d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index cdcab5d4ce4b9..b1badefd31802 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ed809a9f401ad..26a7d5daffc7f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
@@ -426,6 +430,10 @@ public static Properties addDeserializerToConfig(Properties properties,
super(CONFIG, props);
}
+ ConsumerConfig(Map, ?> props, boolean doLog) {
+ super(CONFIG, props, doLog);
+ }
+
public static Set configNames() {
return CONFIG.names();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index f8789fc57e9be..2f4e310f7025e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index a4265abb76eda..3a3873a83d1ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 5f1015512285c..26c37680bed96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 5b83f0c677c66..f2dc9bbc2ba31 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
index 5f8a57faad31c..b23ca867359ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 449efc9df180c..612f446ab9d2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
@@ -630,7 +634,7 @@ private KafkaConsumer(ConsumerConfig config,
// load interceptors and make sure they get clientId
Map userProvidedConfigs = config.originals();
userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
- List> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ List> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
@@ -652,7 +656,7 @@ private KafkaConsumer(ConsumerConfig config,
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), 0);
+ this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0);
String metricGrpPrefix = "consumer";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient netClient = new NetworkClient(
@@ -1000,9 +1004,8 @@ public ConsumerRecords poll(long timeout) {
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
- if (fetcher.sendFetches() > 0) {
+ if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();
- }
if (this.interceptors == null)
return new ConsumerRecords<>(records);
@@ -1339,7 +1342,7 @@ public List partitionsFor(String topic) {
try {
Cluster cluster = this.metadata.fetch();
List parts = cluster.partitionsForTopic(topic);
- if (parts != null)
+ if (!parts.isEmpty())
return parts;
Map> topicMetadata = fetcher.getTopicMetadata(
@@ -1495,7 +1498,7 @@ public Map endOffsets(Collection partition
* If auto-commit is enabled, this will commit the current offsets if possible within the default
* timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()}
* cannot be used to interrupt close.
- *
+ *
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
* before or while this function is called
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index a88f4324ccb2d..d81270adf650a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
@@ -152,17 +156,24 @@ public ConsumerRecords poll(long timeout) {
updateFetchPosition(tp);
// update the consumed offset
+ final Map>> results = new HashMap<>();
+ for (final TopicPartition topicPartition : records.keySet()) {
+ results.put(topicPartition, new ArrayList>());
+ }
+
for (Map.Entry>> entry : this.records.entrySet()) {
if (!subscriptions.isPaused(entry.getKey())) {
- List> recs = entry.getValue();
- if (!recs.isEmpty())
- this.subscriptions.position(entry.getKey(), recs.get(recs.size() - 1).offset() + 1);
+ final List> recs = entry.getValue();
+ for (final ConsumerRecord rec : recs) {
+ if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
+ results.get(entry.getKey()).add(rec);
+ subscriptions.position(entry.getKey(), rec.offset() + 1);
+ }
+ }
}
}
-
- ConsumerRecords copy = new ConsumerRecords(this.records);
- this.records = new HashMap>>();
- return copy;
+ this.records.clear();
+ return new ConsumerRecords<>(results);
}
public void addRecord(ConsumerRecord record) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
index 70fba3690b7e9..14bb71003379b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -1,16 +1,19 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.TopicPartition;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index df8bf37adbada..9d06f29721ffd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
index 2f95291112ac2..3af057f9ce4ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
index 918087d4201b7..2fef79ee2bbdb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
index 3dd92fb0e74af..dae19b29f2d19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
index 542da7f7bf5c8..6d742b850a134 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 16c1d77c429a7..ec6c62f1e834e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -1,13 +1,13 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
index 1c1a2f513cc56..33cba6fbc8d66 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index a598258e339a6..8e38b84ccef88 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 350a84bf02268..db665b6788f2f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
@@ -315,6 +319,19 @@ private synchronized void disableHeartbeatThread() {
heartbeatThread.disable();
}
+ private void closeHeartbeatThread() {
+ if (heartbeatThread != null) {
+ heartbeatThread.close();
+
+ try {
+ heartbeatThread.join();
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for consumer heartbeat thread to close");
+ throw new InterruptException(e);
+ }
+ }
+ }
+
// visible for testing. Joins the group without starting the heartbeat thread.
void joinGroupIfNeeded() {
while (needRejoin() || rejoinIncomplete()) {
@@ -648,23 +665,30 @@ protected synchronized void requestRejoin() {
* Close the coordinator, waiting if needed to send LeaveGroup.
*/
@Override
- public synchronized void close() {
+ public final void close() {
close(0);
}
- protected synchronized void close(long timeoutMs) {
- if (heartbeatThread != null)
- heartbeatThread.close();
- maybeLeaveGroup();
-
- // At this point, there may be pending commits (async commits or sync commits that were
- // interrupted using wakeup) and the leave group request which have been queued, but not
- // yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
- // If coordinator is not known, requests are aborted.
- Node coordinator = coordinator();
- if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
- log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.",
- client.pendingRequestCount(coordinator), groupId);
+ protected void close(long timeoutMs) {
+ try {
+ closeHeartbeatThread();
+ } finally {
+
+ // Synchronize after closing the heartbeat thread since heartbeat thread
+ // needs this lock to complete and terminate after close flag is set.
+ synchronized (this) {
+ maybeLeaveGroup();
+
+ // At this point, there may be pending commits (async commits or sync commits that were
+ // interrupted using wakeup) and the leave group request which have been queued, but not
+ // yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
+ // If coordinator is not known, requests are aborted.
+ Node coordinator = coordinator();
+ if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
+ log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.",
+ client.pendingRequestCount(coordinator), groupId);
+ }
+ }
}
/**
@@ -888,9 +912,9 @@ public void run() {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture == null)
- lookupCoordinator();
- else
+ if (findCoordinatorFuture != null || lookupCoordinator().failed())
+ // the immediate future check ensures that we backoff properly in the case that no
+ // brokers are available to connect to.
AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
@@ -941,7 +965,7 @@ public void onFailure(RuntimeException e) {
log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
this.failed.set(new RuntimeException(e));
} catch (RuntimeException e) {
- log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e);
+ log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e);
this.failed.set(e);
} finally {
log.debug("Heartbeat thread for group {} has closed", groupId);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 4f90e66f2794c..5c97693e91fc7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 0dc073e21f6e4..ba19146ae0436 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1,14 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
@@ -49,6 +53,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -168,7 +173,7 @@ public void updatePatternSubscription(Cluster cluster) {
private void addMetadataListener() {
this.metadata.addListener(new Metadata.Listener() {
@Override
- public void onMetadataUpdate(Cluster cluster) {
+ public void onMetadataUpdate(Cluster cluster, Set unavailableTopics) {
// if we encounter any unauthorized topics, raise an exception to the user
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
@@ -182,6 +187,9 @@ public void onMetadataUpdate(Cluster cluster) {
if (!snapshot.equals(metadataSnapshot))
metadataSnapshot = snapshot;
}
+
+ if (!Collections.disjoint(metadata.topics(), unavailableTopics))
+ metadata.requestUpdate();
}
});
}
@@ -697,11 +705,10 @@ private RequestFuture sendOffsetCommitRequest(final Map> unsent = new HashMap<>();
+ private final UnsentRequests unsent = new UnsentRequests();
private final Metadata metadata;
private final Time time;
private final long retryBackoffMs;
@@ -95,24 +101,13 @@ public RequestFuture send(Node node, AbstractRequest.Builder>
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
completionHandler);
- put(node, clientRequest);
+ unsent.put(node, clientRequest);
// wakeup the client in case it is blocking in poll so that we can send the queued request
client.wakeup();
return completionHandler.future;
}
- private void put(Node node, ClientRequest request) {
- synchronized (this) {
- List nodeUnsent = unsent.get(node);
- if (nodeUnsent == null) {
- nodeUnsent = new ArrayList<>();
- unsent.put(node, nodeUnsent);
- }
- nodeUnsent.add(request);
- }
- }
-
public Node leastLoadedNode() {
synchronized (this) {
return client.leastLoadedNode(time.milliseconds());
@@ -247,6 +242,9 @@ public void poll(long timeout, long now, PollCondition pollCondition) {
// fail requests that couldn't be sent if they have expired
failExpiredRequests(now);
+
+ // clean unsent requests collection to keep the map from growing indefinitely
+ unsent.clean();
}
// called without the lock to avoid deadlock potential if handlers need to acquire locks
@@ -276,12 +274,12 @@ public boolean awaitPendingRequests(Node node, long timeoutMs) {
long startMs = time.milliseconds();
long remainingMs = timeoutMs;
- while (pendingRequestCount(node) > 0 && remainingMs > 0) {
+ while (hasPendingRequests(node) && remainingMs > 0) {
poll(remainingMs);
remainingMs = timeoutMs - (time.milliseconds() - startMs);
}
- return pendingRequestCount(node) == 0;
+ return !hasPendingRequests(node);
}
/**
@@ -292,9 +290,21 @@ public boolean awaitPendingRequests(Node node, long timeoutMs) {
*/
public int pendingRequestCount(Node node) {
synchronized (this) {
- List pending = unsent.get(node);
- int unsentCount = pending == null ? 0 : pending.size();
- return unsentCount + client.inFlightRequestCount(node.idString());
+ return unsent.requestCount(node) + client.inFlightRequestCount(node.idString());
+ }
+ }
+
+ /**
+ * Check whether there is pending request to the given node. This includes both request that
+ * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+ * @param node The node in question
+ * @return A boolean indicating whether there is pending request
+ */
+ public boolean hasPendingRequests(Node node) {
+ if (unsent.hasRequests(node))
+ return true;
+ synchronized (this) {
+ return client.hasInFlightRequests(node.idString());
}
}
@@ -305,10 +315,20 @@ public int pendingRequestCount(Node node) {
*/
public int pendingRequestCount() {
synchronized (this) {
- int total = 0;
- for (List requests: unsent.values())
- total += requests.size();
- return total + client.inFlightRequestCount();
+ return unsent.requestCount() + client.inFlightRequestCount();
+ }
+ }
+
+ /**
+ * Check whether there is pending request. This includes both requests that
+ * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+ * @return A boolean indicating whether there is pending request
+ */
+ public boolean hasPendingRequests() {
+ if (unsent.hasRequests())
+ return true;
+ synchronized (this) {
+ return client.hasInFlightRequests();
}
}
@@ -333,18 +353,16 @@ private void checkDisconnects(long now) {
// by NetworkClient, so we just need to check whether connections for any of the unsent
// requests have been disconnected; if they have, then we complete the corresponding future
// and set the disconnect flag in the ClientResponse
- Iterator>> iterator = unsent.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry> requestEntry = iterator.next();
- Node node = requestEntry.getKey();
+ for (Node node : unsent.nodes()) {
if (client.connectionFailed(node)) {
// Remove entry before invoking request callback to avoid callbacks handling
// coordinator failures traversing the unsent list again.
- iterator.remove();
- for (ClientRequest request : requestEntry.getValue()) {
+ Collection requests = unsent.remove(node);
+ for (ClientRequest request : requests) {
RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
- handler.onComplete(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
- request.createdTimeMs(), now, true, null, null));
+ handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
+ request.callback(), request.destination(), request.createdTimeMs(), now, true,
+ null, null));
}
}
}
@@ -352,33 +370,20 @@ private void checkDisconnects(long now) {
private void failExpiredRequests(long now) {
// clear all expired unsent requests and fail their corresponding futures
- Iterator>> iterator = unsent.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry> requestEntry = iterator.next();
- Iterator requestIterator = requestEntry.getValue().iterator();
- while (requestIterator.hasNext()) {
- ClientRequest request = requestIterator.next();
- if (request.createdTimeMs() < now - unsentExpiryMs) {
- RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
- handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
- requestIterator.remove();
- } else
- break;
- }
- if (requestEntry.getValue().isEmpty())
- iterator.remove();
+ Collection expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
+ for (ClientRequest request : expiredRequests) {
+ RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+ handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
}
}
public void failUnsentRequests(Node node, RuntimeException e) {
// clear unsent requests to node and fail their corresponding futures
synchronized (this) {
- List unsentRequests = unsent.remove(node);
- if (unsentRequests != null) {
- for (ClientRequest unsentRequest : unsentRequests) {
- RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback();
- handler.onFailure(e);
- }
+ Collection unsentRequests = unsent.remove(node);
+ for (ClientRequest unsentRequest : unsentRequests) {
+ RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback();
+ handler.onFailure(e);
}
}
@@ -389,9 +394,9 @@ public void failUnsentRequests(Node node, RuntimeException e) {
private boolean trySend(long now) {
// send any requests that can be sent now
boolean requestsSent = false;
- for (Map.Entry> requestEntry: unsent.entrySet()) {
- Node node = requestEntry.getKey();
- Iterator iterator = requestEntry.getValue().iterator();
+
+ for (Node node : unsent.nodes()) {
+ Iterator iterator = unsent.requestIterator(node);
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (client.ready(node, now)) {
@@ -468,12 +473,12 @@ public void tryConnect(Node node) {
}
}
- public class RequestFutureCompletionHandler implements RequestCompletionHandler {
+ private class RequestFutureCompletionHandler implements RequestCompletionHandler {
private final RequestFuture future;
private ClientResponse response;
private RuntimeException e;
- public RequestFutureCompletionHandler() {
+ private RequestFutureCompletionHandler() {
this.future = new RequestFuture<>();
}
@@ -522,4 +527,98 @@ public interface PollCondition {
boolean shouldBlock();
}
+ /*
+ * A threadsafe helper class to hold requests per node that have not been sent yet
+ */
+ private final static class UnsentRequests {
+ private final ConcurrentMap> unsent;
+
+ private UnsentRequests() {
+ unsent = new ConcurrentHashMap<>();
+ }
+
+ public void put(Node node, ClientRequest request) {
+ // the lock protects the put from a concurrent removal of the queue for the node
+ synchronized (unsent) {
+ ConcurrentLinkedQueue requests = unsent.get(node);
+ if (requests == null) {
+ requests = new ConcurrentLinkedQueue<>();
+ unsent.put(node, requests);
+ }
+ requests.add(request);
+ }
+ }
+
+ public int requestCount(Node node) {
+ ConcurrentLinkedQueue requests = unsent.get(node);
+ return requests == null ? 0 : requests.size();
+ }
+
+ public int requestCount() {
+ int total = 0;
+ for (ConcurrentLinkedQueue requests : unsent.values())
+ total += requests.size();
+ return total;
+ }
+
+ public boolean hasRequests(Node node) {
+ ConcurrentLinkedQueue requests = unsent.get(node);
+ return requests != null && !requests.isEmpty();
+ }
+
+ public boolean hasRequests() {
+ for (ConcurrentLinkedQueue requests : unsent.values())
+ if (!requests.isEmpty())
+ return true;
+ return false;
+ }
+
+ public Collection removeExpiredRequests(long now, long unsentExpiryMs) {
+ List expiredRequests = new ArrayList<>();
+ for (ConcurrentLinkedQueue requests : unsent.values()) {
+ Iterator requestIterator = requests.iterator();
+ while (requestIterator.hasNext()) {
+ ClientRequest request = requestIterator.next();
+ if (request.createdTimeMs() < now - unsentExpiryMs) {
+ expiredRequests.add(request);
+ requestIterator.remove();
+ } else
+ break;
+ }
+ }
+ return expiredRequests;
+ }
+
+ public void clean() {
+ // the lock protects removal from a concurrent put which could otherwise mutate the
+ // queue after it has been removed from the map
+ synchronized (unsent) {
+ Iterator> iterator = unsent.values().iterator();
+ while (iterator.hasNext()) {
+ ConcurrentLinkedQueue requests = iterator.next();
+ if (requests.isEmpty())
+ iterator.remove();
+ }
+ }
+ }
+
+ public Collection remove(Node node) {
+ // the lock protects removal from a concurrent put which could otherwise mutate the
+ // queue after it has been removed from the map
+ synchronized (unsent) {
+ ConcurrentLinkedQueue requests = unsent.remove(node);
+ return requests == null ? Collections.emptyList() : requests;
+ }
+ }
+
+ public Iterator requestIterator(Node node) {
+ ConcurrentLinkedQueue requests = unsent.get(node);
+ return requests == null ? Collections.emptyIterator() : requests.iterator();
+ }
+
+ public Collection nodes() {
+ return unsent.keySet();
+ }
+ }
+
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 361865d0082fc..392e27289d206 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 02f34e5ab6bc0..441206aa5575f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1,16 +1,19 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
@@ -200,11 +203,11 @@ public void onSuccess(ClientResponse resp) {
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
- request.version()));
+ resp.requestHeader().apiVersion()));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
- sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
+ sensors.fetchThrottleTimeSensor.record(response.throttleTimeMs());
}
@Override
@@ -603,13 +606,10 @@ public void onFailure(RuntimeException e) {
* @return A response which can be polled to obtain the corresponding timestamps and offsets.
*/
private RequestFuture