diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 47a6a96b02046..c084e08ab65fd 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -21,6 +21,7 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.performance import ProducerPerformanceService from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.version import DEV_BRANCH, V_1_1_0 class QuotaConfig(object): CLIENT_ID = 'client-id' @@ -119,7 +120,6 @@ def __init__(self, test_context): def setUp(self): self.zk.start() - self.kafka.start() def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" @@ -128,15 +128,30 @@ def min_cluster_size(self): @cluster(num_nodes=5) @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False]) @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2) - def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1): + @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_broker_throttling_behavior=True) + @parametrize(quota_type=QuotaConfig.CLIENT_ID, old_client_throttling_behavior=True) + def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1, + old_broker_throttling_behavior=False, old_client_throttling_behavior=False): + # Old (pre-2.0) throttling behavior for broker throttles before sending a response to the client. + if old_broker_throttling_behavior: + self.kafka.set_version(V_1_1_0) + self.kafka.start() + self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka) producer_client_id = self.quota_config.client_id consumer_client_id = self.quota_config.client_id + # Old (pre-2.0) throttling behavior for client does not throttle upon receiving a response with a non-zero throttle time. + if old_client_throttling_behavior: + client_version = V_1_1_0 + else: + client_version = DEV_BRANCH + # Produce all messages producer = ProducerPerformanceService( self.test_context, producer_num, self.kafka, - topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id) + topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, + client_id=producer_client_id, version=client_version) producer.run() @@ -144,7 +159,7 @@ def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_n consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic, consumer_timeout_ms=60000, client_id=consumer_client_id, jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id], - jmx_attributes=['bytes-consumed-rate']) + jmx_attributes=['bytes-consumed-rate'], version=client_version) consumer.run() for idx, messages in consumer.messages_consumed.iteritems():