From b13811fbf70ce989f2380d696edaa7d0e074a443 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 10:31:13 -0700 Subject: [PATCH 1/2] Use pytest.fixture for MockBroker; parametrize broker_version --- test/admin/test_admin_configs.py | 160 ++++++------- test/admin/test_admin_groups.py | 352 +++++++++++----------------- test/admin/test_admin_partitions.py | 340 +++++++++------------------ test/admin/test_admin_users.py | 125 ++++------ test/mock_broker.py | 32 +-- 5 files changed, 388 insertions(+), 621 deletions(-) diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py index a52270dae..aa4e3222b 100644 --- a/test/admin/test_admin_configs.py +++ b/test/admin/test_admin_configs.py @@ -10,7 +10,7 @@ ListConfigResourcesRequest, ListConfigResourcesResponse, ) -from test.mock_broker import MockBroker +from test.broker import MockBroker # ConfigResourceType values (wire) @@ -40,15 +40,17 @@ def test_config_resource(): @pytest.fixture -def mock_broker(): - return MockBroker() +def broker(request): + # parametrize tests with indirect=True + broker_version = getattr(request, 'param', (4, 2)) + return MockBroker(broker_version=broker_version) + @pytest.fixture -def admin(mock_broker): +def admin(broker): admin = KafkaAdminClient( - kafka_client=mock_broker.client_factory(), - bootstrap_servers='%s:%d' % (mock_broker.host, mock_broker.port), - api_version=mock_broker.broker_version, + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), request_timeout_ms=5000, ) try: @@ -125,52 +127,52 @@ def _sent_configs(captured): class TestAlterConfigsMockBroker: - def test_fills_in_other_modified_keys(self, mock_broker, admin): + def test_fills_in_other_modified_keys(self, broker, admin): """User asks to set foo; bar is already modified; both end up on the wire.""" # validation describe (dynamic filter) - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), ('baz', None, _SRC_DEFAULT, False), ])) # add_missing describe (modified filter) — same wire response, Python filters - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), ('baz', None, _SRC_DEFAULT, False), ])) captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})], incremental=False) sent = _sent_configs(captured) assert sent == {'foo': 'new', 'bar': 'barval'} - def test_user_value_wins_over_describe(self, mock_broker, admin): + def test_user_value_wins_over_describe(self, broker, admin): for _ in range(2): # validation + add_missing - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'brokerval', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'userval'})], incremental=False) assert _sent_configs(captured) == {'foo': 'userval'} - def test_none_value_from_describe_is_skipped(self, mock_broker, admin): + def test_none_value_from_describe_is_skipped(self, broker, admin): for _ in range(2): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'userval-placeholder', _SRC_DYNAMIC_TOPIC, False), ('bar', None, _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) admin.alter_configs([ConfigResource('TOPIC', 'topic-a', {'foo': 'userval'})], incremental=False) @@ -178,8 +180,8 @@ def test_none_value_from_describe_is_skipped(self, mock_broker, admin): assert sent == {'foo': 'userval'} assert 'bar' not in sent - def test_raise_on_unknown_true_raises(self, mock_broker, admin): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + def test_raise_on_unknown_true_raises(self, broker, admin): + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), ])) @@ -189,14 +191,14 @@ def test_raise_on_unknown_true_raises(self, mock_broker, admin): [ConfigResource('TOPIC', 'topic-a', {'mystery': 'x'})], incremental=False) - def test_raise_on_unknown_false_submits_anyway(self, mock_broker, admin): + def test_raise_on_unknown_false_submits_anyway(self, broker, admin): # only add_missing describe (validation is skipped) - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', {'mystery': 'x'})], @@ -207,14 +209,14 @@ def test_raise_on_unknown_false_submits_anyway(self, mock_broker, admin): assert sent['mystery'] == 'x' assert sent['foo'] == 'val' - def test_validate_only_propagates(self, mock_broker, admin): + def test_validate_only_propagates(self, broker, admin): for _ in range(2): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})], @@ -230,10 +232,10 @@ def test_validate_only_propagates(self, mock_broker, admin): class TestResetConfigsMockBroker: - def test_full_reset_sends_empty_configs(self, mock_broker, admin): + def test_full_reset_sends_empty_configs(self, broker, admin): """Resource with no configs => submit empty configs list (full reset).""" captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) # configs=[] (empty iterable) opts out of validation + get_missing describe admin.reset_configs( @@ -243,24 +245,24 @@ def test_full_reset_sends_empty_configs(self, mock_broker, admin): assert captured['request'].resources[0].configs == [] - def test_partial_reset_excludes_user_keys_keeps_others(self, mock_broker, admin): + def test_partial_reset_excludes_user_keys_keeps_others(self, broker, admin): """reset_configs({foo}) => submit all OTHER modified keys so only foo resets.""" # validation describe (dynamic filter) - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'fooval', _SRC_DYNAMIC_TOPIC, False), ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), ('baz', 'bazval', _SRC_DYNAMIC_TOPIC, False), ])) # get_missing describe (modified filter) - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'fooval', _SRC_DYNAMIC_TOPIC, False), ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), ('baz', 'bazval', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) admin.reset_configs([ConfigResource('TOPIC', 'topic-a', ['foo'])], incremental=False) @@ -268,30 +270,30 @@ def test_partial_reset_excludes_user_keys_keeps_others(self, mock_broker, admin) assert 'foo' not in sent assert sent == {'bar': 'barval', 'baz': 'bazval'} - def test_reset_all_modified_keys_sends_empty(self, mock_broker, admin): + def test_reset_all_modified_keys_sends_empty(self, broker, admin): """If user resets every currently-modified key, the wire body is empty.""" for _ in range(2): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'fooval', _SRC_DYNAMIC_TOPIC, False), ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) admin.reset_configs([ConfigResource('TOPIC', 'topic-a', ['foo', 'bar'])], incremental=False) assert _sent_configs(captured) == {} - def test_validate_only_propagates(self, mock_broker, admin): + def test_validate_only_propagates(self, broker, admin): for _ in range(2): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'fooval', _SRC_DYNAMIC_TOPIC, False), ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) + broker.respond_fn(AlterConfigsRequest, _capture_alter(captured)) admin.reset_configs( [ConfigResource('TOPIC', 'topic-a', ['foo'])], @@ -330,8 +332,8 @@ def handler(api_key, api_version, correlation_id, request_bytes): class TestListConfigResourcesMockBroker: - def test_groups_results_by_resource_type(self, mock_broker, admin): - mock_broker.respond( + def test_groups_results_by_resource_type(self, broker, admin): + broker.respond( ListConfigResourcesRequest, _list_config_resources_response([ ('topic-a', ConfigResourceType.TOPIC.value), @@ -347,9 +349,9 @@ def test_groups_results_by_resource_type(self, mock_broker, admin): 'client_metrics': ['metrics-1'], } - def test_no_resource_types_sends_empty_filter(self, mock_broker, admin): + def test_no_resource_types_sends_empty_filter(self, broker, admin): captured = {} - mock_broker.respond_fn( + broker.respond_fn( ListConfigResourcesRequest, _capture_list_config_resources( captured, _list_config_resources_response([]))) @@ -357,9 +359,9 @@ def test_no_resource_types_sends_empty_filter(self, mock_broker, admin): admin.list_config_resources() assert captured['request'].resource_types == [] - def test_string_filter_is_normalized_and_sent_as_int8(self, mock_broker, admin): + def test_string_filter_is_normalized_and_sent_as_int8(self, broker, admin): captured = {} - mock_broker.respond_fn( + broker.respond_fn( ListConfigResourcesRequest, _capture_list_config_resources( captured, _list_config_resources_response([]))) @@ -371,9 +373,9 @@ def test_string_filter_is_normalized_and_sent_as_int8(self, mock_broker, admin): ConfigResourceType.CLIENT_METRICS.value, } - def test_enum_filter_accepted(self, mock_broker, admin): + def test_enum_filter_accepted(self, broker, admin): captured = {} - mock_broker.respond_fn( + broker.respond_fn( ListConfigResourcesRequest, _capture_list_config_resources( captured, _list_config_resources_response([]))) @@ -387,16 +389,16 @@ def test_unrecognized_type_raises(self, admin): with pytest.raises(ValueError, match='Unrecognized ConfigResourceType'): admin.list_config_resources(resource_types=['bogus']) - def test_error_code_raises(self, mock_broker, admin): - mock_broker.respond( + def test_error_code_raises(self, broker, admin): + broker.respond( ListConfigResourcesRequest, _list_config_resources_response( [], error_code=ClusterAuthorizationFailedError.errno)) with pytest.raises(ClusterAuthorizationFailedError): admin.list_config_resources() - def test_empty_response_returns_empty_dict(self, mock_broker, admin): - mock_broker.respond( + def test_empty_response_returns_empty_dict(self, broker, admin): + broker.respond( ListConfigResourcesRequest, _list_config_resources_response([])) assert admin.list_config_resources() == {} @@ -443,14 +445,14 @@ def _sent_incremental(captured): class TestIncrementalAlterConfigsMockBroker: - def test_set_op_sends_triples(self, mock_broker, admin): + def test_set_op_sends_triples(self, broker, admin): # validation describe (dynamic filter) - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) result = admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', @@ -461,13 +463,13 @@ def test_set_op_sends_triples(self, mock_broker, admin): assert sent == {'foo': (AlterConfigOp.SET.value, 'new')} assert result == {'topic': {'topic-a': 'OK'}} - def test_bare_value_is_treated_as_set(self, mock_broker, admin): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + def test_bare_value_is_treated_as_set(self, broker, admin): + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', {'foo': 'new'})], @@ -475,13 +477,13 @@ def test_bare_value_is_treated_as_set(self, mock_broker, admin): assert _sent_incremental(captured) == {'foo': (AlterConfigOp.SET.value, 'new')} - def test_delete_op_forces_null_value(self, mock_broker, admin): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + def test_delete_op_forces_null_value(self, broker, admin): + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'old', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', @@ -490,13 +492,13 @@ def test_delete_op_forces_null_value(self, mock_broker, admin): assert _sent_incremental(captured) == {'foo': (AlterConfigOp.DELETE.value, None)} - def test_append_and_subtract_ops(self, mock_broker, admin): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + def test_append_and_subtract_ops(self, broker, admin): + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('follower.replication.throttled.replicas', '*', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', { @@ -511,11 +513,11 @@ def test_append_and_subtract_ops(self, mock_broker, admin): } captured = {} - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('follower.replication.throttled.replicas', '*', _SRC_DYNAMIC_TOPIC, False), ])) - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', { @@ -529,13 +531,13 @@ def test_append_and_subtract_ops(self, mock_broker, admin): (AlterConfigOp.SUBTRACT.value, '1:0'), } - def test_string_op_is_normalized(self, mock_broker, admin): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + def test_string_op_is_normalized(self, broker, admin): + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', {'foo': ('set', 'v')})], @@ -550,8 +552,8 @@ def test_unrecognized_op_raises(self, admin): raise_on_unknown=False, incremental=True) - def test_raise_on_unknown_true_raises(self, mock_broker, admin): - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + def test_raise_on_unknown_true_raises(self, broker, admin): + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), ])) @@ -562,9 +564,9 @@ def test_raise_on_unknown_true_raises(self, mock_broker, admin): {'mystery': (AlterConfigOp.SET, 'x')})], incremental=True) - def test_raise_on_unknown_false_skips_describe(self, mock_broker, admin): + def test_raise_on_unknown_false_skips_describe(self, broker, admin): captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', @@ -576,9 +578,9 @@ def test_raise_on_unknown_false_skips_describe(self, mock_broker, admin): 'mystery': (AlterConfigOp.SET.value, 'x'), } - def test_validate_only_propagates(self, mock_broker, admin): + def test_validate_only_propagates(self, broker, admin): captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', @@ -589,15 +591,15 @@ def test_validate_only_propagates(self, mock_broker, admin): assert captured['request'].validate_only is True - def test_does_not_fill_in_other_keys(self, mock_broker, admin): + def test_does_not_fill_in_other_keys(self, broker, admin): """Unlike alter_configs, incremental should NOT send untouched keys.""" - mock_broker.respond(DescribeConfigsRequest, _describe_configs_response( + broker.respond(DescribeConfigsRequest, _describe_configs_response( _TOPIC, 'topic-a', [ ('foo', 'val', _SRC_DYNAMIC_TOPIC, False), ('bar', 'barval', _SRC_DYNAMIC_TOPIC, False), ])) captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter(captured)) admin.alter_configs( [ConfigResource('TOPIC', 'topic-a', @@ -608,9 +610,9 @@ def test_does_not_fill_in_other_keys(self, mock_broker, admin): 'foo': (AlterConfigOp.SET.value, 'new'), } - def test_broker_resource_routed_by_broker_id(self, mock_broker, admin): + def test_broker_resource_routed_by_broker_id(self, broker, admin): captured = {} - mock_broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter( + broker.respond_fn(IncrementalAlterConfigsRequest, _capture_incremental_alter( captured, _incremental_alter_configs_response([ (ConfigResourceType.BROKER.value, '0', 0, None), @@ -636,8 +638,8 @@ def test_non_integer_broker_name_raises(self, admin): raise_on_unknown=False, incremental=True) - def test_error_response_surfaces_in_result(self, mock_broker, admin): - mock_broker.respond( + def test_error_response_surfaces_in_result(self, broker, admin): + broker.respond( IncrementalAlterConfigsRequest, _incremental_alter_configs_response([ (_TOPIC, 'topic-a', InvalidConfigurationError.errno, 'bad value'), diff --git a/test/admin/test_admin_groups.py b/test/admin/test_admin_groups.py index d00c7fdb1..43eca03cc 100644 --- a/test/admin/test_admin_groups.py +++ b/test/admin/test_admin_groups.py @@ -19,13 +19,24 @@ from test.mock_broker import MockBroker -def _make_admin(broker): - return KafkaAdminClient( +@pytest.fixture +def broker(request): + # parametrize tests with indirect=True + broker_version = getattr(request, 'param', (4, 2)) + return MockBroker(broker_version=broker_version) + + +@pytest.fixture +def admin(broker): + admin = KafkaAdminClient( kafka_client=broker.client_factory(), bootstrap_servers='%s:%d' % (broker.host, broker.port), - api_version=broker.broker_version, request_timeout_ms=5000, ) + try: + yield admin + finally: + admin.close() # --------------------------------------------------------------------------- @@ -34,9 +45,7 @@ def _make_admin(broker): class TestAlterGroupOffsetsMockBroker: - - def test_success_returns_tp_to_noerror(self): - broker = MockBroker() + def test_success_returns_tp_to_noerror(self, broker, admin): _Topic = OffsetCommitResponse.OffsetCommitResponseTopic _Partition = _Topic.OffsetCommitResponsePartition broker.respond( @@ -52,26 +61,21 @@ def test_success_returns_tp_to_noerror(self): ), ) - admin = _make_admin(broker) - try: - result = admin.alter_group_offsets( - 'g1', - { - TopicPartition('topic-a', 0): OffsetAndMetadata(10, '', None), - TopicPartition('topic-a', 1): OffsetAndMetadata(20, 'm', 5), - }, - group_coordinator_id=0, - ) - finally: - admin.close() + result = admin.alter_group_offsets( + 'g1', + { + TopicPartition('topic-a', 0): OffsetAndMetadata(10, '', None), + TopicPartition('topic-a', 1): OffsetAndMetadata(20, 'm', 5), + }, + group_coordinator_id=0, + ) assert result == { TopicPartition('topic-a', 0): NoError, TopicPartition('topic-a', 1): NoError, } - def test_request_uses_standalone_member_fields(self): - broker = MockBroker() + def test_request_uses_standalone_member_fields(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -81,15 +85,11 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(OffsetCommitRequest, handler) - admin = _make_admin(broker) - try: - admin.alter_group_offsets( - 'g1', - {TopicPartition('topic-a', 0): OffsetAndMetadata(10, 'meta', 7)}, - group_coordinator_id=0, - ) - finally: - admin.close() + admin.alter_group_offsets( + 'g1', + {TopicPartition('topic-a', 0): OffsetAndMetadata(10, 'meta', 7)}, + group_coordinator_id=0, + ) req = captured['request'] assert req.group_id == 'g1' @@ -107,8 +107,7 @@ def handler(api_key, api_version, correlation_id, request_bytes): assert p.committed_metadata == 'meta' assert p.committed_leader_epoch == 7 - def test_leader_epoch_none_sent_as_minus_one(self): - broker = MockBroker() + def test_leader_epoch_none_sent_as_minus_one(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -118,20 +117,15 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(OffsetCommitRequest, handler) - admin = _make_admin(broker) - try: - admin.alter_group_offsets( - 'g1', - {TopicPartition('topic-a', 0): OffsetAndMetadata(10, '', None)}, - group_coordinator_id=0, - ) - finally: - admin.close() + admin.alter_group_offsets( + 'g1', + {TopicPartition('topic-a', 0): OffsetAndMetadata(10, '', None)}, + group_coordinator_id=0, + ) assert captured['request'].topics[0].partitions[0].committed_leader_epoch == -1 - def test_partition_level_error_returned_not_raised(self): - broker = MockBroker() + def test_partition_level_error_returned_not_raised(self, broker, admin): _Topic = OffsetCommitResponse.OffsetCommitResponseTopic _Partition = _Topic.OffsetCommitResponsePartition broker.respond( @@ -147,25 +141,16 @@ def test_partition_level_error_returned_not_raised(self): ), ) - admin = _make_admin(broker) - try: - result = admin.alter_group_offsets( - 'g1', - {TopicPartition('topic-a', 0): OffsetAndMetadata(1, '', None)}, - group_coordinator_id=0, - ) - finally: - admin.close() + result = admin.alter_group_offsets( + 'g1', + {TopicPartition('topic-a', 0): OffsetAndMetadata(1, '', None)}, + group_coordinator_id=0, + ) assert result == {TopicPartition('topic-a', 0): UnknownMemberIdError} - def test_empty_offsets_is_noop(self): - broker = MockBroker() - admin = _make_admin(broker) - try: - result = admin.alter_group_offsets('g1', {}, group_coordinator_id=0) - finally: - admin.close() + def test_empty_offsets_is_noop(self, broker, admin): + result = admin.alter_group_offsets('g1', {}, group_coordinator_id=0) assert result == {} @@ -175,9 +160,7 @@ def test_empty_offsets_is_noop(self): class TestDeleteGroupOffsetsMockBroker: - - def test_success_returns_tp_to_noerror(self): - broker = MockBroker() + def test_success_returns_tp_to_noerror(self, broker, admin): _Topic = OffsetDeleteResponse.OffsetDeleteResponseTopic _Partition = _Topic.OffsetDeleteResponsePartition broker.respond( @@ -194,23 +177,18 @@ def test_success_returns_tp_to_noerror(self): ), ) - admin = _make_admin(broker) - try: - result = admin.delete_group_offsets( - 'g1', - [TopicPartition('topic-a', 0), TopicPartition('topic-a', 1)], - group_coordinator_id=0, - ) - finally: - admin.close() + result = admin.delete_group_offsets( + 'g1', + [TopicPartition('topic-a', 0), TopicPartition('topic-a', 1)], + group_coordinator_id=0, + ) assert result == { TopicPartition('topic-a', 0): NoError, TopicPartition('topic-a', 1): NoError, } - def test_groups_partitions_by_topic(self): - broker = MockBroker() + def test_groups_partitions_by_topic(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -220,19 +198,15 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(OffsetDeleteRequest, handler) - admin = _make_admin(broker) - try: - admin.delete_group_offsets( - 'g1', - [ - TopicPartition('topic-a', 0), - TopicPartition('topic-b', 2), - TopicPartition('topic-a', 1), - ], - group_coordinator_id=0, - ) - finally: - admin.close() + admin.delete_group_offsets( + 'g1', + [ + TopicPartition('topic-a', 0), + TopicPartition('topic-b', 2), + TopicPartition('topic-a', 1), + ], + group_coordinator_id=0, + ) req = captured['request'] assert req.group_id == 'g1' @@ -243,8 +217,7 @@ def handler(api_key, api_version, correlation_id, request_bytes): assert a_indexes == [0, 1] assert b_indexes == [2] - def test_top_level_error_raises(self): - broker = MockBroker() + def test_top_level_error_raises(self, broker, admin): broker.respond( OffsetDeleteRequest, OffsetDeleteResponse( @@ -254,19 +227,14 @@ def test_top_level_error_raises(self): ), ) - admin = _make_admin(broker) - try: - with pytest.raises(GroupIdNotFoundError): - admin.delete_group_offsets( - 'g1', - [TopicPartition('topic-a', 0)], - group_coordinator_id=0, - ) - finally: - admin.close() - - def test_partition_level_error_returned_not_raised(self): - broker = MockBroker() + with pytest.raises(GroupIdNotFoundError): + admin.delete_group_offsets( + 'g1', + [TopicPartition('topic-a', 0)], + group_coordinator_id=0, + ) + + def test_partition_level_error_returned_not_raised(self, broker, admin): _Topic = OffsetDeleteResponse.OffsetDeleteResponseTopic _Partition = _Topic.OffsetDeleteResponsePartition broker.respond( @@ -283,25 +251,16 @@ def test_partition_level_error_returned_not_raised(self): ), ) - admin = _make_admin(broker) - try: - result = admin.delete_group_offsets( - 'g1', - [TopicPartition('topic-a', 0)], - group_coordinator_id=0, - ) - finally: - admin.close() + result = admin.delete_group_offsets( + 'g1', + [TopicPartition('topic-a', 0)], + group_coordinator_id=0, + ) assert result == {TopicPartition('topic-a', 0): GroupSubscribedToTopicError} - def test_empty_partitions_is_noop(self): - broker = MockBroker() - admin = _make_admin(broker) - try: - result = admin.delete_group_offsets('g1', [], group_coordinator_id=0) - finally: - admin.close() + def test_empty_partitions_is_noop(self, broker, admin): + result = admin.delete_group_offsets('g1', [], group_coordinator_id=0) assert result == {} @@ -311,9 +270,8 @@ def test_empty_partitions_is_noop(self): class TestRemoveGroupMembersMockBroker: - - def test_batch_success_returns_member_to_noerror(self): - broker = MockBroker() # broker_version=(4, 2) -> LeaveGroup v5 + def test_batch_success_returns_member_to_noerror(self, broker, admin): + # broker_version=(4, 2) -> LeaveGroup v5 _MemberResp = LeaveGroupResponse.MemberResponse broker.respond( LeaveGroupRequest, @@ -327,26 +285,21 @@ def test_batch_success_returns_member_to_noerror(self): ), ) - admin = _make_admin(broker) - try: - result = admin.remove_group_members( - 'g1', - [ - MemberToRemove(member_id='m1'), - MemberToRemove(group_instance_id='static-1'), - ], - group_coordinator_id=0, - ) - finally: - admin.close() + result = admin.remove_group_members( + 'g1', + [ + MemberToRemove(member_id='m1'), + MemberToRemove(group_instance_id='static-1'), + ], + group_coordinator_id=0, + ) assert result == { 'm1': NoError, 'static-1': NoError, } - def test_batch_request_fields(self): - broker = MockBroker() + def test_batch_request_fields(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -358,18 +311,14 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(LeaveGroupRequest, handler) - admin = _make_admin(broker) - try: - admin.remove_group_members( - 'g1', - [ - MemberToRemove(member_id='m1', reason='rebalance'), - MemberToRemove(group_instance_id='inst-2', reason='shutdown'), - ], - group_coordinator_id=0, - ) - finally: - admin.close() + admin.remove_group_members( + 'g1', + [ + MemberToRemove(member_id='m1', reason='rebalance'), + MemberToRemove(group_instance_id='inst-2', reason='shutdown'), + ], + group_coordinator_id=0, + ) assert captured['version'] >= 3 req = captured['request'] @@ -385,8 +334,7 @@ def handler(api_key, api_version, correlation_id, request_bytes): assert m1.reason == 'rebalance' assert m2.reason == 'shutdown' - def test_batch_top_level_error_raises(self): - broker = MockBroker() + def test_batch_top_level_error_raises(self, broker, admin): broker.respond( LeaveGroupRequest, LeaveGroupResponse( @@ -396,19 +344,14 @@ def test_batch_top_level_error_raises(self): ), ) - admin = _make_admin(broker) - try: - with pytest.raises(GroupIdNotFoundError): - admin.remove_group_members( - 'g1', - [MemberToRemove(member_id='m1')], - group_coordinator_id=0, - ) - finally: - admin.close() - - def test_batch_per_member_error_returned(self): - broker = MockBroker() + with pytest.raises(GroupIdNotFoundError): + admin.remove_group_members( + 'g1', + [MemberToRemove(member_id='m1')], + group_coordinator_id=0, + ) + + def test_batch_per_member_error_returned(self, broker, admin): _MemberResp = LeaveGroupResponse.MemberResponse broker.respond( LeaveGroupRequest, @@ -422,30 +365,21 @@ def test_batch_per_member_error_returned(self): ), ) - admin = _make_admin(broker) - try: - result = admin.remove_group_members( - 'g1', - [MemberToRemove(member_id='m1')], - group_coordinator_id=0, - ) - finally: - admin.close() + result = admin.remove_group_members( + 'g1', + [MemberToRemove(member_id='m1')], + group_coordinator_id=0, + ) assert result == {'m1': UnknownMemberIdError} - def test_empty_members_is_noop(self): - broker = MockBroker() - admin = _make_admin(broker) - try: - result = admin.remove_group_members('g1', [], group_coordinator_id=0) - finally: - admin.close() + def test_empty_members_is_noop(self, broker, admin): + result = admin.remove_group_members('g1', [], group_coordinator_id=0) assert result == {} - def test_fallback_fans_out_one_request_per_member(self): + @pytest.mark.parametrize("broker", [(2, 3)], indirect=True) + def test_fallback_fans_out_one_request_per_member(self, broker, admin): # (2, 3) broker: LeaveGroup v0-v2 only, no batch support - broker = MockBroker(broker_version=(2, 3)) captured = [] def handler(api_key, api_version, correlation_id, request_bytes): @@ -457,18 +391,14 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(LeaveGroupRequest, handler) broker.respond_fn(LeaveGroupRequest, handler) - admin = _make_admin(broker) - try: - result = admin.remove_group_members( - 'g1', - [ - MemberToRemove(member_id='m1'), - MemberToRemove(member_id='m2'), - ], - group_coordinator_id=0, - ) - finally: - admin.close() + result = admin.remove_group_members( + 'g1', + [ + MemberToRemove(member_id='m1'), + MemberToRemove(member_id='m2'), + ], + group_coordinator_id=0, + ) assert len(captured) == 2 assert captured[0].group_id == 'g1' @@ -479,28 +409,20 @@ def handler(api_key, api_version, correlation_id, request_bytes): 'm2': NoError, } - def test_fallback_rejects_group_instance_id(self): - broker = MockBroker(broker_version=(2, 3)) - admin = _make_admin(broker) - try: - with pytest.raises(UnsupportedVersionError): - admin.remove_group_members( - 'g1', - [MemberToRemove(group_instance_id='inst-1')], - group_coordinator_id=0, - ) - finally: - admin.close() - - def test_fallback_requires_member_id(self): - broker = MockBroker(broker_version=(2, 3)) - admin = _make_admin(broker) - try: - with pytest.raises(ValueError): - admin.remove_group_members( - 'g1', - [MemberToRemove()], - group_coordinator_id=0, - ) - finally: - admin.close() + @pytest.mark.parametrize("broker", [(2, 3)], indirect=True) + def test_fallback_rejects_group_instance_id(self, broker, admin): + with pytest.raises(UnsupportedVersionError): + admin.remove_group_members( + 'g1', + [MemberToRemove(group_instance_id='inst-1')], + group_coordinator_id=0, + ) + + @pytest.mark.parametrize("broker", [(2, 3)], indirect=True) + def test_fallback_requires_member_id(self, broker, admin): + with pytest.raises(ValueError): + admin.remove_group_members( + 'g1', + [MemberToRemove()], + group_coordinator_id=0, + ) diff --git a/test/admin/test_admin_partitions.py b/test/admin/test_admin_partitions.py index e593d8a92..95e70df6a 100644 --- a/test/admin/test_admin_partitions.py +++ b/test/admin/test_admin_partitions.py @@ -19,13 +19,24 @@ from test.mock_broker import MockBroker -def _make_admin(broker): - return KafkaAdminClient( +@pytest.fixture +def broker(request): + # parametrize tests with indirect=True + broker_version = getattr(request, 'param', (4, 2)) + return MockBroker(broker_version=broker_version) + + +@pytest.fixture +def admin(broker): + admin = KafkaAdminClient( kafka_client=broker.client_factory(), bootstrap_servers='%s:%d' % (broker.host, broker.port), - api_version=broker.broker_version, request_timeout_ms=5000, ) + try: + yield admin + finally: + admin.close() # --------------------------------------------------------------------------- @@ -34,9 +45,7 @@ def _make_admin(broker): class TestAlterPartitionReassignmentsMockBroker: - - def test_success_returns_dict(self): - broker = MockBroker() + def test_success_returns_dict(self, broker, admin): Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse Partition = Topic.ReassignablePartitionResponse broker.respond( @@ -56,20 +65,15 @@ def test_success_returns_dict(self): ), ) - admin = _make_admin(broker) - try: - result = admin.alter_partition_reassignments({ - TopicPartition('topic-a', 0): [1, 2, 3], - }) - finally: - admin.close() + result = admin.alter_partition_reassignments({ + TopicPartition('topic-a', 0): [1, 2, 3], + }) assert result['error_code'] == 0 assert result['responses'][0]['name'] == 'topic-a' assert result['responses'][0]['partitions'][0]['error_code'] == 0 - def test_cancel_reassignment_sends_null_replicas(self): - broker = MockBroker() + def test_cancel_reassignment_sends_null_replicas(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -81,14 +85,10 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(AlterPartitionReassignmentsRequest, handler) - admin = _make_admin(broker) - try: - admin.alter_partition_reassignments({ - TopicPartition('topic-a', 0): None, # cancel - TopicPartition('topic-a', 1): [4, 5], - }) - finally: - admin.close() + admin.alter_partition_reassignments({ + TopicPartition('topic-a', 0): None, # cancel + TopicPartition('topic-a', 1): [4, 5], + }) req = captured['request'] assert len(req.topics) == 1 @@ -97,8 +97,7 @@ def handler(api_key, api_version, correlation_id, request_bytes): assert by_index[0].replicas is None assert list(by_index[1].replicas) == [4, 5] - def test_partition_level_error_raises(self): - broker = MockBroker() + def test_partition_level_error_raises(self, broker, admin): Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse Partition = Topic.ReassignablePartitionResponse broker.respond( @@ -119,17 +118,12 @@ def test_partition_level_error_raises(self): ), ) - admin = _make_admin(broker) - try: - with pytest.raises(Exception): - admin.alter_partition_reassignments({ - TopicPartition('topic-a', 0): [1, 2, 3], - }) - finally: - admin.close() + with pytest.raises(Exception): + admin.alter_partition_reassignments({ + TopicPartition('topic-a', 0): [1, 2, 3], + }) - def test_partition_error_suppressed_with_raise_errors_false(self): - broker = MockBroker() + def test_partition_error_suppressed_with_raise_errors_false(self, broker, admin): Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse Partition = Topic.ReassignablePartitionResponse broker.respond( @@ -149,14 +143,10 @@ def test_partition_error_suppressed_with_raise_errors_false(self): ), ) - admin = _make_admin(broker) - try: - result = admin.alter_partition_reassignments( - {TopicPartition('topic-a', 0): [1, 2, 3]}, - raise_errors=False, - ) - finally: - admin.close() + result = admin.alter_partition_reassignments( + {TopicPartition('topic-a', 0): [1, 2, 3]}, + raise_errors=False, + ) assert result['responses'][0]['partitions'][0]['error_code'] == 37 @@ -167,9 +157,7 @@ def test_partition_error_suppressed_with_raise_errors_false(self): class TestListPartitionReassignmentsMockBroker: - - def test_returns_tp_to_reassignment_dict(self): - broker = MockBroker() + def test_returns_tp_to_reassignment_dict(self, broker, admin): Topic = ListPartitionReassignmentsResponse.OngoingTopicReassignment Partition = Topic.OngoingPartitionReassignment broker.respond( @@ -200,11 +188,7 @@ def test_returns_tp_to_reassignment_dict(self): ), ) - admin = _make_admin(broker) - try: - result = admin.list_partition_reassignments() - finally: - admin.close() + result = admin.list_partition_reassignments() assert result == { TopicPartition('topic-a', 0): { @@ -219,8 +203,7 @@ def test_returns_tp_to_reassignment_dict(self): }, } - def test_none_topic_partitions_sends_null_topics(self): - broker = MockBroker() + def test_none_topic_partitions_sends_null_topics(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -232,16 +215,11 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListPartitionReassignmentsRequest, handler) - admin = _make_admin(broker) - try: - admin.list_partition_reassignments() - finally: - admin.close() + admin.list_partition_reassignments() assert captured['request'].topics is None - def test_dict_input_encodes_topic_partitions(self): - broker = MockBroker() + def test_dict_input_encodes_topic_partitions(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -253,17 +231,12 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListPartitionReassignmentsRequest, handler) - admin = _make_admin(broker) - try: - admin.list_partition_reassignments({'topic-a': [0, 1], 'topic-b': [2]}) - finally: - admin.close() + admin.list_partition_reassignments({'topic-a': [0, 1], 'topic-b': [2]}) topics = {t.name: list(t.partition_indexes) for t in captured['request'].topics} assert topics == {'topic-a': [0, 1], 'topic-b': [2]} - def test_tp_list_input_groups_by_topic(self): - broker = MockBroker() + def test_tp_list_input_groups_by_topic(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -275,21 +248,16 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListPartitionReassignmentsRequest, handler) - admin = _make_admin(broker) - try: - admin.list_partition_reassignments([ - TopicPartition('topic-a', 0), - TopicPartition('topic-a', 1), - TopicPartition('topic-b', 5), - ]) - finally: - admin.close() + admin.list_partition_reassignments([ + TopicPartition('topic-a', 0), + TopicPartition('topic-a', 1), + TopicPartition('topic-b', 5), + ]) topics = {t.name: sorted(t.partition_indexes) for t in captured['request'].topics} assert topics == {'topic-a': [0, 1], 'topic-b': [5]} - def test_top_level_error_raises(self): - broker = MockBroker() + def test_top_level_error_raises(self, broker, admin): broker.respond( ListPartitionReassignmentsRequest, ListPartitionReassignmentsResponse( @@ -300,13 +268,9 @@ def test_top_level_error_raises(self): ), ) - admin = _make_admin(broker) - try: - with pytest.raises(Exception) as exc_info: - admin.list_partition_reassignments() - assert 'not controller' in str(exc_info.value) - finally: - admin.close() + with pytest.raises(Exception) as exc_info: + admin.list_partition_reassignments() + assert 'not controller' in str(exc_info.value) # --------------------------------------------------------------------------- @@ -315,9 +279,7 @@ def test_top_level_error_raises(self): class TestDescribeTopicPartitionsMockBroker: - - def test_returns_topic_partition_details(self): - broker = MockBroker() + def test_returns_topic_partition_details(self, broker, admin): topic_id = uuid.uuid4() Topic = DescribeTopicPartitionsResponse.DescribeTopicPartitionsResponseTopic Partition = Topic.DescribeTopicPartitionsResponsePartition @@ -351,11 +313,7 @@ def test_returns_topic_partition_details(self): ), ) - admin = _make_admin(broker) - try: - result = admin.describe_topic_partitions(['topic-a']) - finally: - admin.close() + result = admin.describe_topic_partitions(['topic-a']) assert result['next_cursor'] is None assert len(result['topics']) == 1 @@ -368,8 +326,7 @@ def test_returns_topic_partition_details(self): assert t['partitions'][0]['eligible_leader_replicas'] == [3] assert t['partitions'][0]['last_known_elr'] == [2] - def test_pagination_cursor_returned(self): - broker = MockBroker() + def test_pagination_cursor_returned(self, broker, admin): Cursor = DescribeTopicPartitionsResponse.Cursor broker.respond( DescribeTopicPartitionsRequest, @@ -379,17 +336,10 @@ def test_pagination_cursor_returned(self): next_cursor=Cursor(topic_name='topic-next', partition_index=5), ), ) - - admin = _make_admin(broker) - try: - result = admin.describe_topic_partitions(['topic-a']) - finally: - admin.close() - + result = admin.describe_topic_partitions(['topic-a']) assert result['next_cursor'] == {'topic_name': 'topic-next', 'partition_index': 5} - def test_request_encodes_topics_and_limit(self): - broker = MockBroker() + def test_request_encodes_topics_and_limit(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -401,20 +351,15 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(DescribeTopicPartitionsRequest, handler) - admin = _make_admin(broker) - try: - admin.describe_topic_partitions( - ['topic-a', 'topic-b'], response_partition_limit=100) - finally: - admin.close() + admin.describe_topic_partitions( + ['topic-a', 'topic-b'], response_partition_limit=100) req = captured['request'] assert [t.name for t in req.topics] == ['topic-a', 'topic-b'] assert req.response_partition_limit == 100 assert req.cursor is None - def test_request_encodes_cursor(self): - broker = MockBroker() + def test_request_encodes_cursor(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -426,13 +371,9 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(DescribeTopicPartitionsRequest, handler) - admin = _make_admin(broker) - try: - admin.describe_topic_partitions( - ['topic-a'], - cursor={'topic_name': 'topic-a', 'partition_index': 3}) - finally: - admin.close() + admin.describe_topic_partitions( + ['topic-a'], + cursor={'topic_name': 'topic-a', 'partition_index': 3}) cursor = captured['request'].cursor assert cursor is not None @@ -481,9 +422,7 @@ def _list_offsets_response(per_partition): class TestListPartitionOffsetsMockBroker: - - def test_returns_result_info(self): - broker = MockBroker() + def test_returns_result_info(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=2) broker.respond( ListOffsetsRequest, @@ -493,14 +432,10 @@ def test_returns_result_info(self): ]), ) - admin = _make_admin(broker) - try: - result = admin.list_partition_offsets({ - TopicPartition('topic-a', 0): OffsetSpec.EARLIEST, - TopicPartition('topic-a', 1): OffsetSpec.LATEST, - }) - finally: - admin.close() + result = admin.list_partition_offsets({ + TopicPartition('topic-a', 0): OffsetSpec.EARLIEST, + TopicPartition('topic-a', 1): OffsetSpec.LATEST, + }) assert result == { TopicPartition('topic-a', 0): OffsetAndTimestamp( @@ -509,8 +444,7 @@ def test_returns_result_info(self): offset=200, timestamp=5678, leader_epoch=7), } - def test_request_uses_spec_timestamp_sentinels(self): - broker = MockBroker() + def test_request_uses_spec_timestamp_sentinels(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=2) captured = {} @@ -525,14 +459,10 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListOffsetsRequest, handler) - admin = _make_admin(broker) - try: - admin.list_partition_offsets({ - TopicPartition('topic-a', 0): OffsetSpec.EARLIEST, - TopicPartition('topic-a', 1): OffsetSpec.LATEST, - }) - finally: - admin.close() + admin.list_partition_offsets({ + TopicPartition('topic-a', 0): OffsetSpec.EARLIEST, + TopicPartition('topic-a', 1): OffsetSpec.LATEST, + }) req = captured['request'] assert req.replica_id == -1 @@ -541,8 +471,7 @@ def handler(api_key, api_version, correlation_id, request_bytes): timestamps = {p.partition_index: p.timestamp for p in topic.partitions} assert timestamps == {0: -2, 1: -1} - def test_offset_timestamp_passed_through(self): - broker = MockBroker() + def test_offset_timestamp_passed_through(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) captured = {} @@ -553,20 +482,15 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListOffsetsRequest, handler) - admin = _make_admin(broker) - try: - result = admin.list_partition_offsets({ - TopicPartition('topic-a', 0): 1700000000, - }) - finally: - admin.close() + result = admin.list_partition_offsets({ + TopicPartition('topic-a', 0): 1700000000, + }) assert captured['request'].topics[0].partitions[0].timestamp == 1700000000 assert result[TopicPartition('topic-a', 0)].offset == 42 - def test_groups_partitions_by_leader(self): + def test_groups_partitions_by_leader(self, broker): # Two partitions, two different leaders => two requests. - broker = MockBroker() Broker = MetadataResponse.MetadataResponseBroker Topic = MetadataResponse.MetadataResponseTopic Partition = Topic.MetadataResponsePartition @@ -600,7 +524,12 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListOffsetsRequest, handler) broker.respond_fn(ListOffsetsRequest, handler) - admin = _make_admin(broker) + # cant use fixture because it bootstraps eagerly + admin = KafkaAdminClient( + kafka_client=broker.client_factory(), + bootstrap_servers='%s:%d' % (broker.host, broker.port), + request_timeout_ms=5000, + ) try: admin.list_partition_offsets({ TopicPartition('topic-a', 0): OffsetSpec.LATEST, @@ -614,8 +543,7 @@ def handler(api_key, api_version, correlation_id, request_bytes): for req in captured: assert sum(len(t.partitions) for t in req.topics) == 1 - def test_partition_error_raises(self): - broker = MockBroker() + def test_partition_error_raises(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) broker.respond( ListOffsetsRequest, @@ -624,73 +552,44 @@ def test_partition_error_raises(self): ]), ) - admin = _make_admin(broker) - try: - with pytest.raises(UnknownTopicOrPartitionError): - admin.list_partition_offsets({ - TopicPartition('topic-a', 0): OffsetSpec.LATEST, - }) - finally: - admin.close() + with pytest.raises(UnknownTopicOrPartitionError): + admin.list_partition_offsets({ + TopicPartition('topic-a', 0): OffsetSpec.LATEST, + }) - def test_unknown_partition_raises(self): - broker = MockBroker() + def test_unknown_partition_raises(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) - admin = _make_admin(broker) - try: - with pytest.raises(UnknownTopicOrPartitionError): - admin.list_partition_offsets({ - TopicPartition('topic-a', 99): OffsetSpec.LATEST, - }) - finally: - admin.close() + with pytest.raises(UnknownTopicOrPartitionError): + admin.list_partition_offsets({ + TopicPartition('topic-a', 99): OffsetSpec.LATEST, + }) - def test_max_timestamp_requires_v7(self): + @pytest.mark.parametrize("broker", [(2, 7)], indirect=True) + def test_max_timestamp_requires_v7(self, broker, admin): # (2, 7) broker -> ListOffsets max v6 -> MAX_TIMESTAMP unsupported - broker = MockBroker(broker_version=(2, 7)) _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) - admin = _make_admin(broker) - try: - with pytest.raises(IncompatibleBrokerVersion): - admin.list_partition_offsets({ - TopicPartition('topic-a', 0): OffsetSpec.MAX_TIMESTAMP, - }) - finally: - admin.close() + with pytest.raises(IncompatibleBrokerVersion): + admin.list_partition_offsets({ + TopicPartition('topic-a', 0): OffsetSpec.MAX_TIMESTAMP, + }) - def test_invalid_timestamp_raises(self): - broker = MockBroker() + def test_invalid_timestamp_raises(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) - admin = _make_admin(broker) - try: - with pytest.raises(ValueError): - admin.list_partition_offsets({TopicPartition('topic-a', 0): -100}, 0) - finally: - admin.close() + with pytest.raises(ValueError): + admin.list_partition_offsets({TopicPartition('topic-a', 0): -100}, 0) - def test_invalid_isolation_level_raises(self): - broker = MockBroker() + def test_invalid_isolation_level_raises(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) - admin = _make_admin(broker) - try: - with pytest.raises(ValueError): - admin.list_partition_offsets( - {TopicPartition('topic-a', 0): OffsetSpec.LATEST}, - isolation_level='wat', - ) - finally: - admin.close() + with pytest.raises(ValueError): + admin.list_partition_offsets( + {TopicPartition('topic-a', 0): OffsetSpec.LATEST}, + isolation_level='wat', + ) - def test_empty_input_is_noop(self): - broker = MockBroker() - admin = _make_admin(broker) - try: - assert admin.list_partition_offsets({}) == {} - finally: - admin.close() + def test_empty_input_is_noop(self, broker, admin): + assert admin.list_partition_offsets({}) == {} - def test_int_timestamp_accepted_directly(self): - broker = MockBroker() + def test_int_timestamp_accepted_directly(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) captured = {} @@ -701,16 +600,11 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListOffsetsRequest, handler) - admin = _make_admin(broker) - try: - admin.list_partition_offsets({TopicPartition('topic-a', 0): -2}) - finally: - admin.close() + admin.list_partition_offsets({TopicPartition('topic-a', 0): -2}) assert captured['request'].topics[0].partitions[0].timestamp == -2 - def test_read_committed_uses_isolation_level_1(self): - broker = MockBroker() + def test_read_committed_uses_isolation_level_1(self, broker, admin): _set_metadata_for_topic(broker, 'topic-a', num_partitions=1) captured = {} @@ -721,13 +615,9 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(ListOffsetsRequest, handler) - admin = _make_admin(broker) - try: - admin.list_partition_offsets( - {TopicPartition('topic-a', 0): OffsetSpec.LATEST}, - isolation_level='read_committed', - ) - finally: - admin.close() + admin.list_partition_offsets( + {TopicPartition('topic-a', 0): OffsetSpec.LATEST}, + isolation_level='read_committed', + ) assert captured['request'].isolation_level == 1 diff --git a/test/admin/test_admin_users.py b/test/admin/test_admin_users.py index 5e47f2359..f316f3c2b 100644 --- a/test/admin/test_admin_users.py +++ b/test/admin/test_admin_users.py @@ -87,19 +87,28 @@ def test_unknown_mechanism_rejected(self): # --------------------------------------------------------------------------- -def _make_admin(broker): - return KafkaAdminClient( +@pytest.fixture +def broker(request): + # parametrize tests with indirect=True + broker_version = getattr(request, 'param', (4, 2)) + return MockBroker(broker_version=broker_version) + + +@pytest.fixture +def admin(broker): + admin = KafkaAdminClient( kafka_client=broker.client_factory(), bootstrap_servers='%s:%d' % (broker.host, broker.port), - api_version=broker.broker_version, request_timeout_ms=5000, ) + try: + yield admin + finally: + admin.close() class TestAlterUserScramCredentialsMockBroker: - - def test_all_success_returns_none_values(self): - broker = MockBroker() + def test_all_success_returns_none_values(self, broker, admin): Result = AlterUserScramCredentialsResponse.AlterUserScramCredentialsResult broker.respond( AlterUserScramCredentialsRequest, @@ -111,22 +120,15 @@ def test_all_success_returns_none_values(self): ], ), ) - - admin = _make_admin(broker) - try: - result = admin.alter_user_scram_credentials([ - UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), - UserScramCredentialUpsertion( - 'bob', ScramMechanism.SCRAM_SHA_512, - password='secret', iterations=4096, salt=b'fixed-salt'), - ]) - finally: - admin.close() - + result = admin.alter_user_scram_credentials([ + UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), + UserScramCredentialUpsertion( + 'bob', ScramMechanism.SCRAM_SHA_512, + password='secret', iterations=4096, salt=b'fixed-salt'), + ]) assert result == {'alice': None, 'bob': None} - def test_partial_errors_returned_in_dict(self): - broker = MockBroker() + def test_partial_errors_returned_in_dict(self, broker, admin): Result = AlterUserScramCredentialsResponse.AlterUserScramCredentialsResult broker.respond( AlterUserScramCredentialsRequest, @@ -139,23 +141,16 @@ def test_partial_errors_returned_in_dict(self): ], ), ) - - admin = _make_admin(broker) - try: - result = admin.alter_user_scram_credentials([ - UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), - UserScramCredentialDeletion('bob', ScramMechanism.SCRAM_SHA_512), - ]) - finally: - admin.close() - + result = admin.alter_user_scram_credentials([ + UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), + UserScramCredentialDeletion('bob', ScramMechanism.SCRAM_SHA_512), + ]) assert result == { 'alice': None, 'bob': 'Unsupported SASL mechanism', } - def test_request_is_encoded_with_deletions_and_upsertions(self): - broker = MockBroker() + def test_request_is_encoded_with_deletions_and_upsertions(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -178,14 +173,10 @@ def handler(api_key, api_version, correlation_id, request_bytes): 'bob', ScramMechanism.SCRAM_SHA_512, password='secret', iterations=2048, salt=salt) - admin = _make_admin(broker) - try: - admin.alter_user_scram_credentials([ - UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), - upsertion, - ]) - finally: - admin.close() + admin.alter_user_scram_credentials([ + UserScramCredentialDeletion('alice', ScramMechanism.SCRAM_SHA_256), + upsertion, + ]) request = captured['request'] assert len(request.deletions) == 1 @@ -203,9 +194,7 @@ def handler(api_key, api_version, correlation_id, request_bytes): class TestDescribeUserScramCredentialsMockBroker: - - def test_returns_credentials_per_user(self): - broker = MockBroker() + def test_returns_credentials_per_user(self, broker, admin): Result = DescribeUserScramCredentialsResponse.DescribeUserScramCredentialsResult CI = Result.CredentialInfo broker.respond( @@ -226,12 +215,7 @@ def test_returns_credentials_per_user(self): ), ) - admin = _make_admin(broker) - try: - result = admin.describe_user_scram_credentials(['alice', 'bob']) - finally: - admin.close() - + result = admin.describe_user_scram_credentials(['alice', 'bob']) assert result == { 'alice': { 'error': None, @@ -248,8 +232,7 @@ def test_returns_credentials_per_user(self): }, } - def test_per_user_error_reported(self): - broker = MockBroker() + def test_per_user_error_reported(self, broker, admin): Result = DescribeUserScramCredentialsResponse.DescribeUserScramCredentialsResult broker.respond( DescribeUserScramCredentialsRequest, @@ -265,12 +248,7 @@ def test_per_user_error_reported(self): ), ) - admin = _make_admin(broker) - try: - result = admin.describe_user_scram_credentials(['missing']) - finally: - admin.close() - + result = admin.describe_user_scram_credentials(['missing']) assert result == { 'missing': { 'error': 'resource not found', @@ -278,8 +256,7 @@ def test_per_user_error_reported(self): }, } - def test_top_level_error_raises(self): - broker = MockBroker() + def test_top_level_error_raises(self, broker, admin): broker.respond( DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse( @@ -290,16 +267,11 @@ def test_top_level_error_raises(self): ), ) - admin = _make_admin(broker) - try: - with pytest.raises(Exception) as exc_info: - admin.describe_user_scram_credentials(['alice']) - assert 'SCRAM not configured' in str(exc_info.value) - finally: - admin.close() + with pytest.raises(Exception) as exc_info: + admin.describe_user_scram_credentials(['alice']) + assert 'SCRAM not configured' in str(exc_info.value) - def test_describe_all_users_sends_null(self): - broker = MockBroker() + def test_describe_all_users_sends_null(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -315,17 +287,11 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(DescribeUserScramCredentialsRequest, handler) - admin = _make_admin(broker) - try: - result = admin.describe_user_scram_credentials() - finally: - admin.close() - + result = admin.describe_user_scram_credentials() assert result == {} assert captured['request'].users is None - def test_describe_specific_users_encodes_names(self): - broker = MockBroker() + def test_describe_specific_users_encodes_names(self, broker, admin): captured = {} def handler(api_key, api_version, correlation_id, request_bytes): @@ -341,11 +307,6 @@ def handler(api_key, api_version, correlation_id, request_bytes): broker.respond_fn(DescribeUserScramCredentialsRequest, handler) - admin = _make_admin(broker) - try: - admin.describe_user_scram_credentials(['alice', 'bob']) - finally: - admin.close() - + admin.describe_user_scram_credentials(['alice', 'bob']) request_users = captured['request'].users assert [u.name for u in request_users] == ['alice', 'bob'] diff --git a/test/mock_broker.py b/test/mock_broker.py index d44633ccc..5a26a66c3 100644 --- a/test/mock_broker.py +++ b/test/mock_broker.py @@ -182,8 +182,18 @@ def __init__(self, node_id=0, host='localhost', port=9092, broker_version=(4, 2) self.node_id = node_id self.host = host self.port = port - self.broker_version = broker_version + self.set_broker_version(broker_version) + self.set_metadata() # Default metadata: just this broker, no topics + + # Scripted response queue: list of (api_key, response_object) pairs + self._response_queue = collections.deque() + # Counters for debugging + self.requests_received = 0 + self.responses_sent = 0 + + def set_broker_version(self, broker_version): + self.broker_version = broker_version # Build the auto-response for ApiVersionsRequest self._broker_version_data = BrokerVersionData(broker_version) ApiVersion = ApiVersionsResponse.ApiVersion @@ -199,24 +209,6 @@ def __init__(self, node_id=0, host='localhost', port=9092, broker_version=(4, 2) av_range = self._broker_version_data.api_versions.get(ApiVersionsRequest.API_KEY, (0, 0)) self._api_versions_max = av_range[1] - # Default metadata: just this broker, no topics - Broker = MetadataResponse.MetadataResponseBroker - self._metadata_response = MetadataResponse( - version=min(8, MetadataResponse.max_version), - throttle_time_ms=0, - brokers=[Broker(node_id=node_id, host=host, port=port, rack=None)], - cluster_id='mock-cluster', - controller_id=node_id, - topics=[], - ) - - # Scripted response queue: list of (api_key, response_object) pairs - self._response_queue = collections.deque() - - # Counters for debugging - self.requests_received = 0 - self.responses_sent = 0 - def set_metadata(self, topics=None, brokers=None): """Configure the auto-response for MetadataRequest. @@ -229,7 +221,7 @@ def set_metadata(self, topics=None, brokers=None): if brokers is None: brokers = [Broker(node_id=self.node_id, host=self.host, port=self.port, rack=None)] self._metadata_response = MetadataResponse( - version=min(8, MetadataResponse.max_version), + version=self._broker_version_data.api_version(MetadataRequest, max_version=8), throttle_time_ms=0, brokers=brokers, cluster_id='mock-cluster', From ef99b1c63aa4ae327ee1e69e3cab8a1b160ff81f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 10:52:52 -0700 Subject: [PATCH 2/2] fixup --- test/admin/test_admin_configs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/admin/test_admin_configs.py b/test/admin/test_admin_configs.py index aa4e3222b..39a88b58f 100644 --- a/test/admin/test_admin_configs.py +++ b/test/admin/test_admin_configs.py @@ -10,7 +10,7 @@ ListConfigResourcesRequest, ListConfigResourcesResponse, ) -from test.broker import MockBroker +from test.mock_broker import MockBroker # ConfigResourceType values (wire)