Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions tests/kafkatest/tests/client/quota_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.version import DEV_BRANCH, V_1_1_0

class QuotaConfig(object):
CLIENT_ID = 'client-id'
Expand Down Expand Up @@ -119,7 +120,6 @@ def __init__(self, test_context):

def setUp(self):
self.zk.start()
self.kafka.start()

def min_cluster_size(self):
"""Override this since we're adding services outside of the constructor"""
Expand All @@ -128,23 +128,38 @@ def min_cluster_size(self):
@cluster(num_nodes=5)
@matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
@parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
@parametrize(quota_type=QuotaConfig.CLIENT_ID, old_broker_throttling_behavior=True)
@parametrize(quota_type=QuotaConfig.CLIENT_ID, old_client_throttling_behavior=True)
def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1,
old_broker_throttling_behavior=False, old_client_throttling_behavior=False):
# Old (pre-2.0) throttling behavior for broker throttles before sending a response to the client.
if old_broker_throttling_behavior:
self.kafka.set_version(V_1_1_0)
self.kafka.start()

self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
producer_client_id = self.quota_config.client_id
consumer_client_id = self.quota_config.client_id

# Old (pre-2.0) throttling behavior for client does not throttle upon receiving a response with a non-zero throttle time.
if old_client_throttling_behavior:
client_version = V_1_1_0
else:
client_version = DEV_BRANCH

# Produce all messages
producer = ProducerPerformanceService(
self.test_context, producer_num, self.kafka,
topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_client_id)
topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1,
client_id=producer_client_id, version=client_version)

producer.run()

# Consume all messages
consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
consumer_timeout_ms=60000, client_id=consumer_client_id,
jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
jmx_attributes=['bytes-consumed-rate'])
jmx_attributes=['bytes-consumed-rate'], version=client_version)
consumer.run()

for idx, messages in consumer.messages_consumed.iteritems():
Expand Down