Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
</module>
<module name="BooleanExpressionComplexity">
<!-- default is 3 -->
<property name="max" value="4"/>
<property name="max" value="5"/>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just add a suppression for the file that needs to have 5 booleans? There is little point in having rules if people are just going to keep on changing the values

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also little point in adding an exception for each single file that breaks the rule IMHO. I just think that 4 (and even 5) is a rather small number anyway. (But this goes back to your other comment -- if we only check a single API key, we could keep 4 here.)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. well the point is to not break the rule! If you really feel you need to break the rule then add a suppression

</module>

<module name="ClassFanOutComplexity">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@

import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;

/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
Expand Down Expand Up @@ -409,7 +411,7 @@ private static HostInfo parseHostInfo(final String endPoint) {
private void checkBrokerVersionCompatibility() throws StreamsException {
final StreamsKafkaClient client = new StreamsKafkaClient(config);

client.checkBrokerCompatibility();
client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG)));

try {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;

public class StreamsKafkaClient {

private static final ConfigDef CONFIG = StreamsConfig.configDef()
Expand Down Expand Up @@ -308,7 +311,7 @@ public MetadataResponse fetchMetadata() {
*
* @throws StreamsException if brokers have version 0.10.0.x
*/
public void checkBrokerCompatibility() throws StreamsException {
public void checkBrokerCompatibility(final boolean eosEnabled) throws StreamsException {
final ClientRequest clientRequest = kafkaClient.newClientRequest(
getAnyReadyBrokerId(),
new ApiVersionsRequest.Builder(),
Expand All @@ -329,5 +332,19 @@ public void checkBrokerCompatibility() throws StreamsException {
if (apiVersionsResponse.apiVersion(ApiKeys.CREATE_TOPICS.id) == null) {
throw new StreamsException("Kafka Streams requires broker version 0.10.1.x or higher.");
}

if (eosEnabled && !brokerSupportsTransactions(apiVersionsResponse)) {
throw new StreamsException("Setting " + PROCESSING_GUARANTEE_CONFIG + "=" + EXACTLY_ONCE + " requires broker version 0.11.0.x or higher.");
}
}

private boolean brokerSupportsTransactions(final ApiVersionsResponse apiVersionsResponse) {
return apiVersionsResponse.apiVersion(ApiKeys.INIT_PRODUCER_ID.id) != null
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need to check for all of these ApiKeys? i.e., we know that they are all going in as part of the same release, can we not just check for at least that version instead?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think there is a strict need -- I thought it might be "cleaner" to check all. But maybe that's overkill. Curious what others think about this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it doesn't harm.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that it is safer to check all: though it may be unlikely to happen, we may remove some of the APIs moving forward.

&& apiVersionsResponse.apiVersion(ApiKeys.ADD_PARTITIONS_TO_TXN.id) != null
&& apiVersionsResponse.apiVersion(ApiKeys.ADD_OFFSETS_TO_TXN.id) != null
&& apiVersionsResponse.apiVersion(ApiKeys.END_TXN.id) != null
&& apiVersionsResponse.apiVersion(ApiKeys.WRITE_TXN_MARKERS.id) != null
&& apiVersionsResponse.apiVersion(ApiKeys.TXN_OFFSET_COMMIT.id) != null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -33,6 +34,7 @@

import java.io.File;
import java.util.Collections;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

Expand All @@ -41,11 +43,12 @@ public class BrokerCompatibilityTest {
private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";

public static void main(String[] args) throws Exception {
public static void main(final String[] args) throws Exception {
System.out.println("StreamsTest instance started");

final String kafka = args.length > 0 ? args[0] : "localhost:9092";
final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
final boolean eosEnabled = args.length > 2 ? Boolean.parseBoolean(args[2]) : false;

final File stateDir = new File(stateDirStr);
stateDir.mkdir();
Expand All @@ -58,6 +61,9 @@ public static void main(String[] args) throws Exception {
streamsProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
if (eosEnabled) {
streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
}
final int timeout = 6000;
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), timeout);
Expand All @@ -70,7 +76,7 @@ public static void main(String[] args) throws Exception {
final KafkaStreams streams = new KafkaStreams(builder, streamsProperties);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
public void uncaughtException(final Thread t, final Throwable e) {
System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);

streams.close(30, TimeUnit.SECONDS);
Expand All @@ -91,27 +97,30 @@ public void uncaughtException(Thread t, Throwable e) {


System.out.println("wait for result");
loopUntilRecordReceived(kafka);
loopUntilRecordReceived(kafka, eosEnabled);


System.out.println("close Kafka Streams");
streams.close();
}

private static void loopUntilRecordReceived(final String kafka) {
private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
if (eosEnabled) {
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
}

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList(SINK_TOPIC));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
final ConsumerRecords<String, String> records = consumer.poll(100);
for (final ConsumerRecord<String, String> record : records) {
if (record.key().equals("key") && record.value().equals("value")) {
consumer.close();
return;
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ def __init__(self, test_context, kafka):


class StreamsBrokerCompatibilityService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
def __init__(self, test_context, kafka, eosEnabled):
super(StreamsBrokerCompatibilityService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
"dummy")
eosEnabled)
59 changes: 39 additions & 20 deletions tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,23 @@
from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import DEV_BRANCH, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
from kafkatest.version import DEV_BRANCH, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion


class StreamsBrokerCompatibility(Test):
"""
These tests validate that Streams v0.10.2+ can connect to older brokers v0.10.1+
and that Streams fails fast for 0.10.0 brokers
and that Streams times-out for pre-0.10.0 brokers
These tests validate that
- Streams 0.11+ w/ EOS fails fast for older brokers 0.10.2 and 0.10.1
- Streams 0.11+ w/o EOS works for older brokers 0.10.2 and 0.10.1
- Streams fails fast for 0.10.0 brokers
- Streams times-out for pre-0.10.0 brokers
"""

input = "brokerCompatibilitySourceTopic"
output = "brokerCompatibilitySinkTopic"

def __init__(self, test_context):
super(StreamsBrokerCompatibility, self).__init__(test_context=test_context)

self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context,
num_nodes=1,
Expand All @@ -45,9 +46,6 @@ def __init__(self, test_context):
self.input: {'partitions': 1, 'replication-factor': 1},
self.output: {'partitions': 1, 'replication-factor': 1}
})

self.processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka)

self.consumer = VerifiableConsumer(test_context,
1,
self.kafka,
Expand All @@ -57,16 +55,35 @@ def __init__(self, test_context):
def setUp(self):
self.zk.start()

@parametrize(broker_version=str(DEV_BRANCH))
@parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_10_1))
def test_compatible_brokers(self, broker_version):
def test_fail_fast_on_incompatible_brokers_if_eos_enabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

self.processor.start()
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, True)
processor.start()

processor.node.account.ssh(processor.start_cmd(processor.node))
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Setting processing.guarantee=exactly_once requires broker version 0.11.0.x or higher.',
timeout_sec=60,
err_msg="Never saw 'EOS requires broker version 0.11+' error message " + str(processor.node.account))

self.kafka.stop()

@parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_10_1))
def test_compatible_brokers_eos_disabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
processor.start()

self.consumer.start()

self.processor.wait()
processor.wait()

wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")

Expand All @@ -78,13 +95,14 @@ def test_fail_fast_on_incompatible_brokers(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

self.processor.start()
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
processor.start()

self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node))
with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor:
processor.node.account.ssh(processor.start_cmd(processor.node))
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Kafka Streams requires broker version 0.10.1.x or higher.',
timeout_sec=60,
err_msg="Never saw 'incompatible broker' error message " + str(self.processor.node.account))
err_msg="Never saw 'Streams requires broker verion 0.10.1+' error message " + str(processor.node.account))

