From 354ff3eb27fdd9585a765683003e35dd1c29de2e Mon Sep 17 00:00:00 2001 From: Dan Lavu Date: Thu, 30 Oct 2025 11:20:04 -0400 Subject: [PATCH 1/4] test_infopipe, standardizing the provider amongst all tests (cherry picked from commit 0cae6821bd70ae5d4d73b424549c7e236cd312cf) --- src/tests/system/tests/test_infopipe.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/tests/system/tests/test_infopipe.py b/src/tests/system/tests/test_infopipe.py index 0830fb0a99c..18a71ef0ce8 100644 --- a/src/tests/system/tests/test_infopipe.py +++ b/src/tests/system/tests/test_infopipe.py @@ -9,8 +9,7 @@ import pytest from sssd_test_framework.roles.client import Client from sssd_test_framework.roles.generic import GenericProvider -from sssd_test_framework.roles.ldap import LDAP -from sssd_test_framework.topology import KnownTopology, KnownTopologyGroup +from sssd_test_framework.topology import KnownTopology @pytest.mark.topology(KnownTopology.LDAP) @@ -105,7 +104,6 @@ def test_infopipe__get_domain_properties(client: Client): @pytest.mark.ticket(gh=6020, bz=2128840, jira="SSSD-5054") -@pytest.mark.topology(KnownTopology.IPA) @pytest.mark.topology(KnownTopology.LDAP) def test_infopipe__list_by_attr(client: Client, provider: GenericProvider): """ @@ -193,7 +191,6 @@ def test_infopipe__list_by_attr(client: Client, provider: GenericProvider): @pytest.mark.ticket(gh=[6360, 6361], jira="SSSD-5054") -@pytest.mark.topology(KnownTopology.IPA) @pytest.mark.topology(KnownTopology.LDAP) def test_infopipe__list_by_name(client: Client, provider: GenericProvider): """ @@ -253,7 +250,7 @@ def test_infopipe__list_by_name(client: Client, provider: GenericProvider): @pytest.mark.importance("medium") @pytest.mark.ticket(bz=1667252) -@pytest.mark.topology(KnownTopologyGroup.AnyProvider) +@pytest.mark.topology(KnownTopology.LDAP) def test_infopipe__lookup_user_with_extra_attributes(client: Client, provider: GenericProvider): """ :title: Infopipe does not crash looking up extra attribute From 95e09832d30175bf9bba8d35859486a8626f7646 Mon Sep 17 00:00:00 2001 From: Dan Lavu Date: Thu, 30 Oct 2025 21:07:20 -0400 Subject: [PATCH 2/4] updating some test logic and adding test cases (cherry picked from commit f2ccc6e5fb7f5f734a1654d38e178bf680a1c991) --- src/tests/system/tests/test_infopipe.py | 50 +++++++++++++------------ 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/src/tests/system/tests/test_infopipe.py b/src/tests/system/tests/test_infopipe.py index 18a71ef0ce8..7987fb33f93 100644 --- a/src/tests/system/tests/test_infopipe.py +++ b/src/tests/system/tests/test_infopipe.py @@ -9,6 +9,7 @@ import pytest from sssd_test_framework.roles.client import Client from sssd_test_framework.roles.generic import GenericProvider +from sssd_test_framework.roles.ldap import LDAP from sssd_test_framework.topology import KnownTopology @@ -400,18 +401,20 @@ def test_infopipe__lookup_user_attributes(client: Client, provider: LDAP): users = client.ifp.getObject("/org/freedesktop/sssd/infopipe/Users") user_path = users.FindByName("user1") user = client.ifp.getObject(user_path) - props = user.GetAll("org.freedesktop.sssd.infopipe.Users.User") - user_attrs = props["extraAttributes"] - assert "sn" in user_attrs and user_attrs["sn"] == ["Test"], "Expected sn to be ['Test']" - assert "givenName" in user_attrs and user_attrs["givenName"] == ["User"], "Expected givenName to be ['User']" - assert "mail" in user_attrs and user_attrs["mail"] == [ - "user1@example.com" - ], "Expected mail to be ['user1@example.com']" + results = user.GetAll("org.freedesktop.sssd.infopipe.Users.User") + user_attrs = results["extraAttributes"] + + for i in [ + ("sn", "Test"), + ("givenName", "User"), + ("mail", "user1@example.com"), + ]: + assert i[0] in user_attrs and user_attrs[i[0]] == [i[1]], f"Expected {i[0]} to be {i[1]}" @pytest.mark.importance("medium") @pytest.mark.topology(KnownTopology.LDAP) -def test_infopipe__ping_interface(client: Client): +def test_infopipe__ping(client: Client): """ :title: Call the infopipe ping method :setup: @@ -425,8 +428,9 @@ def test_infopipe__ping_interface(client: Client): client.sssd.start() ifp = client.ifp.getObject("/org/freedesktop/sssd/infopipe") - result = ifp.Ping("ping") - assert result == "PONG", "Ping() did not return expected value" + + for i in ["ping", "PinG", "PING"]: + assert ifp.Ping(i) == "PONG", "Ping() did not return expected value" @pytest.mark.importance("medium") @@ -451,15 +455,13 @@ def test_infopipe__list_components(client: Client): assert len(components) > 0, "No components returned" component = client.ifp.getObject(components[0]) - props = {} - properties_to_fetch = ["name", "type", "enabled", "debug_level"] - for prop in properties_to_fetch: - props[prop] = component.Get("org.freedesktop.sssd.infopipe.Components", prop) + results = {} + properties = ["name", "type", "enabled", "debug_level"] + for prop in properties: + results[prop] = component.Get("org.freedesktop.sssd.infopipe.Components", prop) - assert "name" in props, "Component missing name property" - assert "type" in props, "Component missing type property" - assert "enabled" in props, "Component missing enabled property" - assert "debug_level" in props, "Component missing debug_level property" + for i in properties: + assert i in results, f"Component missing {i} property" @pytest.mark.importance("medium") @@ -486,13 +488,13 @@ def test_infopipe__find_monitor(client: Client): ), f"Monitor path '{monitor_path}' should start with '/org/freedesktop/sssd/infopipe/Components/'" monitor = client.ifp.getObject(monitor_path) - props = {} - for prop in ["name", "type", "enabled"]: - props[prop] = monitor.Get("org.freedesktop.sssd.infopipe.Components", prop) + results = {} + for properties in ["name", "type", "enabled"]: + results[properties] = monitor.Get("org.freedesktop.sssd.infopipe.Components", properties) - assert props["name"] == "monitor", f"Expected monitor name 'monitor', got '{props['name']}'" - assert props["type"] == "monitor", f"Expected monitor type 'monitor', got '{props['type']}'" - assert props["enabled"] is True, f"Expected monitor to be enabled (True), got '{props['enabled']}'" + assert results["name"] == "monitor", f"Expected monitor name 'monitor', got '{results['name']}'" + assert results["type"] == "monitor", f"Expected monitor type 'monitor', got '{results['type']}'" + assert results["enabled"] is True, f"Expected monitor to be enabled (True), got '{results['enabled']}'" @pytest.mark.importance("medium") From 1827ef02e42f1be0c49c06f7de1045bb58e7f5bc Mon Sep 17 00:00:00 2001 From: Dan Lavu Date: Mon, 10 Nov 2025 16:01:15 -0500 Subject: [PATCH 3/4] removing intg ifp tests (cherry picked from commit a276441fe48d665baf80a0b742186870f4c2c4eb) --- src/tests/intg/Makefile.am | 2 - src/tests/intg/test_infopipe.py | 815 ------------------------- src/tests/intg/test_pysss_nss_idmap.py | 361 ----------- 3 files changed, 1178 deletions(-) delete mode 100644 src/tests/intg/test_infopipe.py delete mode 100644 src/tests/intg/test_pysss_nss_idmap.py diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am index 828433e989e..29e2a65f3cd 100644 --- a/src/tests/intg/Makefile.am +++ b/src/tests/intg/Makefile.am @@ -30,8 +30,6 @@ dist_noinst_DATA = \ data/cert_schema.ldif \ data/ssh_schema.ldif \ data/sudo_schema.ldif \ - test_pysss_nss_idmap.py \ - test_infopipe.py \ test_pam_responder.py \ conftest.py \ sssd_hosts.py \ diff --git a/src/tests/intg/test_infopipe.py b/src/tests/intg/test_infopipe.py deleted file mode 100644 index ae925858fc0..00000000000 --- a/src/tests/intg/test_infopipe.py +++ /dev/null @@ -1,815 +0,0 @@ -# -# Infopipe integration test -# -# Copyright (c) 2017 Red Hat, Inc. -# Author: Lukas Slebodnik -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# - -from __future__ import print_function - -import os -import stat -import pwd -import signal -import subprocess -import errno -import time -import ldap -import ldap.modlist -import pytest -import dbus -import shutil - -import config -import ds_openldap -import ldap_ent -from util import unindent, get_call_output - -LDAP_BASE_DN = "dc=example,dc=com" -INTERACTIVE_TIMEOUT = 4 - - -class DbusDaemon(object): - def __init__(self): - self.pid = 0 - - def start(self): - """Start the SSSD process""" - assert self.pid == 0 - - dbus_config_path = config.SYSCONFDIR + "/dbus-1/cwrap-dbus-system.conf" - dbus_commands = [ - ["dbus-daemon", "--config-file", dbus_config_path, - "--nosyslog", "--fork"], - ["dbus-daemon", "--config-file", dbus_config_path, "--fork"], - ] - dbus_started = False - for dbus_command in dbus_commands: - try: - if subprocess.call(dbus_command) == 0: - dbus_started = True - break - else: - print("start failed for %s" % " ".join(dbus_command)) - except OSError as ex: - if ex.errno == errno.ENOENT: - print("%s does not exist" % (dbus_command[0])) - pass - - if not dbus_started: - raise Exception("dbus-daemon start failed") - dbus_pid_path = config.RUNSTATEDIR + "/dbus/messagebus.pid" - # wait 10 seconds for pidfile - wait_time = 10 - for _ in range(wait_time * 10): - if os.path.isfile(dbus_pid_path): - break - time.sleep(.1) - - assert os.path.isfile(dbus_pid_path) - with open(dbus_pid_path, "r") as pid_file: - self.pid = int(pid_file.read()) - - def stop(self): - """Stop the SSSD process and remove its state""" - - # stop process only if running - if self.pid != 0: - try: - os.kill(self.pid, signal.SIGTERM) - while True: - try: - os.kill(self.pid, signal.SIGCONT) - except OSError: - break - time.sleep(.1) - except OSError: - pass - - # clean pid so we can start service one more time - self.pid = 0 - - # dbus-daemon 1.2.24 does not clean pid file after itself - try: - os.unlink(config.RUNSTATEDIR + "/dbus/messagebus.pid") - except OSError as ex: - if ex.errno != errno.ENOENT: - raise - - -@pytest.fixture(scope="module") -def dbus_system_bus(request): - dbus_daemon = DbusDaemon() - dbus_daemon.start() - - def cleanup_dbus_process(): - dbus_daemon.stop() - request.addfinalizer(cleanup_dbus_process) - - return dbus.SystemBus() - - -@pytest.fixture(scope="module") -def ds_inst(request): - """LDAP server instance fixture""" - ds_inst = ds_openldap.DSOpenLDAP( - config.PREFIX, 10389, LDAP_BASE_DN, - "cn=admin", "Secret123" - ) - - try: - ds_inst.setup() - except Exception: - ds_inst.teardown() - raise - request.addfinalizer(ds_inst.teardown) - - return ds_inst - - -@pytest.fixture(scope="module") -def ldap_conn(request, ds_inst): - """LDAP server connection fixture""" - ldap_conn = ds_inst.bind() - ldap_conn.ds_inst = ds_inst - request.addfinalizer(ldap_conn.unbind_s) - return ldap_conn - - -def create_ldap_entries(ldap_conn, ent_list=None): - """Add LDAP entries from ent_list""" - if ent_list is not None: - for entry in ent_list: - ldap_conn.add_s(entry[0], entry[1]) - - -def cleanup_ldap_entries(ldap_conn, ent_list=None): - """Remove LDAP entries added by create_ldap_entries""" - if ent_list is None: - for ou in ("Users", "Groups", "Netgroups", "Services", "Policies"): - for entry in ldap_conn.search_s(f"ou={ou}," - f"{ldap_conn.ds_inst.base_dn}", - ldap.SCOPE_ONELEVEL, - attrlist=[]): - ldap_conn.delete_s(entry[0]) - else: - for entry in ent_list: - ldap_conn.delete_s(entry[0]) - - -def create_ldap_cleanup(request, ldap_conn, ent_list=None): - """Add teardown for removing all user/group LDAP entries""" - request.addfinalizer(lambda: cleanup_ldap_entries(ldap_conn, ent_list)) - - -def create_ldap_fixture(request, ldap_conn, ent_list=None): - """Add LDAP entries and add teardown for removing them""" - create_ldap_entries(ldap_conn, ent_list) - create_ldap_cleanup(request, ldap_conn, ent_list) - - -SCHEMA_RFC2307 = "rfc2307" -SCHEMA_RFC2307_BIS = "rfc2307bis" - - -def format_basic_conf(ldap_conn, schema, config): - """Format a basic SSSD configuration""" - schema_conf = "ldap_schema = " + schema + "\n" - if schema == SCHEMA_RFC2307_BIS: - schema_conf += "ldap_group_object_class = groupOfNames\n" - - return unindent("""\ - [sssd] - debug_level = 0xffff - domains = LDAP, app - services = nss, ifp - enable_files_domain = false - - [nss] - memcache_timeout = 0 - - [ifp] - debug_level = 0xffff - user_attributes = +extraName - ca_db = {config.PAM_CERT_DB_PATH} - - [domain/LDAP] - {schema_conf} - id_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - ldap_id_use_start_tls = false - ldap_user_extra_attrs = extraName:uid - ldap_user_certificate = userCert - - [application/app] - inherit_from = LDAP - """).format(**locals()) - - -def format_certificate_conf(ldap_conn, schema, config): - """Format an SSSD configuration with all caches refreshing in 4 seconds""" - return \ - format_basic_conf(ldap_conn, schema, config) + \ - unindent(""" - [certmap/LDAP/user1] - matchrule = .*CN = SSSD test cert 0001.* - """).format(**locals()) - - -def create_conf_file(contents): - """Create sssd.conf with specified contents""" - with open(config.CONF_PATH, "w") as conf: - conf.write(contents) - os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) - - -def cleanup_conf_file(): - """Remove sssd.conf, if it exists""" - if os.path.lexists(config.CONF_PATH): - os.unlink(config.CONF_PATH) - - -def create_conf_cleanup(request): - """Add teardown for removing sssd.conf""" - request.addfinalizer(cleanup_conf_file) - - -def create_conf_fixture(request, contents): - """ - Create sssd.conf with specified contents and add teardown for removing it - """ - create_conf_file(contents) - create_conf_cleanup(request) - - -def create_sssd_process(): - """Start the SSSD process""" - if subprocess.call(["sssd", "-D", "--logger=files"]) != 0: - raise Exception("sssd start failed") - - -def cleanup_sssd_process(): - """Stop the SSSD process and remove its state""" - try: - with open(config.PIDFILE_PATH, "r") as pid_file: - pid = int(pid_file.read()) - os.kill(pid, signal.SIGTERM) - while True: - try: - os.kill(pid, signal.SIGCONT) - except OSError: - break - time.sleep(1) - except OSError: - pass - for path in os.listdir(config.DB_PATH): - os.unlink(config.DB_PATH + "/" + path) - for path in os.listdir(config.MCACHE_PATH): - os.unlink(config.MCACHE_PATH + "/" + path) - - -def create_sssd_cleanup(request): - """Add teardown for stopping SSSD and removing its state""" - request.addfinalizer(cleanup_sssd_process) - - -def create_sssd_fixture(request): - """Start SSSD and add teardown for stopping it and removing its state""" - create_sssd_process() - create_sssd_cleanup(request) - - -def backup_ca_db(): - """Create backup file for ca db""" - src = os.path.dirname(config.PAM_CERT_DB_PATH) + "/SSSD_test_CA.pem" - dst = os.path.dirname(config.PAM_CERT_DB_PATH) + "/SSSD_test_CA.pem.bp" - shutil.copyfile(src, dst) - - -def restore_ca_db(): - """Restore backup file for ca db""" - src = os.path.dirname(config.PAM_CERT_DB_PATH) + "/SSSD_test_CA.pem.bp" - dst = os.path.dirname(config.PAM_CERT_DB_PATH) + "/SSSD_test_CA.pem" - shutil.copyfile(src, dst) - os.remove(src) - - -def create_restore_ca_db(request): - """Add teardown for restoring ca_db""" - request.addfinalizer(restore_ca_db) - - -def create_ca_db_fixture(request): - """ - Create backup for ca_db and add teardown for restoring it - """ - backup_ca_db() - create_restore_ca_db(request) - - -@pytest.fixture -def sanity_rfc2307(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - ent_list.add_user("user1", 1001, 2001) - ent_list.add_user("user2", 1002, 2002) - ent_list.add_user("user3", 1003, 2003) - - ent_list.add_group("group1", 2001) - ent_list.add_group("group2", 2002) - ent_list.add_group("group3", 2003) - - ent_list.add_group("empty_group", 2010) - - ent_list.add_group("single_user_group", 2011, ["user1"]) - ent_list.add_group("two_user_group", 2012, ["user1", "user2"]) - - create_ldap_fixture(request, ldap_conn, ent_list) - - config.PAM_CERT_DB_PATH = os.environ['PAM_CERT_DB_PATH'] - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307, config) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - create_ca_db_fixture(request) - return None - - -@pytest.fixture -def simple_rfc2307(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - ent_list.add_user('usr\\\\001', 181818, 181818) - ent_list.add_group("group1", 181818) - create_ldap_fixture(request, ldap_conn, ent_list) - config.PAM_CERT_DB_PATH = os.environ['PAM_CERT_DB_PATH'] - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307, config) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - create_ca_db_fixture(request) - return None - - -@pytest.fixture -def auto_private_groups_rfc2307(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - ent_list.add_user("user1", 1001, 2001) - - ent_list.add_group("group1", 2001) - ent_list.add_group("single_user_group", 2011, ["user1"]) - ent_list.add_group("two_user_group", 2012, ["user1"]) - - create_ldap_fixture(request, ldap_conn, ent_list) - - config.PAM_CERT_DB_PATH = os.environ['PAM_CERT_DB_PATH'] - conf = \ - format_basic_conf(ldap_conn, SCHEMA_RFC2307, config) + \ - unindent(""" - [domain/LDAP] - auto_private_groups = True - """).format(**locals()) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - create_ca_db_fixture(request) - return None - - -@pytest.fixture -def add_user_with_cert(request, ldap_conn): - config.PAM_CERT_DB_PATH = os.environ['PAM_CERT_DB_PATH'] - - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - ent_list.add_user("user1", 1001, 2001) - - create_ldap_fixture(request, ldap_conn, ent_list) - - der_path = os.path.dirname(config.PAM_CERT_DB_PATH) - der_path += "/SSSD_test_cert_x509_0001.der" - with open(der_path, 'rb') as f: - val = f.read() - dn = "uid=user1,ou=Users," + LDAP_BASE_DN - ''' - Using 'userCert' instead of 'userCertificate' to hold the user certificate - because the default OpenLDAP has syntax and matching rules which are not - used in other LDAP servers. - ''' - ldap_conn.modify_s(dn, [(ldap.MOD_ADD, 'userCert', val)]) - - conf = format_certificate_conf(ldap_conn, SCHEMA_RFC2307_BIS, config) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - create_ca_db_fixture(request) - - return None - - -def test_ping_raw(dbus_system_bus, ldap_conn, simple_rfc2307): - # test with disabled introspection - sssd_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe', - introspect=False) - sssd_interface = dbus.Interface(sssd_obj, 'org.freedesktop.sssd.infopipe') - - # test missing parameter - with pytest.raises(dbus.exceptions.DBusException) as exc_info: - sssd_interface.Ping() - assert exc_info.errisinstance(dbus.exceptions.DBusException) - - ex = exc_info.value - assert ex.get_dbus_name() == 'sbus.Error.Errno' - assert 'Unexpected argument type provided' in ex.get_dbus_message() - - # test wrong parameter type - with pytest.raises(dbus.exceptions.DBusException) as exc_info: - sssd_interface.Ping(1) - assert exc_info.errisinstance(dbus.exceptions.DBusException) - - ex = exc_info.value - assert ex.get_dbus_name() == 'sbus.Error.Errno' - assert 'Unexpected argument type provided' in ex.get_dbus_message() - - # test wrong parameter value - with pytest.raises(dbus.exceptions.DBusException) as exc_info: - sssd_interface.Ping('test') - assert exc_info.errisinstance(dbus.exceptions.DBusException) - - ex = exc_info.value - assert ex.get_dbus_name() == 'org.freedesktop.DBus.Error.InvalidArgs' - assert ex.get_dbus_message() == 'Invalid argument' - - # positive test - ret = sssd_interface.Ping('ping') - assert ret == "PONG" - - # test case insensitive input - ret = sssd_interface.Ping('PinG') - assert ret == "PONG" - - ret = sssd_interface.Ping('PING') - assert ret == "PONG" - - -def test_ping_introspection(dbus_system_bus, ldap_conn, simple_rfc2307): - sssd_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe') - sssd_interface = dbus.Interface(sssd_obj, 'org.freedesktop.sssd.infopipe') - - # test missing parameter - with pytest.raises(TypeError) as exc_info: - sssd_interface.Ping() - assert exc_info.errisinstance(TypeError) - - ex = exc_info.value - assert str(ex) == 'More items found in D-Bus signature than in Python ' \ - 'arguments' - - # test wrong parameter type - with pytest.raises(TypeError) as exc_info: - sssd_interface.Ping(1) - assert exc_info.errisinstance(TypeError) - - ex = exc_info.value - assert str(ex) == 'Expected a string or unicode object' - - # test wrong parameter value - with pytest.raises(dbus.exceptions.DBusException) as exc_info: - sssd_interface.Ping('test') - assert exc_info.errisinstance(dbus.exceptions.DBusException) - - ex = exc_info.value - assert ex.get_dbus_name() == 'org.freedesktop.DBus.Error.InvalidArgs' - assert ex.get_dbus_message() == 'Invalid argument' - - # positive test - ret = sssd_interface.Ping('ping') - assert ret == "PONG" - - # test case insensitive input - ret = sssd_interface.Ping('PinG') - assert ret == "PONG" - - ret = sssd_interface.Ping('PING') - assert ret == "PONG" - - -def test_special_characters(dbus_system_bus, ldap_conn, simple_rfc2307): - sssd_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe') - sssd_interface = dbus.Interface(sssd_obj, 'org.freedesktop.sssd.infopipe') - - attributes = ['name', 'uidNumber', 'gidNumber', 'gecos', 'homeDirectory', - 'loginShell'] - expected = dict(name='usr\\001', uidNumber='181818', gidNumber='181818', - gecos='181818', homeDirectory='/home/usr\\\\001', - loginShell='/bin/bash') - - user_attrs = sssd_interface.GetUserAttr('usr\\001', attributes) - assert user_attrs.signature == 'sv' - assert user_attrs.variant_level == 0 - - assert len(attributes) == len(user_attrs) - assert sorted(attributes) == sorted(user_attrs.keys()) - - # check values of attributes - for attr in user_attrs: - assert user_attrs[attr].signature == 's' - assert user_attrs[attr].variant_level == 1 - assert user_attrs[attr][0] == expected[attr] - - -def test_get_user_attr(dbus_system_bus, ldap_conn, sanity_rfc2307): - sssd_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe') - sssd_interface = dbus.Interface(sssd_obj, 'org.freedesktop.sssd.infopipe') - - # negative test - with pytest.raises(dbus.exceptions.DBusException) as exc_info: - sssd_interface.GetUserAttr('non_existent_user', ['name']) - assert exc_info.errisinstance(dbus.exceptions.DBusException) - - ex = exc_info.value - assert ex.get_dbus_name() == 'sbus.Error.NotFound' - assert ex.get_dbus_message() == 'No such file or directory' - - # test 0 attributes - user_attrs = sssd_interface.GetUserAttr('user1', []) - - assert user_attrs.signature == 'sv' - assert user_attrs.variant_level == 0 - - # expect empty sequence; len(user_attrs) == 0 - assert not user_attrs - - # positive test - attributes = ['name', 'uidNumber', 'gidNumber', 'gecos', 'homeDirectory', - 'loginShell'] - expected = dict(name='user1', uidNumber='1001', gidNumber='2001', - gecos='1001', homeDirectory='/home/user1', - loginShell='/bin/bash') - user_attrs = sssd_interface.GetUserAttr('user1', attributes) - - assert user_attrs.signature == 'sv' - assert user_attrs.variant_level == 0 - - assert len(attributes) == len(user_attrs) - assert sorted(attributes) == sorted(user_attrs.keys()) - - # check values of attributes - for attr in user_attrs: - assert user_attrs[attr].signature == 's' - assert user_attrs[attr].variant_level == 1 - assert user_attrs[attr][0] == expected[attr] - - -def test_get_user_groups(dbus_system_bus, ldap_conn, sanity_rfc2307): - sssd_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe') - sssd_interface = dbus.Interface(sssd_obj, 'org.freedesktop.sssd.infopipe') - - # negative test - with pytest.raises(dbus.exceptions.DBusException) as exc_info: - sssd_interface.GetUserGroups('non_existent_user') - assert exc_info.errisinstance(dbus.exceptions.DBusException) - - ex = exc_info.value - assert ex.get_dbus_name() == 'sbus.Error.NotFound' - assert ex.get_dbus_message() == 'No such file or directory' - - # the same test via nss responder - with pytest.raises(KeyError): - pwd.getpwnam("non_existent_user") - - # 0 groups - res = sssd_interface.GetUserGroups('user3') - assert res.signature == 's' - assert res.variant_level == 0 - - # expect empty sequence; len(res) == 0 - assert not res - - # single group - res = sssd_interface.GetUserGroups('user2') - assert res.signature == 's' - assert res.variant_level == 0 - - assert len(res) == 1 - assert res[0] == 'two_user_group' - - # more groups - res = sssd_interface.GetUserGroups('user1') - assert res.signature == 's' - assert res.variant_level == 0 - - assert len(res) == 2 - assert sorted(res) == ['single_user_group', 'two_user_group'] - - -''' -Given auto_private_groups is enabled -When GetUserGroups is called -Then the origPrimaryGroupGidNumber is returned as part of the group memberships -''' - - -def test_get_user_groups_given_auto_private_groups_enabled( - dbus_system_bus, - ldap_conn, auto_private_groups_rfc2307): - sssd_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe') - sssd_interface = dbus.Interface(sssd_obj, 'org.freedesktop.sssd.infopipe') - - res = sssd_interface.GetUserGroups('user1') - - assert sorted(res) == ['group1', 'single_user_group', 'two_user_group'] - - -def get_user_property(dbus_system_bus, username, prop_name): - users_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe/Users') - - users_iface = dbus.Interface(users_obj, - "org.freedesktop.sssd.infopipe.Users") - - user_path = users_iface.FindByName(username) - user_object = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - user_path) - - prop_iface = dbus.Interface(user_object, 'org.freedesktop.DBus.Properties') - return prop_iface.Get('org.freedesktop.sssd.infopipe.Users.User', - prop_name) - - -def get_user_by_attr(dbus_system_bus, attribute, filter): - users_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe/Users') - - users_iface = dbus.Interface(users_obj, - "org.freedesktop.sssd.infopipe.Users") - - return users_iface.ListByAttr(attribute, filter, 0) - - -def get_user_by_name(dbus_system_bus, filter): - users_obj = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe/Users') - - users_iface = dbus.Interface(users_obj, - "org.freedesktop.sssd.infopipe.Users") - - return users_iface.ListByName(filter, 0) - - -def test_get_extra_attributes_empty(dbus_system_bus, - ldap_conn, - sanity_rfc2307): - """ - Make sure the extraAttributes property can be retrieved - """ - extra_attrs = get_user_property(dbus_system_bus, - 'user1', - 'extraAttributes') - assert extra_attrs['extraName'][0] == 'user1' - - -def test_sssctl_domain_list_app_domain(dbus_system_bus, - ldap_conn, - sanity_rfc2307): - output = get_call_output(["sssctl", "domain-list"], subprocess.STDOUT) - - assert "Error" not in output - assert output.find("LDAP") != -1 - assert output.find("app") != -1 - - -def test_update_member_list_and_get_all(dbus_system_bus, - ldap_conn, - sanity_rfc2307): - ''' - Test that UpdateMemberList() and GetAll() return the correct users that are - members of a group - ''' - sssd_obj = dbus_system_bus.get_object( - 'org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe/Groups') - groups_iface = dbus.Interface(sssd_obj, - 'org.freedesktop.sssd.infopipe.Groups') - group_id = 2011 - expected_user_result = "/org/freedesktop/sssd/infopipe/Users/LDAP/1001" - - group_path = groups_iface.FindByName('single_user_group') - - group_object = dbus_system_bus.get_object('org.freedesktop.sssd.infopipe', - group_path) - group_iface = dbus.Interface(group_object, - 'org.freedesktop.sssd.infopipe.Groups.Group') - - # update local cache for group - try: - group_iface.UpdateMemberList(group_id) - except dbus.exceptions.DBusException as ex: - assert False, "Unexpected DBusException raised: " + ex - - # check members of group - prop_iface = dbus.Interface(group_object, - 'org.freedesktop.DBus.Properties') - res = prop_iface.GetAll('org.freedesktop.sssd.infopipe.Groups.Group') - assert str(res.get("users")[0]) == expected_user_result - - # delete group (there's no other way of removing a user from a group) and - # wait change to propagate - ldap_conn.delete("cn=single_user_group,ou=Groups,dc=example,dc=com") - time.sleep(INTERACTIVE_TIMEOUT) - - # add group back but this time without any member - ldap_group = ldap_ent.group("dc=example,dc=com", "single_user_group", 2011) - ldap_conn.add_s(ldap_group[0], ldap_group[1]) - - # invalidate cache - subprocess.call(["sss_cache", "-E"]) - - # check that group has no members - group_iface.UpdateMemberList(group_id) - prop_interface = dbus.Interface(group_object, - 'org.freedesktop.DBus.Properties') - res = prop_interface.GetAll('org.freedesktop.sssd.infopipe.Groups.Group') - assert not res.get("users") - - -def test_find_by_valid_certificate(dbus_system_bus, - ldap_conn, - add_user_with_cert): - """test_find_by_valid_certificate - - :id: 3f212e6e-00ce-44ac-95d4-59925cb5a14a - :title: SSSD-TC: Infopipe: Find by valid certificate - :casecomponent: sssd - :subsystemteam: sst_idm_sssd - """ - users_obj = dbus_system_bus.get_object( - 'org.freedesktop.sssd.infopipe', - '/org/freedesktop/sssd/infopipe/Users') - users_iface = dbus.Interface(users_obj, - 'org.freedesktop.sssd.infopipe.Users') - cert_path = os.path.dirname(config.PAM_CERT_DB_PATH) - - # Valid certificate with user - cert_file = cert_path + "/SSSD_test_cert_x509_0001.pem" - with open(cert_file, "r") as f: - cert = f.read() - res = users_iface.FindByValidCertificate(cert) - assert res == "/org/freedesktop/sssd/infopipe/Users/app/user1_40app" - - # Valid certificate without user - cert_file = cert_path + "/SSSD_test_cert_x509_0002.pem" - with open(cert_file, "r") as f: - cert = f.read() - try: - res = users_iface.FindByValidCertificate(cert) - assert False, "Previous call should raise an exception" - except dbus.exceptions.DBusException as ex: - assert str(ex) == "sbus.Error.NotFound: No such file or directory" - - # Valid certificate from another CA - cert_file = os.environ['ABS_SRCDIR'] + \ - "/../test_ECC_CA/SSSD_test_ECC_cert_key_0001.pem" - with open(cert_file, "r") as f: - cert = f.read() - try: - res = users_iface.FindByValidCertificate(cert) - assert False, "Previous call should raise an exception" - except dbus.exceptions.DBusException as ex: - assert str(ex) == \ - "org.freedesktop.DBus.Error.IOError: Input/output error" - - # Invalid certificate - cert = "Invalid cert" - try: - res = users_iface.FindByValidCertificate(cert) - assert False, "Previous call should raise an exception" - except dbus.exceptions.DBusException as ex: - error = "org.freedesktop.DBus.Error.IOError: Input/output error" - assert str(ex) == error - - # Remove certificate db - cert_db = cert_path + "/SSSD_test_CA.pem" - os.remove(cert_db) - cert_file = cert_path + "/SSSD_test_cert_x509_0002.pem" - with open(cert_file, "r") as f: - cert = f.read() - try: - res = users_iface.FindByValidCertificate(cert) - assert False, "Previous call should raise an exception" - except dbus.exceptions.DBusException as ex: - assert str(ex) == \ - "sbus.Error.NoCA: Certificate authority file not found" diff --git a/src/tests/intg/test_pysss_nss_idmap.py b/src/tests/intg/test_pysss_nss_idmap.py deleted file mode 100644 index c67c2bba00c..00000000000 --- a/src/tests/intg/test_pysss_nss_idmap.py +++ /dev/null @@ -1,361 +0,0 @@ -# -# LDAP integration test -# -# Copyright (c) 2017 Red Hat, Inc. -# Author: Lukas Slebodnik -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -import os -import stat -import pwd -import grp -import signal -import subprocess -import time -import pytest -import ldb -import pysss_nss_idmap - -import config -import ds_openldap - -from .util import unindent - -LDAP_BASE_DN = "dc=example,dc=com" - - -@pytest.fixture(scope="module") -def ad_inst(request): - """Fake AD server instance fixture""" - instance = ds_openldap.FakeAD( - config.PREFIX, 10389, LDAP_BASE_DN, - "cn=admin", "Secret123" - ) - - try: - instance.setup() - except Exception: - instance.teardown() - raise - request.addfinalizer(instance.teardown) - return instance - - -@pytest.fixture(scope="module") -def ldap_conn(request, ad_inst): - """LDAP server connection fixture""" - ldap_conn = ad_inst.bind() - ldap_conn.ad_inst = ad_inst - request.addfinalizer(ldap_conn.unbind_s) - return ldap_conn - - -def format_basic_conf(ldap_conn, ignore_unreadable_refs): - """Format a basic SSSD configuration""" - - ignore_unreadable_refs_conf = "false" - if ignore_unreadable_refs: - ignore_unreadable_refs_conf = "true" - - return unindent("""\ - [sssd] - domains = FakeAD - services = nss - - [nss] - - [pam] - - [domain/FakeAD] - ldap_search_base = {ldap_conn.ad_inst.base_dn} - ldap_referrals = false - - id_provider = ldap - auth_provider = ldap - chpass_provider = ldap - access_provider = ldap - - ldap_uri = {ldap_conn.ad_inst.ldap_url} - ldap_default_bind_dn = {ldap_conn.ad_inst.admin_dn} - ldap_default_authtok_type = password - ldap_default_authtok = {ldap_conn.ad_inst.admin_pw} - ldap_id_use_start_tls = false - - ldap_schema = ad - ldap_id_mapping = true - ldap_idmap_default_domain_sid = S-1-5-21-1305200397-2901131868-73388776 - case_sensitive = False - - ldap_ignore_unreadable_references = {ignore_unreadable_refs_conf} - """).format(**locals()) - - -def create_conf_file(contents): - """Create sssd.conf with specified contents""" - conf = open(config.CONF_PATH, "w") - conf.write(contents) - conf.close() - os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) - - -def create_conf_fixture(request, contents): - """ - Create sssd.conf with specified contents and add teardown for removing it - """ - create_conf_file(contents) - - def cleanup_conf_file(): - """Remove sssd.conf, if it exists""" - if os.path.lexists(config.CONF_PATH): - os.unlink(config.CONF_PATH) - - request.addfinalizer(cleanup_conf_file) - - -def create_sssd_process(): - """Start the SSSD process""" - if subprocess.call(["sssd", "-D", "--logger=files"]) != 0: - raise Exception("sssd start failed") - - -def cleanup_sssd_process(): - """Stop the SSSD process and remove its state""" - try: - with open(config.PIDFILE_PATH, "r") as pid_file: - pid = int(pid_file.read()) - os.kill(pid, signal.SIGTERM) - while True: - try: - os.kill(pid, signal.SIGCONT) - except OSError: - break - time.sleep(1) - except OSError: - pass - for path in os.listdir(config.DB_PATH): - os.unlink(config.DB_PATH + "/" + path) - for path in os.listdir(config.MCACHE_PATH): - os.unlink(config.MCACHE_PATH + "/" + path) - - -def create_sssd_fixture(request): - """Start SSSD and add teardown for stopping it and removing its state""" - create_sssd_process() - request.addfinalizer(cleanup_sssd_process) - - -def sysdb_sed_domainid(domain_name, domain_id): - sssd_cache = "{0}/cache_{1}.ldb".format(config.DB_PATH, domain_name) - domain_ldb = ldb.Ldb(sssd_cache) - - msg = ldb.Message() - msg.dn = ldb.Dn(domain_ldb, "cn=sysdb") - msg["cn"] = "sysdb" - msg["description"] = "base object" - msg["version"] = "0.17" - domain_ldb.add(msg) - - # Set domainID for fake AD domain - msg = ldb.Message() - msg.dn = ldb.Dn(domain_ldb, "cn={0},cn=sysdb".format(domain_name)) - msg["cn"] = domain_name - msg["domainID"] = domain_id - msg["distinguishedName"] = "cn={0},cn=sysdb".format(domain_name) - domain_ldb.add(msg) - - msg = ldb.Message() - msg.dn = ldb.Dn(domain_ldb, "@ATTRIBUTES") - msg["distinguishedName"] = "@ATTRIBUTES" - for attr in ['cn', 'dc', 'dn', 'objectclass', 'originalDN', - 'userPrincipalName']: - msg[attr] = "CASE_INSENSITIVE" - domain_ldb.add(msg) - - msg = ldb.Message() - msg.dn = ldb.Dn(domain_ldb, "@INDEXLIST") - msg["distinguishedName"] = "@INDEXLIST" - msg["@IDXONE"] = "1" - for attr in ['cn', 'objectclass', 'member', 'memberof', 'name', - 'uidNumber', 'gidNumber', 'lastUpdate', 'dataExpireTimestamp', - 'originalDN', 'nameAlias', 'servicePort', 'serviceProtocol', - 'sudoUser', 'sshKnownHostsExpire', 'objectSIDString']: - msg["@IDXATTR"] = attr - domain_ldb.add(msg) - - msg = ldb.Message() - msg.dn = ldb.Dn(domain_ldb, "@MODULES") - msg["distinguishedName"] = "@MODULES" - msg["@LIST"] = "asq,memberof" - domain_ldb.add(msg) - - -@pytest.fixture -def simple_ad(request, ldap_conn): - conf = format_basic_conf(ldap_conn, ignore_unreadable_refs=False) - sysdb_sed_domainid("FakeAD", "S-1-5-21-1305200397-2901131868-73388776") - - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -def test_user_operations(ldap_conn, simple_ad): - user = 'user1_dom1-19661' - user_id = pwd.getpwnam(user).pw_uid - user_sid = 'S-1-5-21-1305200397-2901131868-73388776-82809' - - output = pysss_nss_idmap.getsidbyname(user)[user] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_USER - assert output[pysss_nss_idmap.SID_KEY] == user_sid - - output = pysss_nss_idmap.getsidbyusername(user)[user] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_USER - assert output[pysss_nss_idmap.SID_KEY] == user_sid - - output = pysss_nss_idmap.getsidbygroupname(user) - assert len(output) == 0 - - output = pysss_nss_idmap.getsidbyid(user_id)[user_id] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_USER - assert output[pysss_nss_idmap.SID_KEY] == user_sid - - output = pysss_nss_idmap.getsidbyuid(user_id)[user_id] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_USER - assert output[pysss_nss_idmap.SID_KEY] == user_sid - - output = pysss_nss_idmap.getsidbygid(user_id) - assert len(output) == 0 - - output = pysss_nss_idmap.getidbysid(user_sid)[user_sid] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_USER - assert output[pysss_nss_idmap.ID_KEY] == user_id - - output = pysss_nss_idmap.getnamebysid(user_sid)[user_sid] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_USER - assert output[pysss_nss_idmap.NAME_KEY] == user - - -def test_group_operations(ldap_conn, simple_ad): - group = 'group1_dom1-19661' - group_id = grp.getgrnam(group).gr_gid - group_sid = 'S-1-5-21-1305200397-2901131868-73388776-82810' - - output = pysss_nss_idmap.getsidbyname(group)[group] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbygroupname(group)[group] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbyusername(group) - assert len(output) == 0 - - output = pysss_nss_idmap.getsidbyid(group_id)[group_id] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbygid(group_id)[group_id] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbyuid(group_id) - assert len(output) == 0 - - output = pysss_nss_idmap.getidbysid(group_sid)[group_sid] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.ID_KEY] == group_id - - output = pysss_nss_idmap.getnamebysid(group_sid)[group_sid] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.NAME_KEY] == group - - -def test_case_insensitive(ldap_conn, simple_ad): - # resolve group and also member of this group - group = 'Domain Users' - group_id = grp.getgrnam(group).gr_gid - group_sid = 'S-1-5-21-1305200397-2901131868-73388776-513' - - output = pysss_nss_idmap.getsidbyname(group)[group] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbyid(group_id)[group_id] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbygid(group_id)[group_id] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbyuid(group_id) - assert len(output) == 0 - - output = pysss_nss_idmap.getidbysid(group_sid)[group_sid] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.ID_KEY] == group_id - - output = pysss_nss_idmap.getnamebysid(group_sid)[group_sid] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.NAME_KEY] == group.lower() - - -@pytest.fixture -def simple_ad_ignore_unrdbl_refs(request, ldap_conn): - conf = format_basic_conf(ldap_conn, ignore_unreadable_refs=True) - sysdb_sed_domainid("FakeAD", "S-1-5-21-1305200397-2901131868-73388776") - - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -def test_ignore_unreadable_references(ldap_conn, simple_ad_ignore_unrdbl_refs): - group = 'group3_dom1-17775' - group_id = grp.getgrnam(group).gr_gid - group_sid = 'S-1-5-21-1305200397-2901131868-73388776-82764' - - output = pysss_nss_idmap.getsidbyname(group)[group] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbyid(group_id)[group_id] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbygid(group_id)[group_id] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.SID_KEY] == group_sid - - output = pysss_nss_idmap.getsidbyuid(group_id) - assert len(output) == 0 - - output = pysss_nss_idmap.getidbysid(group_sid)[group_sid] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.ID_KEY] == group_id - - output = pysss_nss_idmap.getnamebysid(group_sid)[group_sid] - assert output[pysss_nss_idmap.TYPE_KEY] == pysss_nss_idmap.ID_GROUP - assert output[pysss_nss_idmap.NAME_KEY] == group - - -def test_no_ignore_unreadable_references(ldap_conn, simple_ad): - group = 'group3_dom1-17775' - - # This group has a member attribute referencing to a user in other - # domain - with pytest.raises(KeyError): - grp.getgrnam(group) From 0a69e63263bf2eda13f95a74e62c8bc75d614c9f Mon Sep 17 00:00:00 2001 From: Sumit Bose Date: Wed, 8 Oct 2025 13:07:39 +0200 Subject: [PATCH 4/4] tests: add pysss_nss_idmap system test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The new system test is mostly a 1:1 replacement of the exisintg integration test. Reviewed-by: Dan Lavu Reviewed-by: Tomáš Halman (cherry picked from commit 41ff7cc054b029d5ad893057e142d88ce7f24c77) --- .../system/tests/test_pysss_nss_idmap.py | 345 ++++++++++++++++++ 1 file changed, 345 insertions(+) create mode 100644 src/tests/system/tests/test_pysss_nss_idmap.py diff --git a/src/tests/system/tests/test_pysss_nss_idmap.py b/src/tests/system/tests/test_pysss_nss_idmap.py new file mode 100644 index 00000000000..309ff27428a --- /dev/null +++ b/src/tests/system/tests/test_pysss_nss_idmap.py @@ -0,0 +1,345 @@ +""" +SSSD Python Client identity Lookups + +:requirement: Python binding of libsss_nss_idmap.so +""" + +from __future__ import annotations + +import ast + +import pytest +from sssd_test_framework.roles.ad import AD +from sssd_test_framework.roles.client import Client +from sssd_test_framework.roles.generic import GenericProvider +from sssd_test_framework.roles.ipa import IPA +from sssd_test_framework.roles.samba import Samba +from sssd_test_framework.topology import KnownTopology, KnownTopologyGroup + + +def get_sid_name(provider: GenericProvider): + if isinstance(provider, Samba): + return "objectSid" + elif isinstance(provider, AD): + return "SID" + elif isinstance(provider, IPA): + return "ipaNTSecurityIdentifier" + return None + + +def user_setup(provider: GenericProvider, client: Client): + sid_name = get_sid_name(provider) + assert sid_name is not None + + user = provider.user("user1").add() + user_sid = user.get([sid_name])[sid_name][0] + + client.sssd.restart() + + result = client.tools.getent.passwd(user.name) + assert result is not None, f"'{user.name}' missing" + assert result.name == user.name, f"'{user.name}' has wrong name {result.name}" + user_id = result.uid + + return (user, user_id, user_sid) + + +def group_setup(provider: GenericProvider, client: Client): + sid_name = get_sid_name(provider) + assert sid_name is not None + + group = provider.group("group1").add() + group_sid = group.get([sid_name])[sid_name][0] + + client.sssd.restart() + + result = client.tools.getent.group(group.name) + assert result is not None, f"'{group.name}' missing" + assert result.name == group.name, f"'{group.name}' has wrong name {result.name}" + group_id = result.gid + + return (group, group_id, group_sid) + + +def run_pysss_nss_idmap(client: Client, command: str, input): + if isinstance(input, str): + input = f'"{input}"' + + return client.host.conn.run(f"""python -c 'import pysss_nss_idmap; print(pysss_nss_idmap.{command}({input}))'""") + + +@pytest.mark.importance("critical") +@pytest.mark.topology(KnownTopologyGroup.AnyDC) +def test_user_by_name(client: Client, provider: GenericProvider): + """ + :title: Lookup the SID of a user by name + :setup: + 1. Add user and start SSSD + :steps: + 1. Lookup SID by name and user name + 2. Lookup SID by group name + :expectedresults: + 1. User found + 2. User not found + :customerscenario: False + """ + + (user, user_id, user_sid) = user_setup(provider, client) + + output = run_pysss_nss_idmap(client, "getsidbyname", user.name) + assert ast.literal_eval(output.stdout) == {user.name: {"sid": user_sid, "type": 1}} + + output = run_pysss_nss_idmap(client, "getsidbyusername", user.name) + assert ast.literal_eval(output.stdout) == {user.name: {"sid": user_sid, "type": 1}} + + output = run_pysss_nss_idmap(client, "getsidbygroupname", user.name) + assert ast.literal_eval(output.stdout) == {} + + +@pytest.mark.importance("critical") +@pytest.mark.topology(KnownTopologyGroup.AnyDC) +def test_user_by_id(client: Client, provider: GenericProvider): + """ + :title: Lookup the SID of a user by ID + :setup: + 1. Add user and start SSSD + :steps: + 1. Lookup SID by ID and UID + 2. Lookup SID by GID + :expectedresults: + 1. User found + 2. User not found + :customerscenario: False + """ + + (user, user_id, user_sid) = user_setup(provider, client) + + output = run_pysss_nss_idmap(client, "getsidbyid", user_id) + assert ast.literal_eval(output.stdout) == {user_id: {"sid": user_sid, "type": 1}} + + output = run_pysss_nss_idmap(client, "getsidbyuid", user_id) + assert ast.literal_eval(output.stdout) == {user_id: {"sid": user_sid, "type": 1}} + + output = run_pysss_nss_idmap(client, "getsidbygid", user_id) + assert ast.literal_eval(output.stdout) == {} + + +@pytest.mark.importance("critical") +@pytest.mark.topology(KnownTopologyGroup.AnyDC) +def test_user_by_sid(client: Client, provider: GenericProvider): + """ + :title: Lookup the name and and ID of a user by SID + :setup: + 1. Add user and start SSSD + :steps: + 1. Lookup ID and name by SID + :expectedresults: + 1. User found + :customerscenario: False + """ + + (user, user_id, user_sid) = user_setup(provider, client) + + output = run_pysss_nss_idmap(client, "getidbysid", user_sid) + assert ast.literal_eval(output.stdout) == {user_sid: {"id": user_id, "type": 1}} + + output = run_pysss_nss_idmap(client, "getnamebysid", user_sid) + assert ast.literal_eval(output.stdout) == {user_sid: {"name": user.name, "type": 1}} + + +@pytest.mark.importance("critical") +@pytest.mark.topology(KnownTopologyGroup.AnyDC) +def test_group_by_name(client: Client, provider: GenericProvider): + """ + :title: Lookup the SID of a group by name + :setup: + 1. Add group and start SSSD + :steps: + 1. Lookup SID by name and group name + 2. Lookup SID by user name + :expectedresults: + 1. Group found + 2. Group not found + :customerscenario: False + """ + + (group, group_id, group_sid) = group_setup(provider, client) + + output = run_pysss_nss_idmap(client, "getsidbyname", group.name) + assert ast.literal_eval(output.stdout) == {group.name: {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getsidbygroupname", group.name) + assert ast.literal_eval(output.stdout) == {group.name: {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getsidbyusername", group.name) + assert ast.literal_eval(output.stdout) == {} + + +@pytest.mark.importance("critical") +@pytest.mark.topology(KnownTopologyGroup.AnyDC) +def test_group_by_id(client: Client, provider: GenericProvider): + """ + :title: Lookup the SID of a group by ID + :setup: + 1. Add group and start SSSD + :steps: + 1. Lookup SID by ID and GID + 2. Lookup SID by UID + :expectedresults: + 1. Group found + 2. Group not found + :customerscenario: False + """ + + (group, group_id, group_sid) = group_setup(provider, client) + + output = run_pysss_nss_idmap(client, "getsidbyid", group_id) + assert ast.literal_eval(output.stdout) == {group_id: {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getsidbygid", group_id) + assert ast.literal_eval(output.stdout) == {group_id: {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getsidbyuid", group_id) + assert ast.literal_eval(output.stdout) == {} + + +@pytest.mark.importance("critical") +@pytest.mark.topology(KnownTopologyGroup.AnyDC) +def test_group_by_sid(client: Client, provider: GenericProvider): + """ + :title: Lookup the name and and ID of a user by SID + :setup: + 1. Add user and start SSSD + :steps: + 1. Lookup ID and name by SID + :expectedresults: + 1. Group found + :customerscenario: False + """ + + (group, group_id, group_sid) = group_setup(provider, client) + + output = run_pysss_nss_idmap(client, "getidbysid", group_sid) + assert ast.literal_eval(output.stdout) == {group_sid: {"id": group_id, "type": 2}} + + output = run_pysss_nss_idmap(client, "getnamebysid", group_sid) + assert ast.literal_eval(output.stdout) == {group_sid: {"name": group.name, "type": 2}} + + +@pytest.mark.importance("critical") +@pytest.mark.topology(KnownTopology.Samba) +@pytest.mark.topology(KnownTopology.AD) +def test_case_insensitive(client: Client, provider: GenericProvider): + """ + :title: Lookup group with upper and lower case in the name + :setup: + 1. Add 'Group1' with upper and lower case in the name and start SSSD + :steps: + 1. Lookup group SID by original and lower case name + 2. Lookup name by SID + :expectedresults: + 1. Group found + 2. Group found, result contains lower case name + :customerscenario: False + """ + + sid_name = get_sid_name(provider) + assert sid_name is not None + + group = provider.group("Group1").add() + group_sid = group.get([sid_name])[sid_name][0] + + client.sssd.restart() + + result = client.tools.getent.group(group.name) + assert result is not None, f"'{group.name}' missing" + assert result.name == group.name.lower(), f"'{group.name.lower()}' has wrong name {result.name}" + + output = run_pysss_nss_idmap(client, "getsidbyname", group.name) + assert ast.literal_eval(output.stdout) == {group.name: {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getsidbyname", group.name.lower()) + assert ast.literal_eval(output.stdout) == {group.name.lower(): {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getnamebysid", group_sid) + assert ast.literal_eval(output.stdout) == {group_sid: {"name": group.name.lower(), "type": 2}} + + +@pytest.mark.importance("critical") +@pytest.mark.topology(KnownTopology.AD) +def test_ignore_unreadable_references(client: Client, provider: GenericProvider): + """ + :title: Make sure unreadable references are ignored while reading nested groups + :setup: + 1. Add 'group1' and 'group2' to AD + 2. Make 'group2' a member of 'group1' + 3. Remove 'ReadProperty' for 'Domain Computers' from 'group2' so that + it will be inaccessible for SSSD + 4. Restart SSSD and check that 'group1' lookup fails + 5. Add 'ldap_ignore_unreadable_references = True' to SSSD configuration + and restart SSSD + 6. Determine group_sid and group_id for reference + :steps: + 1. Lookup group SID by name, ID, GID + 2. Lookup group SID by UID + 3. Lookup ID and name by group SID + :expectedresults: + 1. Group found + 2. Group not found + 3. Group found + :customerscenario: False + """ + + sid_name = get_sid_name(provider) + assert sid_name is not None + + group = provider.group("group1").add() + + group2 = provider.group("group2").add() + group.add_member(group2) + + provider.host.conn.run( + f""" + # Remove read access for 'Domain Computers' group from the group member + Import-Module ActiveDirectory + $my_group = Get-ADGroup("{group2.name}") + $domain_computers = New-Object System.Security.Principal.NTAccount("Domain Computers") + $acl = new-object System.DirectoryServices.ActiveDirectoryAccessRule($domain_computers, "ReadProperty", "Deny") + $path = "LDAP://" + $my_group.DistinguishedName + $adsi_group = [ADSI]"$path" + $adsi_group.psbase.get_objectSecurity().AddAccessRule($acl) + $adsi_group.psbase.CommitChanges() + """ + ) + client.sssd.restart() + + result = client.tools.getent.group(group.name) + assert result is None, f"'{group.name}' found unexpectedly" + + client.sssd.domain["ldap_ignore_unreadable_references"] = "True" + client.sssd.config_apply + client.sssd.restart() + + result = client.tools.getent.group(group.name) + assert result is not None, f"'{group.name}' missing" + assert result.name == group.name, f"'{group.name}' has wrong name {result.name}" + group_sid = group.get([sid_name])[sid_name][0] + group_id = result.gid + + output = run_pysss_nss_idmap(client, "getsidbyname", group.name) + assert ast.literal_eval(output.stdout) == {group.name: {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getsidbyid", group_id) + assert ast.literal_eval(output.stdout) == {group_id: {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getsidbygid", group_id) + assert ast.literal_eval(output.stdout) == {group_id: {"sid": group_sid, "type": 2}} + + output = run_pysss_nss_idmap(client, "getsidbyuid", group_id) + assert ast.literal_eval(output.stdout) == {} + + output = run_pysss_nss_idmap(client, "getidbysid", group_sid) + assert ast.literal_eval(output.stdout) == {group_sid: {"id": group_id, "type": 2}} + + output = run_pysss_nss_idmap(client, "getnamebysid", group_sid) + assert ast.literal_eval(output.stdout) == {group_sid: {"name": group.name, "type": 2}}