diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 6a263ccc63eb8..ac9aed97e18c8 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -106,7 +106,7 @@
-
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bc2a4333d2040..15f6cac9e450f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -73,6 +73,8 @@
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -409,7 +411,7 @@ private static HostInfo parseHostInfo(final String endPoint) {
private void checkBrokerVersionCompatibility() throws StreamsException {
final StreamsKafkaClient client = new StreamsKafkaClient(config);
- client.checkBrokerCompatibility();
+ client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG)));
try {
client.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 96a56b4e9128f..59e117bf98fb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -60,6 +60,9 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
+
public class StreamsKafkaClient {
private static final ConfigDef CONFIG = StreamsConfig.configDef()
@@ -308,7 +311,7 @@ public MetadataResponse fetchMetadata() {
*
* @throws StreamsException if brokers have version 0.10.0.x
*/
- public void checkBrokerCompatibility() throws StreamsException {
+ public void checkBrokerCompatibility(final boolean eosEnabled) throws StreamsException {
final ClientRequest clientRequest = kafkaClient.newClientRequest(
getAnyReadyBrokerId(),
new ApiVersionsRequest.Builder(),
@@ -329,5 +332,19 @@ public void checkBrokerCompatibility() throws StreamsException {
if (apiVersionsResponse.apiVersion(ApiKeys.CREATE_TOPICS.id) == null) {
throw new StreamsException("Kafka Streams requires broker version 0.10.1.x or higher.");
}
+
+ if (eosEnabled && !brokerSupportsTransactions(apiVersionsResponse)) {
+ throw new StreamsException("Setting " + PROCESSING_GUARANTEE_CONFIG + "=" + EXACTLY_ONCE + " requires broker version 0.11.0.x or higher.");
+ }
}
+
+ private boolean brokerSupportsTransactions(final ApiVersionsResponse apiVersionsResponse) {
+ return apiVersionsResponse.apiVersion(ApiKeys.INIT_PRODUCER_ID.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.ADD_PARTITIONS_TO_TXN.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.ADD_OFFSETS_TO_TXN.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.END_TXN.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.WRITE_TXN_MARKERS.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.TXN_OFFSET_COMMIT.id) != null;
+ }
+
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 88d1ccca1466c..fc421cb587fd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -23,6 +23,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -33,6 +34,7 @@
import java.io.File;
import java.util.Collections;
+import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -41,11 +43,12 @@ public class BrokerCompatibilityTest {
private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";
- public static void main(String[] args) throws Exception {
+ public static void main(final String[] args) throws Exception {
System.out.println("StreamsTest instance started");
final String kafka = args.length > 0 ? args[0] : "localhost:9092";
final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+ final boolean eosEnabled = args.length > 2 ? Boolean.parseBoolean(args[2]) : false;
final File stateDir = new File(stateDirStr);
stateDir.mkdir();
@@ -58,6 +61,9 @@ public static void main(String[] args) throws Exception {
streamsProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ if (eosEnabled) {
+ streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ }
final int timeout = 6000;
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), timeout);
@@ -70,7 +76,7 @@ public static void main(String[] args) throws Exception {
final KafkaStreams streams = new KafkaStreams(builder, streamsProperties);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
- public void uncaughtException(Thread t, Throwable e) {
+ public void uncaughtException(final Thread t, final Throwable e) {
System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
streams.close(30, TimeUnit.SECONDS);
@@ -91,27 +97,30 @@ public void uncaughtException(Thread t, Throwable e) {
System.out.println("wait for result");
- loopUntilRecordReceived(kafka);
+ loopUntilRecordReceived(kafka, eosEnabled);
System.out.println("close Kafka Streams");
streams.close();
}
- private static void loopUntilRecordReceived(final String kafka) {
+ private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ if (eosEnabled) {
+ consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
final KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList(SINK_TOPIC));
while (true) {
- ConsumerRecords records = consumer.poll(100);
- for (ConsumerRecord record : records) {
+ final ConsumerRecords records = consumer.poll(100);
+ for (final ConsumerRecord record : records) {
if (record.key().equals("key") && record.value().equals("value")) {
consumer.close();
return;
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e7be9475f79f8..905320a59e8be 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -166,8 +166,8 @@ def __init__(self, test_context, kafka):
class StreamsBrokerCompatibilityService(StreamsTestBaseService):
- def __init__(self, test_context, kafka):
+ def __init__(self, test_context, kafka, eosEnabled):
super(StreamsBrokerCompatibilityService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
- "dummy")
+ eosEnabled)
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index a5bdbc60e1fc8..c4b554e578ac0 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -21,14 +21,16 @@
from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
class StreamsBrokerCompatibility(Test):
"""
- These tests validate that Streams v0.10.2+ can connect to older brokers v0.10.1+
- and that Streams fails fast for 0.10.0 brokers
- and that Streams times-out for pre-0.10.0 brokers
+ These tests validates that
+ - Streams 0.11+ w/ EOS fails fast for older brokers 0.10.2 and 0.10.1
+ - Streams 0.11+ w/o EOS works for older brokers 0.10.2 and 0.10.1
+ - Streams fails fast for 0.10.0 brokers
+ - Streams times-out for pre-0.10.0 brokers
"""
input = "brokerCompatibilitySourceTopic"
@@ -36,7 +38,6 @@ class StreamsBrokerCompatibility(Test):
def __init__(self, test_context):
super(StreamsBrokerCompatibility, self).__init__(test_context=test_context)
-
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context,
num_nodes=1,
@@ -45,9 +46,6 @@ def __init__(self, test_context):
self.input: {'partitions': 1, 'replication-factor': 1},
self.output: {'partitions': 1, 'replication-factor': 1}
})
-
- self.processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka)
-
self.consumer = VerifiableConsumer(test_context,
1,
self.kafka,
@@ -57,16 +55,35 @@ def __init__(self, test_context):
def setUp(self):
self.zk.start()
- @parametrize(broker_version=str(DEV_BRANCH))
+ @parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_10_1))
- def test_compatible_brokers(self, broker_version):
+ def test_fail_fast_on_incompatible_brokers_if_eos_enabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
- self.processor.start()
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, True)
+ processor.start()
+
+ processor.node.account.ssh(processor.start_cmd(processor.node))
+ with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
+ monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Setting processing.guarantee=exactly_once requires broker version 0.11.0.x or higher.',
+ timeout_sec=60,
+ err_msg="Never saw 'EOS requires broker version 0.11+' error message " + str(processor.node.account))
+
+ self.kafka.stop()
+
+ @parametrize(broker_version=str(LATEST_0_10_2))
+ @parametrize(broker_version=str(LATEST_0_10_1))
+ def test_compatible_brokers_eos_disabled(self, broker_version):
+ self.kafka.set_version(KafkaVersion(broker_version))
+ self.kafka.start()
+
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+ processor.start()
+
self.consumer.start()
- self.processor.wait()
+ processor.wait()
wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")
@@ -78,13 +95,14 @@ def test_fail_fast_on_incompatible_brokers(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
- self.processor.start()
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+ processor.start()
- self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node))
- with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor:
+ processor.node.account.ssh(processor.start_cmd(processor.node))
+ with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Kafka Streams requires broker version 0.10.1.x or higher.',
timeout_sec=60,
- err_msg="Never saw 'incompatible broker' error message " + str(self.processor.node.account))
+ err_msg="Never saw 'Streams requires broker verion 0.10.1+' error message " + str(processor.node.account))
self.kafka.stop()
@@ -94,12 +112,13 @@ def test_timeout_on_pre_010_brokers(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
- self.processor.start()
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+ processor.start()
- self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node))
- with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor:
+ processor.node.account.ssh(processor.start_cmd(processor.node))
+ with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException: Could not find any available broker.',
timeout_sec=60,
- err_msg="Never saw 'no available broker' error message " + str(self.processor.node.account))
+ err_msg="Never saw 'no available brokers' error message " + str(processor.node.account))
self.kafka.stop()
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 7cd489d87ac64..8e1497c7889df 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -82,4 +82,9 @@ def get_version(node=None):
V_0_10_1_1 = KafkaVersion("0.10.1.1")
LATEST_0_10_1 = V_0_10_1_1
-LATEST_0_10 = LATEST_0_10_1
+# 0.10.2.x versions
+V_0_10_2_0 = KafkaVersion("0.10.2.0")
+V_0_10_2_1 = KafkaVersion("0.10.2.1")
+LATEST_0_10_2 = V_0_10_2_1
+
+LATEST_0_10 = LATEST_0_10_2
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 12aa9e0deb25f..ec3cae9f581ac 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -89,6 +89,8 @@ get_kafka 0.10.0.1
chmod a+rw /opt/kafka-0.10.0.1
get_kafka 0.10.1.1
chmod a+rw /opt/kafka-0.10.1.1
+get_kafka 0.10.2.1
+chmod a+rw /opt/kafka-0.10.2.1
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local