self.kafka.stop()

Expand All @@ -94,12 +112,13 @@ def test_timeout_on_pre_010_brokers(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()

self.processor.start()
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
processor.start()

self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node))
with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor:
processor.node.account.ssh(processor.start_cmd(processor.node))
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException: Could not find any available broker.',
timeout_sec=60,
err_msg="Never saw 'no available broker' error message " + str(self.processor.node.account))
err_msg="Never saw 'no available brokers' error message " + str(processor.node.account))

self.kafka.stop()
7 changes: 6 additions & 1 deletion tests/kafkatest/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ def get_version(node=None):
V_0_10_1_1 = KafkaVersion("0.10.1.1")
LATEST_0_10_1 = V_0_10_1_1

LATEST_0_10 = LATEST_0_10_1
# 0.10.2.x versions
V_0_10_2_0 = KafkaVersion("0.10.2.0")
V_0_10_2_1 = KafkaVersion("0.10.2.1")
LATEST_0_10_2 = V_0_10_2_1

LATEST_0_10 = LATEST_0_10_2
2 changes: 2 additions & 0 deletions vagrant/base.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ get_kafka 0.10.0.1
chmod a+rw /opt/kafka-0.10.0.1
get_kafka 0.10.1.1
chmod a+rw /opt/kafka-0.10.1.1
get_kafka 0.10.2.1
chmod a+rw /opt/kafka-0.10.2.1


# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
Expand Down