From 620d402a126580a13730f446dd99bf50814c9fb8 Mon Sep 17 00:00:00 2001 From: Petr Cech Date: Wed, 17 Aug 2016 13:58:30 +0200 Subject: [PATCH 07/39] INTG: Tests for ldap nested netgroups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adds tests on reproducer of t2841. Resolves: https://fedorahosted.org/sssd/ticket/2841 Reviewed-by: Lukáš Slebodník (cherry picked from commit 05457ed0e399aaacc919b7aacee5d8210e1c1072) --- src/tests/intg/Makefile.am | 1 + src/tests/intg/test_netgroup.py | 459 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 460 insertions(+) create mode 100644 src/tests/intg/test_netgroup.py diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am index d73e4216310ccd1c90e6b7eb0a0e60068fc45bd5..75422a4417046116bec11a8a680fe2248e3afb69 100644 --- a/src/tests/intg/Makefile.am +++ b/src/tests/intg/Makefile.am @@ -15,6 +15,7 @@ dist_noinst_DATA = \ test_ldap.py \ test_memory_cache.py \ test_ts_cache.py \ + test_netgroup.py \ $(NULL) config.py: config.py.m4 diff --git a/src/tests/intg/test_netgroup.py b/src/tests/intg/test_netgroup.py new file mode 100644 index 0000000000000000000000000000000000000000..b99476126844e35d5dbc1793077720b4020c2fb7 --- /dev/null +++ b/src/tests/intg/test_netgroup.py @@ -0,0 +1,459 @@ +# +# Netgroup integration test +# +# Copyright (c) 2016 Red Hat, Inc. +# Author: Petr Cech +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import os +import stat +import signal +import subprocess +import time +import ldap +import ldap.modlist +import pytest + +import config +import ds_openldap +import ldap_ent +from util import unindent +import sssd_netgroup + +LDAP_BASE_DN = "dc=example,dc=com" + + +@pytest.fixture(scope="module") +def ds_inst(request): + """LDAP server instance fixture""" + ds_inst = ds_openldap.DSOpenLDAP( + config.PREFIX, 10389, LDAP_BASE_DN, + "cn=admin", "Secret123" + ) + + try: + ds_inst.setup() + except: + ds_inst.teardown() + raise + request.addfinalizer(ds_inst.teardown) + return ds_inst + + +@pytest.fixture(scope="module") +def ldap_conn(request, ds_inst): + """LDAP server connection fixture""" + ldap_conn = ds_inst.bind() + ldap_conn.ds_inst = ds_inst + request.addfinalizer(ldap_conn.unbind_s) + return ldap_conn + + +def create_ldap_entries(ldap_conn, ent_list=None): + """Add LDAP entries from ent_list""" + if ent_list is not None: + for entry in ent_list: + ldap_conn.add_s(entry[0], entry[1]) + + +def cleanup_ldap_entries(ldap_conn, ent_list=None): + """Remove LDAP entries added by create_ldap_entries""" + if ent_list is None: + for ou in ("Users", "Groups", "Netgroups", "Services", "Policies"): + for entry in ldap_conn.search_s("ou=" + ou + "," + + ldap_conn.ds_inst.base_dn, + ldap.SCOPE_ONELEVEL, + attrlist=[]): + ldap_conn.delete_s(entry[0]) + else: + for entry in ent_list: + ldap_conn.delete_s(entry[0]) + + +def create_ldap_cleanup(request, ldap_conn, ent_list=None): + """Add teardown for removing all user/group LDAP entries""" + request.addfinalizer(lambda: cleanup_ldap_entries(ldap_conn, ent_list)) + + +def create_ldap_fixture(request, ldap_conn, ent_list=None): + """Add LDAP entries and add teardown for removing them""" + create_ldap_entries(ldap_conn, ent_list) + create_ldap_cleanup(request, ldap_conn, ent_list) + + +SCHEMA_RFC2307_BIS = "rfc2307bis" + + +def format_basic_conf(ldap_conn, schema): + """Format a basic SSSD configuration""" + schema_conf = "ldap_schema = " + schema + "\n" + schema_conf += "ldap_group_object_class = groupOfNames\n" + return unindent("""\ + [sssd] + domains = LDAP + services = nss + + [domain/LDAP] + {schema_conf} + id_provider = ldap + auth_provider = ldap + ldap_uri = {ldap_conn.ds_inst.ldap_url} + ldap_search_base = {ldap_conn.ds_inst.base_dn} + ldap_netgroup_search_base = ou=Netgroups,{ldap_conn.ds_inst.base_dn} + """).format(**locals()) + + +def create_conf_file(contents): + """Create sssd.conf with specified contents""" + conf = open(config.CONF_PATH, "w") + conf.write(contents) + conf.close() + os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) + + +def cleanup_conf_file(): + """Remove sssd.conf, if it exists""" + if os.path.lexists(config.CONF_PATH): + os.unlink(config.CONF_PATH) + + +def create_conf_cleanup(request): + """Add teardown for removing sssd.conf""" + request.addfinalizer(cleanup_conf_file) + + +def create_conf_fixture(request, contents): + """ + Create sssd.conf with specified contents and add teardown for removing it + """ + create_conf_file(contents) + create_conf_cleanup(request) + + +def create_sssd_process(): + """Start the SSSD process""" + if subprocess.call(["sssd", "-D", "-f"]) != 0: + raise Exception("sssd start failed") + + +def cleanup_sssd_process(): + """Stop the SSSD process and remove its state""" + try: + pid_file = open(config.PIDFILE_PATH, "r") + pid = int(pid_file.read()) + os.kill(pid, signal.SIGTERM) + while True: + try: + os.kill(pid, signal.SIGCONT) + except: + break + time.sleep(1) + except: + pass + for path in os.listdir(config.DB_PATH): + os.unlink(config.DB_PATH + "/" + path) + for path in os.listdir(config.MCACHE_PATH): + os.unlink(config.MCACHE_PATH + "/" + path) + + +def create_sssd_cleanup(request): + """Add teardown for stopping SSSD and removing its state""" + request.addfinalizer(cleanup_sssd_process) + + +def create_sssd_fixture(request): + """Start SSSD and add teardown for stopping it and removing its state""" + create_sssd_process() + create_sssd_cleanup(request) + + +@pytest.fixture +def add_empty_netgroup(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_netgroup("empty_netgroup") + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_add_empty_netgroup(add_empty_netgroup): + """ + Adding empty netgroup. + """ + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("empty_netgroup") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [] + + +@pytest.fixture +def add_tripled_netgroup(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_netgroup("tripled_netgroup", ["(host,user,domain)"]) + + ent_list.add_netgroup("adv_tripled_netgroup", ["(host1,user1,domain1)", + "(host2,user2,domain2)"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_add_tripled_netgroup(add_tripled_netgroup): + """ + Adding netgroup with triplet. + """ + + res, _, netgrps = sssd_netgroup.get_sssd_netgroups("tripled_netgroup") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgrps == [("host", "user", "domain")] + + res, _, netgrps = sssd_netgroup.get_sssd_netgroups("adv_tripled_netgroup") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert sorted(netgrps) == sorted([("host1", "user1", "domain1"), + ("host2", "user2", "domain2")]) + + +@pytest.fixture +def add_mixed_netgroup(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_netgroup("mixed_netgroup1") + ent_list.add_netgroup("mixed_netgroup2", members=["mixed_netgroup1"]) + + ent_list.add_netgroup("mixed_netgroup3", ["(host1,user1,domain1)"]) + ent_list.add_netgroup("mixed_netgroup4", + ["(host2,user2,domain2)", "(host3,user3,domain3)"]) + + ent_list.add_netgroup("mixed_netgroup5", + ["(host4,user4,domain4)"], + ["mixed_netgroup1"]) + ent_list.add_netgroup("mixed_netgroup6", + ["(host5,user5,domain5)"], + ["mixed_netgroup2"]) + + ent_list.add_netgroup("mixed_netgroup7", members=["mixed_netgroup3"]) + ent_list.add_netgroup("mixed_netgroup8", + members=["mixed_netgroup3", "mixed_netgroup4"]) + + ent_list.add_netgroup("mixed_netgroup9", + ["(host6,user6,domain6)"], + ["mixed_netgroup3", "mixed_netgroup4"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_add_mixed_netgroup(add_mixed_netgroup): + """ + Adding many netgroups of different type. + """ + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup1") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup2") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup3") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [("host1", "user1", "domain1")] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup4") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert sorted(netgroups) == sorted([("host2", "user2", "domain2"), + ("host3", "user3", "domain3")]) + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup5") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [("host4", "user4", "domain4")] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup6") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [("host5", "user5", "domain5")] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup7") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [("host1", "user1", "domain1")] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup8") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert sorted(netgroups) == sorted([("host1", "user1", "domain1"), + ("host2", "user2", "domain2"), + ("host3", "user3", "domain3")]) + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("mixed_netgroup9") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert sorted(netgroups) == sorted([("host1", "user1", "domain1"), + ("host2", "user2", "domain2"), + ("host3", "user3", "domain3"), + ("host6", "user6", "domain6")]) + + +@pytest.fixture +def remove_step_by_step(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_netgroup("rm_empty_netgroup1", ["(host1,user1,domain1)"]) + ent_list.add_netgroup("rm_empty_netgroup2", + ["(host2,user2,domain2)"], + ["rm_empty_netgroup1"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return ent_list + + +def test_remove_step_by_step(remove_step_by_step, ldap_conn): + """ + Removing netgroups step by step. + """ + + ent_list = remove_step_by_step + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("rm_empty_netgroup1") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host1', 'user1', 'domain1')] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("rm_empty_netgroup2") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert sorted(netgroups) == sorted([('host1', 'user1', 'domain1'), + ('host2', 'user2', 'domain2')]) + + # removing of rm_empty_netgroup1 + ldap_conn.delete_s(ent_list[0][0]) + ent_list.remove(ent_list[0]) + + if subprocess.call(["sss_cache", "-N"]) != 0: + raise Exception("sssd_cache failed") + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("rm_empty_netgroup1") + assert res == sssd_netgroup.NssReturnCode.NOTFOUND + assert netgroups == [] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("rm_empty_netgroup2") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host2', 'user2', 'domain2')] + + # removing of rm_empty_netgroup2 + ldap_conn.delete_s(ent_list[0][0]) + ent_list.remove(ent_list[0]) + + if subprocess.call(["sss_cache", "-N"]) != 0: + raise Exception("sssd_cache failed") + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("rm_empty_netgroup1") + assert res == sssd_netgroup.NssReturnCode.NOTFOUND + assert netgroups == [] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("rm_empty_netgroup2") + assert res == sssd_netgroup.NssReturnCode.NOTFOUND + assert netgroups == [] + + +@pytest.fixture +def removing_nested_netgroups(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_netgroup("t2841_netgroup1", ["(host1,user1,domain1)"]) + ent_list.add_netgroup("t2841_netgroup2", ["(host2,user2,domain2)"]) + ent_list.add_netgroup("t2841_netgroup3", + members=["t2841_netgroup1", "t2841_netgroup2"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_removing_nested_netgroups(removing_nested_netgroups, ldap_conn): + """ + Regression test for ticket 2841. + https://fedorahosted.org/sssd/ticket/2841 + """ + + netgrp_dn = 'cn=t2841_netgroup3,ou=Netgroups,' + ldap_conn.ds_inst.base_dn + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup1") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host1', 'user1', 'domain1')] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup2") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host2', 'user2', 'domain2')] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup3") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert sorted(netgroups) == sorted([('host1', 'user1', 'domain1'), + ('host2', 'user2', 'domain2')]) + + # removing of t2841_netgroup1 from t2841_netgroup3 + old = {'memberNisNetgroup': ["t2841_netgroup1", "t2841_netgroup2"]} + new = {'memberNisNetgroup': ["t2841_netgroup2"]} + + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(netgrp_dn, ldif) + + if subprocess.call(["sss_cache", "-N"]) != 0: + raise Exception("sssd_cache failed") + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup1") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host1', 'user1', 'domain1')] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup2") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host2', 'user2', 'domain2')] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup3") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host2', 'user2', 'domain2')] + + # removing of t2841_netgroup2 from t2841_netgroup3 + old = {'memberNisNetgroup': ["t2841_netgroup2"]} + new = {'memberNisNetgroup': []} + + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(netgrp_dn, ldif) + + if subprocess.call(["sss_cache", "-N"]) != 0: + raise Exception("sssd_cache failed") + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup1") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host1', 'user1', 'domain1')] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup2") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [('host2', 'user2', 'domain2')] + + res, _, netgroups = sssd_netgroup.get_sssd_netgroups("t2841_netgroup3") + assert res == sssd_netgroup.NssReturnCode.SUCCESS + assert netgroups == [] -- 2.9.3