Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tests/kafkatest/sanity_checks/test_console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@ def setUp(self):
self.zk.start()

@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to also update test_console_producer.py?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma I thought it is useful to have one sanity test with SASL/PLAIN that is simple to run, hence added a consumer test. Since the replication tests run both producer and consumer anyway, perhaps this is sufficient? test_verifiable_producer.py is currently run only with PLAINTEXT.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine to me.

@matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
def test_lifecycle(self, security_protocol, new_consumer=True):
def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
"""Check that console consumer starts/stops properly, and that we are capturing log output."""

self.kafka.security_protocol = security_protocol
self.kafka.client_sasl_mechanism = sasl_mechanism
self.kafka.interbroker_sasl_mechanism = sasl_mechanism
self.kafka.start()

self.consumer.security_protocol = security_protocol
Expand Down
14 changes: 8 additions & 6 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class KafkaService(JmxMixin, Service):
}

def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
jmx_attributes=[], zk_connect_timeout=5000):
"""
:type context
Expand All @@ -78,7 +79,8 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI

self.security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
self.sasl_mechanism = sasl_mechanism
self.client_sasl_mechanism = client_sasl_mechanism
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self.topics = topics
self.minikdc = None
self.authorizer_class_name = authorizer_class_name
Expand Down Expand Up @@ -108,7 +110,9 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI

@property
def security_config(self):
return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, zk_sasl = self.zk.zk_sasl , sasl_mechanism=self.sasl_mechanism)
return SecurityConfig(self.security_protocol, self.interbroker_security_protocol,
zk_sasl = self.zk.zk_sasl,
client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)

def open_port(self, protocol):
self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
Expand Down Expand Up @@ -163,9 +167,7 @@ def prop_file(self, node):
# TODO - clean up duplicate configuration logic
prop_file = cfg.render()
prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
security_config=self.security_config,
interbroker_security_protocol=self.interbroker_security_protocol,
sasl_mechanism=self.sasl_mechanism)
security_config=self.security_config)
return prop_file

def start_cmd(self, node):
Expand Down
5 changes: 3 additions & 2 deletions tests/kafkatest/services/kafka/templates/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_p
quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
{% endif %}

security.inter.broker.protocol={{ interbroker_security_protocol }}
security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}

ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.keystore.password=test-ks-passwd
Expand All @@ -59,7 +59,8 @@ ssl.keystore.type=JKS
ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
sasl.mechanism={{ sasl_mechanism }}
sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka
{% if authorizer_class_name is not none %}
ssl.client.auth=required
Expand Down
27 changes: 20 additions & 7 deletions tests/kafkatest/services/security/security_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class SecurityConfig(TemplateRenderer):

ssl_stores = Keytool.generate_keystore_truststore('.')

def __init__(self, security_protocol=None, interbroker_security_protocol=None, sasl_mechanism=SASL_MECHANISM_GSSAPI, zk_sasl=False, template_props=""):
def __init__(self, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
zk_sasl=False, template_props=""):
"""
Initialize the security properties for the node and copy
keystore and truststore to the remote node if the transport protocol
Expand Down Expand Up @@ -104,13 +106,14 @@ def __init__(self, security_protocol=None, interbroker_security_protocol=None, s
'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'],
'sasl.mechanism' : sasl_mechanism,
'sasl.mechanism' : client_sasl_mechanism,
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
'sasl.kerberos.service.name' : 'kafka'
}


def client_config(self, template_props=""):
return SecurityConfig(self.security_protocol, sasl_mechanism=self.sasl_mechanism, template_props=template_props)
return SecurityConfig(self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)

def setup_node(self, node):
if self.has_ssl:
Expand All @@ -120,13 +123,15 @@ def setup_node(self, node):

if self.has_sasl:
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
jaas_conf_file = self.sasl_mechanism.lower() + "_jaas.conf"
jaas_conf_file = "jaas.conf"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason this changed? I'm not sure its an issue, but it seems like we may have named the file with the SASL mechanism previously to at least aid in debugging.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ewencp Thank you for the review. Since we now support multiple SASL mechanisms in a broker, we need to have a jaas config file that includes all the supported mechanisms. Hence I modified jaas.conf to include sections based on the enabled mechanisms, rather than have one config file per mechanism. I removed the mechanism name from the file since it is no longer associated with a single mechanism.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that makes sense.

java_version = node.account.ssh_capture("java -version")
if any('IBM' in line for line in java_version):
is_ibm_jdk = True
else:
is_ibm_jdk = False
jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk)
jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk,
client_sasl_mechanism=self.client_sasl_mechanism,
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
if self.has_sasl_kerberos:
node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
Expand Down Expand Up @@ -159,12 +164,20 @@ def security_protocol(self):
return self.properties['security.protocol']

@property
def sasl_mechanism(self):
def client_sasl_mechanism(self):
return self.properties['sasl.mechanism']

@property
def interbroker_sasl_mechanism(self):
return self.properties['sasl.mechanism.inter.broker.protocol']

@property
def enabled_sasl_mechanisms(self):
return set([self.client_sasl_mechanism, self.interbroker_sasl_mechanism])

@property
def has_sasl_kerberos(self):
return self.has_sasl and self.sasl_mechanism == SecurityConfig.SASL_MECHANISM_GSSAPI
return self.has_sasl and (SecurityConfig.SASL_MECHANISM_GSSAPI in self.enabled_sasl_mechanisms)

@property
def kafka_opts(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,76 +11,85 @@
* specific language governing permissions and limitations under the License.
*/

{% if is_ibm_jdk %}

KafkaClient {
{% if client_sasl_mechanism == "GSSAPI" %}
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="client@EXAMPLE.COM";
};

KafkaServer {
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
};
{% if zk_sasl %}
Client {
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="zkclient@EXAMPLE.COM";
};

Server {
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeyTab="file:/mnt/security/keytab"
principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
};
{% endif %}
{% else %}

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required debug=false
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/mnt/security/keytab"
principal="client@EXAMPLE.COM";
{% endif %}
{% elif client_sasl_mechanism == "PLAIN" %}
org.apache.kafka.common.security.plain.PlainLoginModule required
username="client"
password="client-secret";
{% endif %}

};

KafkaServer {
{% if "GSSAPI" in enabled_sasl_mechanisms %}
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
{% else %}
com.sun.security.auth.module.Krb5LoginModule required debug=false
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/mnt/security/keytab"
principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
{% endif %}
{% endif %}
{% if "PLAIN" in enabled_sasl_mechanisms %}
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-secret"
user_client="client-secret"
user_kafka="kafka-secret";
{% endif %}
};

{% if zk_sasl %}
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/mnt/security/keytab"
storeKey=true
useTicketCache=false
principal="zkclient@EXAMPLE.COM";
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="zkclient@EXAMPLE.COM";
{% else %}
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/mnt/security/keytab"
storeKey=true
useTicketCache=false
principal="zkclient@EXAMPLE.COM";
{% endif %}
};

Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/mnt/security/keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
};
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeyTab="file:/mnt/security/keytab"
principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
{% else %}
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/mnt/security/keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
{% endif %}
};
{% endif %}



7 changes: 6 additions & 1 deletion tests/kafkatest/tests/core/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ def min_cluster_size(self):
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["controller"],
security_protocol=["PLAINTEXT", "SASL_SSL"])
def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type):
@matrix(failure_mode=["hard_bounce"],
broker_type=["leader"],
security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need GSSAPI for interbroker_sasl_mechanism? Don't we already test that via the other matrix annotation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma This tests (client=PLAIN, interbroker=PLAIN) and (client=PLAIN, interbroker=GSSAPI). The first tests a broker with just PLAIN and the second tests a broker with multiple mechanisms.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because these tests take a while to run, maybe we should just have the multiple mechanisms case. What do you think @ewencp?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma The second case (multiple mechanisms) uses GSSAPI for interbroker communication. That leaves no tests with PLAIN for inter-broker (sanity test runs only one broker, so that doesn't test as much as this one).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajinisivaram Do we need to cover all the cases here, though? Replication tests are pretty heavyweight. If we just want sanity checks on different protocol settings, we could also try to get coverage via other, cheaper tests?

I'm fine with the additional tests if we need them to get coverage, I just want to make sure we don't keep increasing the number of variants of expensive tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ewencp I dont actually know the answer. I chose one sanity test with a single broker (consumer test) and the replication tests because they restart brokers making them useful for testing authentication. I chose two types (PLAIN, multi-mechanism) and two types of tests (clean_bounce, hard_shutdown), that is 4 tests in all. I could reduce to just hard_shutdown (2 tests) or just SASL/PLAIN with hard_shutdown (1 test). Or just do upgrade tests since they do restart as well. Suggestions welcome :-)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ewencp Reduced to running just hard_bounce. Two runs, once with SASL/PLAIN and once with multiple mechanisms.

def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
"""Replication tests.
These tests verify that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption in the face of various failure scenarios.
Expand All @@ -144,6 +147,8 @@ def test_replication_with_broker_failure(self, failure_mode, security_protocol,

self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
self.kafka.client_sasl_mechanism = client_sasl_mechanism
self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
Expand Down