From b294f36d3b5098cb88758221910afdcb245f14db Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 29 Oct 2020 00:23:53 +0900 Subject: [PATCH 01/16] add rhel support --- cloudinit/config/cc_ca_certs.py | 113 ++++++++++++++++++++++---------- 1 file changed, 79 insertions(+), 34 deletions(-) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index 3c453d91ca9..579a3879b0c 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -44,48 +44,92 @@ from cloudinit import subp from cloudinit import util -CA_CERT_PATH = "/usr/share/ca-certificates/" -CA_CERT_FILENAME = "cloud-init-ca-certs.crt" -CA_CERT_CONFIG = "/etc/ca-certificates.conf" -CA_CERT_SYSTEM_PATH = "/etc/ssl/certs/" -CA_CERT_FULL_PATH = os.path.join(CA_CERT_PATH, CA_CERT_FILENAME) - -distros = ['alpine', 'debian', 'ubuntu'] - - -def update_ca_certs(): +DISTRO_DEFAULT_CONFIG = { + 'ubuntu': { + 'ca_cert_path': '/usr/share/ca-certificates/', + 'ca_cert_filename': 'cloud-init-ca-certs.crt', + 'ca_cert_config': '/etc/ca-certificates.conf', + 'ca_cert_system_path': '/etc/ssl/certs/', + 'ca_cert_update_exe': 'update-ca-certificates' + }, + 'debian': { + 'ca_cert_path': '/usr/share/ca-certificates/', + 'ca_cert_filename': 'cloud-init-ca-certs.crt', + 'ca_cert_config': '/etc/ca-certificates.conf', + 'ca_cert_system_path': '/etc/ssl/certs/', + 'ca_cert_update_exe': 'update-ca-certificates' + }, + 'alpine': { + 'ca_cert_path': '/usr/share/ca-certificates/', + 'ca_cert_filename': 'cloud-init-ca-certs.crt', + 'ca_cert_config': '/etc/ca-certificates.conf', + 'ca_cert_system_path': '/etc/ssl/certs/', + 'ca_cert_update_exe': 'update-ca-certificates' + }, + 'rhel': { + 'ca_cert_path': '/usr/share/pki/ca-trust-source/', + 'ca_cert_filename': 'anchors/cloud-init-ca-certs.crt', + 'ca_cert_config': '', + 'ca_cert_system_path': '/etc/pki/ca-trust/', + 'ca_cert_update_exe': 'update-ca-trust' + } +} + + +distros = ['alpine', 'debian', 'ubuntu', 'rhel'] + + +def _distro_ca_certs_configs(distro): + """Construct a distro-specific ca_certs config dictionary by merging + distro specific changes into base config. + + @param distro: String providing the distro class name. + @returns: Dict of distro configurations for ntp clients. + """ + dcfg = DISTRO_DEFAULT_CONFIG + if distro in dcfg: + cfg = DISTRO_DEFAULT_CONFIG[distro] + cfg['ca_cert_full_path'] = os.path.join(cfg['ca_cert_path'], cfg['ca_cert_filename']) + else: + cfg = {} + return cfg + +def update_ca_certs(distro_name): """ Updates the CA certificate cache on the current machine. """ - subp.subp(["update-ca-certificates"], capture=False) + distro_cfg = _distro_ca_certs_configs(distro_name) + subp.subp([distro_cfg['ca_cert_update_exe']], capture=False) -def add_ca_certs(certs): +def add_ca_certs(distro_name, certs): """ Adds certificates to the system. To actually apply the new certificates you must also call L{update_ca_certs}. @param certs: A list of certificate strings. """ + distro_cfg = _distro_ca_certs_configs(distro_name) if certs: # First ensure they are strings... cert_file_contents = "\n".join([str(c) for c in certs]) - util.write_file(CA_CERT_FULL_PATH, cert_file_contents, mode=0o644) - - if os.stat(CA_CERT_CONFIG).st_size == 0: - # If the CA_CERT_CONFIG file is empty (i.e. all existing - # CA certs have been deleted) then simply output a single - # line with the cloud-init cert filename. - out = "%s\n" % CA_CERT_FILENAME - else: - # Append cert filename to CA_CERT_CONFIG file. - # We have to strip the content because blank lines in the file - # causes subsequent entries to be ignored. (LP: #1077020) - orig = util.load_file(CA_CERT_CONFIG) - cur_cont = '\n'.join([line for line in orig.splitlines() - if line != CA_CERT_FILENAME]) - out = "%s\n%s\n" % (cur_cont.rstrip(), CA_CERT_FILENAME) - util.write_file(CA_CERT_CONFIG, out, omode="wb") + util.write_file(distro_cfg['ca_cert_full_path'], cert_file_contents, mode=0o644) + + if distro_cfg['ca_cert_config'] != '': + if os.stat(distro_cfg['ca_cert_config']).st_size == 0: + # If the CA_CERT_CONFIG file is empty (i.e. all existing + # CA certs have been deleted) then simply output a single + # line with the cloud-init cert filename. + out = "%s\n" % distro_cfg['ca_cert_filename'] + else: + # Append cert filename to CA_CERT_CONFIG file. + # We have to strip the content because blank lines in the file + # causes subsequent entries to be ignored. (LP: #1077020) + orig = util.load_file(distro_cfg['ca_cert_config']) + cur_cont = '\n'.join([line for line in orig.splitlines() + if line != distro_cfg['ca_cert_filename']]) + out = "%s\n%s\n" % (cur_cont.rstrip(), distro_cfg['ca_cert_filename']) + util.write_file(distro_cfg['ca_cert_config'], out, omode="wb") def remove_default_ca_certs(distro_name): @@ -93,11 +137,12 @@ def remove_default_ca_certs(distro_name): Removes all default trusted CA certificates from the system. To actually apply the change you must also call L{update_ca_certs}. """ - util.delete_dir_contents(CA_CERT_PATH) - util.delete_dir_contents(CA_CERT_SYSTEM_PATH) - util.write_file(CA_CERT_CONFIG, "", mode=0o644) + distro_cfg = _distro_ca_certs_configs(distro_name) + util.delete_dir_contents(distro_cfg['ca_cert_path']) + util.delete_dir_contents(distro_cfg['ca_cert_system_path']) + util.write_file(distro_cfg['ca_cert_config'], "", mode=0o644) - if distro_name != 'alpine': + if distro_name == 'debian' or distro_name == 'ubuntu': debconf_sel = ( "ca-certificates ca-certificates/trust_new_crts " + "select no") subp.subp(('debconf-set-selections', '-'), debconf_sel) @@ -132,10 +177,10 @@ def handle(name, cfg, cloud, log, _args): trusted_certs = util.get_cfg_option_list(ca_cert_cfg, "trusted") if trusted_certs: log.debug("Adding %d certificates" % len(trusted_certs)) - add_ca_certs(trusted_certs) + add_ca_certs(cloud.distro.name, trusted_certs) # Update the system with the new cert configuration. log.debug("Updating certificates") - update_ca_certs() + update_ca_certs(cloud.distro.name) # vi: ts=4 expandtab From dcd2165a4cb1d0d2b77de8cf7865078cc37402f4 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 29 Oct 2020 00:25:20 +0900 Subject: [PATCH 02/16] add rhel test case --- .../test_handler/test_handler_ca_certs.py | 44 +++++++++++++++---- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index e74a0a08b32..c6b739ef5b6 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -98,7 +98,7 @@ def test_single_trusted(self): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - self.mock_add.assert_called_once_with(['CERT1']) + self.mock_add.assert_called_once_with('ubuntu', ['CERT1']) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) @@ -108,7 +108,7 @@ def test_multiple_trusted(self): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - self.mock_add.assert_called_once_with(['CERT1', 'CERT2']) + self.mock_add.assert_called_once_with('ubuntu', ['CERT1', 'CERT2']) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) @@ -138,7 +138,7 @@ def test_correct_order_for_remove_then_add(self): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - self.mock_add.assert_called_once_with(['CERT1']) + self.mock_add.assert_called_once_with('ubuntu', ['CERT1']) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 1) @@ -153,10 +153,18 @@ def setUp(self): 'cloud_dir': tmpdir, }) + def _fetch_distro(self, kind): + cls = distros.fetch(kind) + paths = helpers.Paths({}) + return cls(kind, {}, paths) + def test_no_certs_in_list(self): """Test that no certificate are written if not provided.""" with mock.patch.object(util, 'write_file') as mockobj: - cc_ca_certs.add_ca_certs([]) + cc_ca_certs.add_ca_certs('ubuntu', []) + self.assertEqual(mockobj.call_count, 0) + with mock.patch.object(util, 'write_file') as mockobj: + cc_ca_certs.add_ca_certs('rhel', []) self.assertEqual(mockobj.call_count, 0) def test_single_cert_trailing_cr(self): @@ -174,7 +182,7 @@ def test_single_cert_trailing_cr(self): mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - cc_ca_certs.add_ca_certs([cert]) + cc_ca_certs.add_ca_certs('ubuntu', [cert]) mock_write.assert_has_calls([ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", @@ -196,7 +204,7 @@ def test_single_cert_no_trailing_cr(self): mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - cc_ca_certs.add_ca_certs([cert]) + cc_ca_certs.add_ca_certs('ubuntu', [cert]) mock_write.assert_has_calls([ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", @@ -223,7 +231,7 @@ def test_single_cert_to_empty_existing_ca_file(self): ) mock_stat.return_value.st_size = 0 - cc_ca_certs.add_ca_certs([cert]) + cc_ca_certs.add_ca_certs('ubuntu', [cert]) mock_write.assert_has_calls([ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", @@ -243,7 +251,7 @@ def test_multiple_certs(self): mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - cc_ca_certs.add_ca_certs(certs) + cc_ca_certs.add_ca_certs('ubuntu', certs) mock_write.assert_has_calls([ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", @@ -259,10 +267,15 @@ def test_multiple_certs(self): class TestUpdateCaCerts(unittest.TestCase): def test_commands(self): with mock.patch.object(subp, 'subp') as mockobj: - cc_ca_certs.update_ca_certs() + cc_ca_certs.update_ca_certs('ubuntu') mockobj.assert_called_once_with( ["update-ca-certificates"], capture=False) + with mock.patch.object(subp, 'subp') as mockobj: + cc_ca_certs.update_ca_certs('rhel') + mockobj.assert_called_once_with( + ["update-ca-trust"], capture=False) + class TestRemoveDefaultCaCerts(TestCase): @@ -295,4 +308,17 @@ def test_commands(self): ('debconf-set-selections', '-'), "ca-certificates ca-certificates/trust_new_crts select no") + with ExitStack() as mocks: + mock_delete = mocks.enter_context( + mock.patch.object(util, 'delete_dir_contents')) + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_subp = mocks.enter_context(mock.patch.object(subp, 'subp')) + + cc_ca_certs.remove_default_ca_certs('rhel') + + mock_delete.assert_has_calls([ + mock.call("/usr/share/pki/ca-trust-source/"), + mock.call("/etc/pki/ca-trust/")]) + # vi: ts=4 expandtab From bb55661cd67e779c3bf32d10d20db67ef3fb221a Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 29 Oct 2020 00:48:29 +0900 Subject: [PATCH 03/16] Update cc_ca_certs.py --- cloudinit/config/cc_ca_certs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index 579a3879b0c..dd6289d5c52 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -25,7 +25,7 @@ **Module frequency:** per instance -**Supported distros:** alpine, debian, ubuntu +**Supported distros:** alpine, debian, ubuntu, rhel **Config keys**:: From 8ac34e63af9f5138e26b9255c2372a694be48504 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 29 Oct 2020 08:48:35 +0900 Subject: [PATCH 04/16] add cawamata --- tools/.github-cla-signers | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/.github-cla-signers b/tools/.github-cla-signers index b7e9dc02a5d..563a2d40dc5 100644 --- a/tools/.github-cla-signers +++ b/tools/.github-cla-signers @@ -4,6 +4,7 @@ beezly bipinbachhao BirknerAlex candlerb +cawamata dermotbradley dhensby eandersson From 298476411214ac86398828a00b2745119a0ba595 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 29 Oct 2020 09:37:33 +0900 Subject: [PATCH 05/16] modify from flake8 test result --- cloudinit/config/cc_ca_certs.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index dd6289d5c52..27ecbd5d64e 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -25,7 +25,7 @@ **Module frequency:** per instance -**Supported distros:** alpine, debian, ubuntu, rhel +**Supported distros:** alpine, debian, ubuntu **Config keys**:: @@ -89,11 +89,13 @@ def _distro_ca_certs_configs(distro): dcfg = DISTRO_DEFAULT_CONFIG if distro in dcfg: cfg = DISTRO_DEFAULT_CONFIG[distro] - cfg['ca_cert_full_path'] = os.path.join(cfg['ca_cert_path'], cfg['ca_cert_filename']) + cfg['ca_cert_full_path'] = os.path.join(cfg['ca_cert_path'], + cfg['ca_cert_filename']) else: cfg = {} return cfg + def update_ca_certs(distro_name): """ Updates the CA certificate cache on the current machine. @@ -113,7 +115,9 @@ def add_ca_certs(distro_name, certs): if certs: # First ensure they are strings... cert_file_contents = "\n".join([str(c) for c in certs]) - util.write_file(distro_cfg['ca_cert_full_path'], cert_file_contents, mode=0o644) + util.write_file(distro_cfg['ca_cert_full_path'], + cert_file_contents, + mode=0o644) if distro_cfg['ca_cert_config'] != '': if os.stat(distro_cfg['ca_cert_config']).st_size == 0: @@ -126,9 +130,10 @@ def add_ca_certs(distro_name, certs): # We have to strip the content because blank lines in the file # causes subsequent entries to be ignored. (LP: #1077020) orig = util.load_file(distro_cfg['ca_cert_config']) - cur_cont = '\n'.join([line for line in orig.splitlines() - if line != distro_cfg['ca_cert_filename']]) - out = "%s\n%s\n" % (cur_cont.rstrip(), distro_cfg['ca_cert_filename']) + cr_cont = '\n'.join([line for line in orig.splitlines() + if line != distro_cfg['ca_cert_filename']]) + out = "%s\n%s\n" % (cr_cont.rstrip(), + distro_cfg['ca_cert_filename']) util.write_file(distro_cfg['ca_cert_config'], out, omode="wb") From 4041e8ff1390fa8fc8a49aa87a5b6f7e4a0157d1 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Tue, 24 Nov 2020 23:10:47 +0900 Subject: [PATCH 06/16] Update cc_ca_certs.py reflect PR review comments --- cloudinit/config/cc_ca_certs.py | 113 ++++++++++++++++---------------- 1 file changed, 56 insertions(+), 57 deletions(-) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index 27ecbd5d64e..4b34633879a 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -25,7 +25,7 @@ **Module frequency:** per instance -**Supported distros:** alpine, debian, ubuntu +**Supported distros:** alpine, debian, ubuntu, rhel **Config keys**:: @@ -44,32 +44,18 @@ from cloudinit import subp from cloudinit import util -DISTRO_DEFAULT_CONFIG = { - 'ubuntu': { - 'ca_cert_path': '/usr/share/ca-certificates/', - 'ca_cert_filename': 'cloud-init-ca-certs.crt', - 'ca_cert_config': '/etc/ca-certificates.conf', - 'ca_cert_system_path': '/etc/ssl/certs/', - 'ca_cert_update_exe': 'update-ca-certificates' - }, - 'debian': { - 'ca_cert_path': '/usr/share/ca-certificates/', - 'ca_cert_filename': 'cloud-init-ca-certs.crt', - 'ca_cert_config': '/etc/ca-certificates.conf', - 'ca_cert_system_path': '/etc/ssl/certs/', - 'ca_cert_update_exe': 'update-ca-certificates' - }, - 'alpine': { - 'ca_cert_path': '/usr/share/ca-certificates/', - 'ca_cert_filename': 'cloud-init-ca-certs.crt', - 'ca_cert_config': '/etc/ca-certificates.conf', - 'ca_cert_system_path': '/etc/ssl/certs/', - 'ca_cert_update_exe': 'update-ca-certificates' - }, +DEFAULT_CONFIG = { + 'ca_cert_path': '/usr/share/ca-certificates/', + 'ca_cert_filename': 'cloud-init-ca-certs.crt', + 'ca_cert_config': '/etc/ca-certificates.conf', + 'ca_cert_system_path': '/etc/ssl/certs/', + 'ca_cert_update_exe': 'update-ca-certificates' +} +DISTRO_OVERRIDES = { 'rhel': { 'ca_cert_path': '/usr/share/pki/ca-trust-source/', 'ca_cert_filename': 'anchors/cloud-init-ca-certs.crt', - 'ca_cert_config': '', + 'ca_cert_config': None, 'ca_cert_system_path': '/etc/pki/ca-trust/', 'ca_cert_update_exe': 'update-ca-trust' } @@ -80,25 +66,26 @@ def _distro_ca_certs_configs(distro): - """Construct a distro-specific ca_certs config dictionary by merging - distro specific changes into base config. + """Return a distro-specific ca_certs config dictionary @param distro: String providing the distro class name. - @returns: Dict of distro configurations for ntp clients. + @returns: Dict of distro configurations for ca-cert. """ - dcfg = DISTRO_DEFAULT_CONFIG + cfg = DEFAULT_CONFIG + dcfg = DISTRO_OVERRIDES if distro in dcfg: - cfg = DISTRO_DEFAULT_CONFIG[distro] - cfg['ca_cert_full_path'] = os.path.join(cfg['ca_cert_path'], - cfg['ca_cert_filename']) - else: - cfg = {} + cfg = dcfg[distro] + + cfg['ca_cert_full_path'] = os.path.join(cfg['ca_cert_path'], + cfg['ca_cert_filename']) return cfg def update_ca_certs(distro_name): """ Updates the CA certificate cache on the current machine. + + @param distro: String providing the distro class name. """ distro_cfg = _distro_ca_certs_configs(distro_name) subp.subp([distro_cfg['ca_cert_update_exe']], capture=False) @@ -109,32 +96,44 @@ def add_ca_certs(distro_name, certs): Adds certificates to the system. To actually apply the new certificates you must also call L{update_ca_certs}. + @param distro: String providing the distro class name. @param certs: A list of certificate strings. """ distro_cfg = _distro_ca_certs_configs(distro_name) - if certs: - # First ensure they are strings... - cert_file_contents = "\n".join([str(c) for c in certs]) - util.write_file(distro_cfg['ca_cert_full_path'], - cert_file_contents, - mode=0o644) - - if distro_cfg['ca_cert_config'] != '': - if os.stat(distro_cfg['ca_cert_config']).st_size == 0: - # If the CA_CERT_CONFIG file is empty (i.e. all existing - # CA certs have been deleted) then simply output a single - # line with the cloud-init cert filename. - out = "%s\n" % distro_cfg['ca_cert_filename'] - else: - # Append cert filename to CA_CERT_CONFIG file. - # We have to strip the content because blank lines in the file - # causes subsequent entries to be ignored. (LP: #1077020) - orig = util.load_file(distro_cfg['ca_cert_config']) - cr_cont = '\n'.join([line for line in orig.splitlines() - if line != distro_cfg['ca_cert_filename']]) - out = "%s\n%s\n" % (cr_cont.rstrip(), - distro_cfg['ca_cert_filename']) - util.write_file(distro_cfg['ca_cert_config'], out, omode="wb") + if not certs: + return + # First ensure they are strings... + cert_file_contents = "\n".join([str(c) for c in certs]) + util.write_file(distro_cfg['ca_cert_full_path'], + cert_file_contents, + mode=0o644) + update_cert_config(distro_name) + + +def update_cert_config(distro_name): + """ + Update Certificate config file to add the file path managed cloud-init + + @param distro: String providing the distro class name. + """ + distro_cfg = _distro_ca_certs_configs(distro_name) + if distro_cfg['ca_cert_config'] == '': + return + if os.stat(distro_cfg['ca_cert_config']).st_size == 0: + # If the CA_CERT_CONFIG file is empty (i.e. all existing + # CA certs have been deleted) then simply output a single + # line with the cloud-init cert filename. + out = "%s\n" % distro_cfg['ca_cert_filename'] + else: + # Append cert filename to CA_CERT_CONFIG file. + # We have to strip the content because blank lines in the file + # causes subsequent entries to be ignored. (LP: #1077020) + orig = util.load_file(distro_cfg['ca_cert_config']) + cr_cont = '\n'.join([line for line in orig.splitlines() + if line != distro_cfg['ca_cert_filename']]) + out = "%s\n%s\n" % (cr_cont.rstrip(), + distro_cfg['ca_cert_filename']) + util.write_file(distro_cfg['ca_cert_config'], out, omode="wb") def remove_default_ca_certs(distro_name): @@ -147,7 +146,7 @@ def remove_default_ca_certs(distro_name): util.delete_dir_contents(distro_cfg['ca_cert_system_path']) util.write_file(distro_cfg['ca_cert_config'], "", mode=0o644) - if distro_name == 'debian' or distro_name == 'ubuntu': + if distro_name in ['debian', 'ubuntu']: debconf_sel = ( "ca-certificates ca-certificates/trust_new_crts " + "select no") subp.subp(('debconf-set-selections', '-'), debconf_sel) From f11b783f97ab7b58b4ff88acf52a3abea7c1a654 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 26 Nov 2020 06:57:17 +0900 Subject: [PATCH 07/16] Update cloudinit/config/cc_ca_certs.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mina Galić --- cloudinit/config/cc_ca_certs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index 4b34633879a..160f15c03f4 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -118,6 +118,7 @@ def update_cert_config(distro_name): """ distro_cfg = _distro_ca_certs_configs(distro_name) if distro_cfg['ca_cert_config'] == '': + if distro_cfg['ca_cert_config'] is None: return if os.stat(distro_cfg['ca_cert_config']).st_size == 0: # If the CA_CERT_CONFIG file is empty (i.e. all existing From aa42fe0f36620bfe2e1f2fb6b9ef9c29eb522089 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Mon, 7 Dec 2020 22:44:15 +0900 Subject: [PATCH 08/16] Update cc_ca_certs.py --- cloudinit/config/cc_ca_certs.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index 160f15c03f4..53fa8a02a0d 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -49,7 +49,7 @@ 'ca_cert_filename': 'cloud-init-ca-certs.crt', 'ca_cert_config': '/etc/ca-certificates.conf', 'ca_cert_system_path': '/etc/ssl/certs/', - 'ca_cert_update_exe': 'update-ca-certificates' + 'ca_cert_update_cmd': ['update-ca-certificates'] } DISTRO_OVERRIDES = { 'rhel': { @@ -57,7 +57,7 @@ 'ca_cert_filename': 'anchors/cloud-init-ca-certs.crt', 'ca_cert_config': None, 'ca_cert_system_path': '/etc/pki/ca-trust/', - 'ca_cert_update_exe': 'update-ca-trust' + 'ca_cert_update_cmd': ['update-ca-trust'] } } @@ -88,7 +88,7 @@ def update_ca_certs(distro_name): @param distro: String providing the distro class name. """ distro_cfg = _distro_ca_certs_configs(distro_name) - subp.subp([distro_cfg['ca_cert_update_exe']], capture=False) + subp.subp(distro_cfg['ca_cert_update_cmd'], capture=False) def add_ca_certs(distro_name, certs): @@ -117,7 +117,6 @@ def update_cert_config(distro_name): @param distro: String providing the distro class name. """ distro_cfg = _distro_ca_certs_configs(distro_name) - if distro_cfg['ca_cert_config'] == '': if distro_cfg['ca_cert_config'] is None: return if os.stat(distro_cfg['ca_cert_config']).st_size == 0: From c9ca5927eabfb67391c5a9aeb4cc11296678e2a9 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Mon, 7 Dec 2020 22:50:55 +0900 Subject: [PATCH 09/16] Update test_handler_ca_certs.py --- .../test_handler/test_handler_ca_certs.py | 376 ++++++++++-------- 1 file changed, 220 insertions(+), 156 deletions(-) diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index c6b739ef5b6..42228767f03 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -7,8 +7,10 @@ from cloudinit import subp from cloudinit import util -from cloudinit.tests.helpers import TestCase +from cloudinit.tests.helpers import ( + TestCase, FilesystemMockingTestCase) +import os import logging import shutil import tempfile @@ -43,31 +45,36 @@ def test_no_config(self): self.assertEqual(certs_mock.call_count, 0) -class TestConfig(TestCase): +class TestConfig(FilesystemMockingTestCase): def setUp(self): super(TestConfig, self).setUp() + self.new_root = self.tmp_dir() self.name = "ca-certs" - distro = self._fetch_distro('ubuntu') - self.paths = None - self.cloud = cloud.Cloud(None, self.paths, None, distro, None) - self.log = logging.getLogger("TestNoConfig") + self.log = logging.getLogger("TestConfig") self.args = [] + def _get_cloud(self, distro, sys_cfg=None): + self.new_root = self.reRoot(root=self.new_root) + paths = helpers.Paths({'cloud_dir': self.new_root}) + cls = distros.fetch(distro) + if not sys_cfg: + sys_cfg = {} + mydist = cls(distro, sys_cfg, paths) + self.mocks = ExitStack() self.addCleanup(self.mocks.close) # Mock out the functions that actually modify the system self.mock_add = self.mocks.enter_context( mock.patch.object(cc_ca_certs, 'add_ca_certs')) - self.mock_update = self.mocks.enter_context( + self.mock_update_certs = self.mocks.enter_context( mock.patch.object(cc_ca_certs, 'update_ca_certs')) + self.mock_update_config = self.mocks.enter_context( + mock.patch.object(cc_ca_certs, 'update_cert_config')) self.mock_remove = self.mocks.enter_context( mock.patch.object(cc_ca_certs, 'remove_default_ca_certs')) - def _fetch_distro(self, kind): - cls = distros.fetch(kind) - paths = helpers.Paths({}) - return cls(kind, {}, paths) + return cloud.Cloud(None, paths, sys_cfg, mydist, None) def test_no_trusted_list(self): """ @@ -75,97 +82,127 @@ def test_no_trusted_list(self): present. """ config = {"ca-certs": {}} - - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update_certs.call_count, 1) + self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_remove.call_count, 0) def test_empty_trusted_list(self): """Test that no certificate are written if 'trusted' list is empty.""" config = {"ca-certs": {"trusted": []}} + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update_certs.call_count, 1) + self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_remove.call_count, 0) def test_single_trusted(self): """Test that a single cert gets passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1"]}} + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.mock_add.assert_called_once_with('ubuntu', ['CERT1']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.mock_add.assert_called_once_with(distro, ['CERT1']) + self.assertEqual(self.mock_update_certs.call_count, 1) + self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_remove.call_count, 0) def test_multiple_trusted(self): """Test that multiple certs get passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) - self.mock_add.assert_called_once_with('ubuntu', ['CERT1', 'CERT2']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.mock_add.assert_called_once_with(distro, ['CERT1', 'CERT2']) + self.assertEqual(self.mock_update_certs.call_count, 1) + self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_remove.call_count, 0) def test_remove_default_ca_certs(self): """Test remove_defaults works as expected.""" config = {"ca-certs": {"remove-defaults": True}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 1) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update_certs.call_count, 1) + self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_remove.call_count, 1) def test_no_remove_defaults_if_false(self): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": False}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update_certs.call_count, 1) + self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_remove.call_count, 0) def test_correct_order_for_remove_then_add(self): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) - - self.mock_add.assert_called_once_with('ubuntu', ['CERT1']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 1) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self.mock_add.assert_called_once_with(distro, ['CERT1']) + self.assertEqual(self.mock_update_certs.call_count, 1) + self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_remove.call_count, 1) -class TestAddCaCerts(TestCase): +class TestAddCaCerts(FilesystemMockingTestCase): def setUp(self): super(TestAddCaCerts, self).setUp() - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - self.paths = helpers.Paths({ - 'cloud_dir': tmpdir, - }) + self.new_root = self.tmp_dir() + self.name = "ca-certs" + self.log = logging.getLogger("TestAddCaCerts") + self.args = [] - def _fetch_distro(self, kind): - cls = distros.fetch(kind) - paths = helpers.Paths({}) - return cls(kind, {}, paths) + def _get_cloud(self, distro, sys_cfg=None): + self.new_root = self.reRoot(root=self.new_root) + paths = helpers.Paths({'cloud_dir': self.new_root}) + cls = distros.fetch(distro) + if not sys_cfg: + sys_cfg = {} + mydist = cls(distro, sys_cfg, paths) + self.mocks = ExitStack() + self.addCleanup(self.mocks.close) + + return cloud.Cloud(None, paths, sys_cfg, mydist, None) + + def _generate_file(self, path=None, content=None): + if not path: + return + conf_path = os.path.join(self.new_root, path) + if not os.path.isfile(conf_path): + util.write_file(conf_path, content=content) + return + def test_no_certs_in_list(self): """Test that no certificate are written if not provided.""" - with mock.patch.object(util, 'write_file') as mockobj: - cc_ca_certs.add_ca_certs('ubuntu', []) - self.assertEqual(mockobj.call_count, 0) - with mock.patch.object(util, 'write_file') as mockobj: - cc_ca_certs.add_ca_certs('rhel', []) - self.assertEqual(mockobj.call_count, 0) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + with mock.patch.object(util, 'write_file') as mockobj: + cc_ca_certs.add_ca_certs(distro, []) + self.assertEqual(mockobj.call_count, 0) def test_single_cert_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -175,20 +212,26 @@ def test_single_cert_trailing_cr(self): ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n" expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) + self._generate_file(distro_ca_certs_configs['ca_cert_config'], ca_certs_content) + + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) - cc_ca_certs.add_ca_certs('ubuntu', [cert]) + cc_ca_certs.add_ca_certs(distro, [cert]) - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", expected, omode="wb")]) - mock_load.assert_called_once_with("/etc/ca-certificates.conf") + mock_write.assert_has_calls([mock.call(distro_ca_certs_configs['ca_cert_full_path'], + cert, mode=0o644)]) + if distro_ca_certs_configs['ca_cert_config'] is not None: + mock_write.assert_has_calls([mock.call(distro_ca_certs_configs['ca_cert_config'], + expected, omode="wb")]) + mock_load.assert_called_once_with(distro_ca_certs_configs['ca_cert_config']) def test_single_cert_no_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -197,24 +240,31 @@ def test_single_cert_no_trailing_cr(self): ca_certs_content = "line1\nline2\nline3" - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) + self._generate_file(distro_ca_certs_configs['ca_cert_config'], ca_certs_content) - cc_ca_certs.add_ca_certs('ubuntu', [cert]) + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", - "%s\n%s\n" % (ca_certs_content, - "cloud-init-ca-certs.crt"), - omode="wb")]) + cc_ca_certs.add_ca_certs(distro, [cert]) - mock_load.assert_called_once_with("/etc/ca-certificates.conf") + mock_write.assert_has_calls([ + mock.call(distro_ca_certs_configs['ca_cert_full_path'], + cert, mode=0o644)]) + + if distro_ca_certs_configs['ca_cert_config'] is not None: + mock_write.assert_has_calls([ + mock.call(distro_ca_certs_configs['ca_cert_config'], + "%s\n%s\n" % (ca_certs_content, + distro_ca_certs_configs['ca_cert_filename']), + omode="wb")]) + mock_load.assert_called_once_with(distro_ca_certs_configs['ca_cert_config']) def test_single_cert_to_empty_existing_ca_file(self): """Test adding a single certificate to the trusted CAs @@ -223,20 +273,27 @@ def test_single_cert_to_empty_existing_ca_file(self): expected = "cloud-init-ca-certs.crt\n" - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file', autospec=True)) - mock_stat = mocks.enter_context( - mock.patch("cloudinit.config.cc_ca_certs.os.stat") - ) - mock_stat.return_value.st_size = 0 + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) + self._generate_file(distro_ca_certs_configs['ca_cert_config'], '') + + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file', autospec=True)) + mock_stat = mocks.enter_context( + mock.patch("cloudinit.config.cc_ca_certs.os.stat") + ) + mock_stat.return_value.st_size = 0 - cc_ca_certs.add_ca_certs('ubuntu', [cert]) + cc_ca_certs.add_ca_certs(distro, [cert]) - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", expected, omode="wb")]) + mock_write.assert_has_calls([ + mock.call(distro_ca_certs_configs['ca_cert_full_path'], + cert, mode=0o644)]) + if distro_ca_certs_configs['ca_cert_config'] is not None: + mock_write.assert_has_calls([ + mock.call(distro_ca_certs_configs['ca_cert_config'], expected, omode="wb")]) def test_multiple_certs(self): """Test adding multiple certificates to the trusted CAs.""" @@ -244,81 +301,88 @@ def test_multiple_certs(self): expected_cert_file = "\n".join(certs) ca_certs_content = "line1\nline2\nline3" - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) + self._generate_file(distro_ca_certs_configs['ca_cert_config'], ca_certs_content) - cc_ca_certs.add_ca_certs('ubuntu', certs) + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - expected_cert_file, mode=0o644), - mock.call("/etc/ca-certificates.conf", - "%s\n%s\n" % (ca_certs_content, - "cloud-init-ca-certs.crt"), - omode='wb')]) + cc_ca_certs.add_ca_certs(distro, certs) - mock_load.assert_called_once_with("/etc/ca-certificates.conf") + mock_write.assert_has_calls([ + mock.call(distro_ca_certs_configs['ca_cert_full_path'], + expected_cert_file, mode=0o644)]) + + if distro_ca_certs_configs['ca_cert_config'] is not None: + mock_write.assert_has_calls([ + mock.call(distro_ca_certs_configs['ca_cert_config'], + "%s\n%s\n" % (ca_certs_content, + distro_ca_certs_configs['ca_cert_filename']), + omode='wb')]) + + mock_load.assert_called_once_with(distro_ca_certs_configs['ca_cert_config']) class TestUpdateCaCerts(unittest.TestCase): def test_commands(self): - with mock.patch.object(subp, 'subp') as mockobj: - cc_ca_certs.update_ca_certs('ubuntu') - mockobj.assert_called_once_with( - ["update-ca-certificates"], capture=False) + for distro in cc_ca_certs.distros: + distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) + with mock.patch.object(subp, 'subp') as mockobj: + cc_ca_certs.update_ca_certs(distro) + mockobj.assert_called_once_with( + distro_ca_certs_configs['ca_cert_update_cmd'], capture=False) - with mock.patch.object(subp, 'subp') as mockobj: - cc_ca_certs.update_ca_certs('rhel') - mockobj.assert_called_once_with( - ["update-ca-trust"], capture=False) - -class TestRemoveDefaultCaCerts(TestCase): +class TestRemoveDefaultCaCerts(FilesystemMockingTestCase): def setUp(self): super(TestRemoveDefaultCaCerts, self).setUp() - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - self.paths = helpers.Paths({ - 'cloud_dir': tmpdir, - }) + self.new_root = self.tmp_dir() + self.name = "ca-certs" + self.log = logging.getLogger("TestRemoveDefaultCaCerts") + self.args = [] - def test_commands(self): - with ExitStack() as mocks: - mock_delete = mocks.enter_context( - mock.patch.object(util, 'delete_dir_contents')) - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_subp = mocks.enter_context(mock.patch.object(subp, 'subp')) + def _get_cloud(self, distro, sys_cfg=None): + self.new_root = self.reRoot(root=self.new_root) + paths = helpers.Paths({'cloud_dir': self.new_root}) + cls = distros.fetch(distro) + if not sys_cfg: + sys_cfg = {} + mydist = cls(distro, sys_cfg, paths) - cc_ca_certs.remove_default_ca_certs('ubuntu') + self.mocks = ExitStack() + self.addCleanup(self.mocks.close) - mock_delete.assert_has_calls([ - mock.call("/usr/share/ca-certificates/"), - mock.call("/etc/ssl/certs/")]) + self.mock_delete = self.mocks.enter_context( + mock.patch.object(util, 'delete_dir_contents')) + self.mock_write = self.mocks.enter_context( + mock.patch.object(util, 'write_file')) + self.mock_subp = self.mocks.enter_context(mock.patch.object(subp, 'subp')) - mock_write.assert_called_once_with( - "/etc/ca-certificates.conf", "", mode=0o644) + def test_commands(self): + for distro in cc_ca_certs.distros: + mycloud = self._get_cloud(distro) + distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) - mock_subp.assert_called_once_with( - ('debconf-set-selections', '-'), - "ca-certificates ca-certificates/trust_new_crts select no") + cc_ca_certs.remove_default_ca_certs(distro) - with ExitStack() as mocks: - mock_delete = mocks.enter_context( - mock.patch.object(util, 'delete_dir_contents')) - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_subp = mocks.enter_context(mock.patch.object(subp, 'subp')) + self.mock_delete.assert_has_calls([ + mock.call(distro_ca_certs_configs['ca_cert_path']), + mock.call(distro_ca_certs_configs['ca_cert_system_path'])]) - cc_ca_certs.remove_default_ca_certs('rhel') + if distro_ca_certs_configs['ca_cert_config'] is not None: + self.mock_write.assert_called_once_with( + distro_ca_certs_configs['ca_cert_config'], "", mode=0o644) - mock_delete.assert_has_calls([ - mock.call("/usr/share/pki/ca-trust-source/"), - mock.call("/etc/pki/ca-trust/")]) + if distro in ['debian', 'ubuntu']: + self.mock_subp.assert_called_once_with( + ('debconf-set-selections', '-'), + "ca-certificates ca-certificates/trust_new_crts select no") -# vi: ts=4 expandtab +## vi: ts=4 expandtab From d4ed29d250fb2f38f221ab1b614a9c77860adb49 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Tue, 8 Dec 2020 00:40:14 +0900 Subject: [PATCH 10/16] Update test_handler_ca_certs.py --- .../test_handler/test_handler_ca_certs.py | 126 +++++++++--------- 1 file changed, 62 insertions(+), 64 deletions(-) diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index 42228767f03..c464d9a97b1 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -7,13 +7,10 @@ from cloudinit import subp from cloudinit import util -from cloudinit.tests.helpers import ( - TestCase, FilesystemMockingTestCase) +from cloudinit.tests.helpers import FilesystemMockingTestCase import os import logging -import shutil -import tempfile import unittest from contextlib import ExitStack from unittest import mock @@ -61,6 +58,10 @@ def _get_cloud(self, distro, sys_cfg=None): sys_cfg = {} mydist = cls(distro, sys_cfg, paths) + self._mock_init() + return cloud.Cloud(None, paths, sys_cfg, mydist, None) + + def _mock_init(self): self.mocks = ExitStack() self.addCleanup(self.mocks.close) @@ -74,8 +75,6 @@ def _get_cloud(self, distro, sys_cfg=None): self.mock_remove = self.mocks.enter_context( mock.patch.object(cc_ca_certs, 'remove_default_ca_certs')) - return cloud.Cloud(None, paths, sys_cfg, mydist, None) - def test_no_trusted_list(self): """ Test that no certificates are written if the 'trusted' key is not @@ -183,23 +182,20 @@ def _get_cloud(self, distro, sys_cfg=None): sys_cfg = {} mydist = cls(distro, sys_cfg, paths) - self.mocks = ExitStack() - self.addCleanup(self.mocks.close) - return cloud.Cloud(None, paths, sys_cfg, mydist, None) def _generate_file(self, path=None, content=None): + self.new_root = self.reRoot(root=self.new_root) if not path: return conf_path = os.path.join(self.new_root, path) if not os.path.isfile(conf_path): util.write_file(conf_path, content=content) return - + def test_no_certs_in_list(self): """Test that no certificate are written if not provided.""" for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) with mock.patch.object(util, 'write_file') as mockobj: cc_ca_certs.add_ca_certs(distro, []) self.assertEqual(mockobj.call_count, 0) @@ -213,9 +209,9 @@ def test_single_cert_trailing_cr(self): expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) - self._generate_file(distro_ca_certs_configs['ca_cert_config'], ca_certs_content) + distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) + self._generate_file(distro_conf['ca_cert_config'], + ca_certs_content) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -226,12 +222,15 @@ def test_single_cert_trailing_cr(self): cc_ca_certs.add_ca_certs(distro, [cert]) - mock_write.assert_has_calls([mock.call(distro_ca_certs_configs['ca_cert_full_path'], - cert, mode=0o644)]) - if distro_ca_certs_configs['ca_cert_config'] is not None: - mock_write.assert_has_calls([mock.call(distro_ca_certs_configs['ca_cert_config'], - expected, omode="wb")]) - mock_load.assert_called_once_with(distro_ca_certs_configs['ca_cert_config']) + mock_write.assert_has_calls([ + mock.call(distro_conf['ca_cert_full_path'], + cert, mode=0o644)]) + if distro_conf['ca_cert_config'] is not None: + mock_write.assert_has_calls([ + mock.call(distro_conf['ca_cert_config'], + expected, omode="wb")]) + mock_load.assert_called_once_with( + distro_conf['ca_cert_config']) def test_single_cert_no_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -241,9 +240,9 @@ def test_single_cert_no_trailing_cr(self): ca_certs_content = "line1\nline2\nline3" for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) - self._generate_file(distro_ca_certs_configs['ca_cert_config'], ca_certs_content) + distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) + self._generate_file(distro_conf['ca_cert_config'], + ca_certs_content) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -255,16 +254,18 @@ def test_single_cert_no_trailing_cr(self): cc_ca_certs.add_ca_certs(distro, [cert]) mock_write.assert_has_calls([ - mock.call(distro_ca_certs_configs['ca_cert_full_path'], + mock.call(distro_conf['ca_cert_full_path'], cert, mode=0o644)]) - if distro_ca_certs_configs['ca_cert_config'] is not None: + if distro_conf['ca_cert_config'] is not None: mock_write.assert_has_calls([ - mock.call(distro_ca_certs_configs['ca_cert_config'], - "%s\n%s\n" % (ca_certs_content, - distro_ca_certs_configs['ca_cert_filename']), - omode="wb")]) - mock_load.assert_called_once_with(distro_ca_certs_configs['ca_cert_config']) + mock.call( + distro_conf['ca_cert_config'], + "%s\n%s\n" % (ca_certs_content, + distro_conf['ca_cert_filename']), + omode="wb")]) + mock_load.assert_called_once_with( + distro_conf['ca_cert_config']) def test_single_cert_to_empty_existing_ca_file(self): """Test adding a single certificate to the trusted CAs @@ -274,9 +275,8 @@ def test_single_cert_to_empty_existing_ca_file(self): expected = "cloud-init-ca-certs.crt\n" for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) - self._generate_file(distro_ca_certs_configs['ca_cert_config'], '') + distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) + self._generate_file(distro_conf['ca_cert_config'], '') with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -289,11 +289,12 @@ def test_single_cert_to_empty_existing_ca_file(self): cc_ca_certs.add_ca_certs(distro, [cert]) mock_write.assert_has_calls([ - mock.call(distro_ca_certs_configs['ca_cert_full_path'], + mock.call(distro_conf['ca_cert_full_path'], cert, mode=0o644)]) - if distro_ca_certs_configs['ca_cert_config'] is not None: + if distro_conf['ca_cert_config'] is not None: mock_write.assert_has_calls([ - mock.call(distro_ca_certs_configs['ca_cert_config'], expected, omode="wb")]) + mock.call(distro_conf['ca_cert_config'], + expected, omode="wb")]) def test_multiple_certs(self): """Test adding multiple certificates to the trusted CAs.""" @@ -302,9 +303,9 @@ def test_multiple_certs(self): ca_certs_content = "line1\nline2\nline3" for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) - self._generate_file(distro_ca_certs_configs['ca_cert_config'], ca_certs_content) + distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) + self._generate_file(distro_conf['ca_cert_config'], + ca_certs_content) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -316,27 +317,29 @@ def test_multiple_certs(self): cc_ca_certs.add_ca_certs(distro, certs) mock_write.assert_has_calls([ - mock.call(distro_ca_certs_configs['ca_cert_full_path'], + mock.call(distro_conf['ca_cert_full_path'], expected_cert_file, mode=0o644)]) - if distro_ca_certs_configs['ca_cert_config'] is not None: + if distro_conf['ca_cert_config'] is not None: mock_write.assert_has_calls([ - mock.call(distro_ca_certs_configs['ca_cert_config'], - "%s\n%s\n" % (ca_certs_content, - distro_ca_certs_configs['ca_cert_filename']), - omode='wb')]) + mock.call( + distro_conf['ca_cert_config'], + "%s\n%s\n" % (ca_certs_content, + distro_conf['ca_cert_filename']), + omode='wb')]) - mock_load.assert_called_once_with(distro_ca_certs_configs['ca_cert_config']) + mock_load.assert_called_once_with( + distro_conf['ca_cert_config']) class TestUpdateCaCerts(unittest.TestCase): def test_commands(self): for distro in cc_ca_certs.distros: - distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) + distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) with mock.patch.object(subp, 'subp') as mockobj: cc_ca_certs.update_ca_certs(distro) mockobj.assert_called_once_with( - distro_ca_certs_configs['ca_cert_update_cmd'], capture=False) + distro_conf['ca_cert_update_cmd'], capture=False) class TestRemoveDefaultCaCerts(FilesystemMockingTestCase): @@ -347,15 +350,9 @@ def setUp(self): self.name = "ca-certs" self.log = logging.getLogger("TestRemoveDefaultCaCerts") self.args = [] + self.reRoot(root=self.new_root) - def _get_cloud(self, distro, sys_cfg=None): - self.new_root = self.reRoot(root=self.new_root) - paths = helpers.Paths({'cloud_dir': self.new_root}) - cls = distros.fetch(distro) - if not sys_cfg: - sys_cfg = {} - mydist = cls(distro, sys_cfg, paths) - + def _mock_init(self): self.mocks = ExitStack() self.addCleanup(self.mocks.close) @@ -363,26 +360,27 @@ def _get_cloud(self, distro, sys_cfg=None): mock.patch.object(util, 'delete_dir_contents')) self.mock_write = self.mocks.enter_context( mock.patch.object(util, 'write_file')) - self.mock_subp = self.mocks.enter_context(mock.patch.object(subp, 'subp')) + self.mock_subp = self.mocks.enter_context( + mock.patch.object(subp, 'subp')) def test_commands(self): for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - distro_ca_certs_configs = cc_ca_certs._distro_ca_certs_configs(distro) + self._mock_init() + distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) cc_ca_certs.remove_default_ca_certs(distro) self.mock_delete.assert_has_calls([ - mock.call(distro_ca_certs_configs['ca_cert_path']), - mock.call(distro_ca_certs_configs['ca_cert_system_path'])]) + mock.call(distro_conf['ca_cert_path']), + mock.call(distro_conf['ca_cert_system_path'])]) - if distro_ca_certs_configs['ca_cert_config'] is not None: + if distro_conf['ca_cert_config'] is not None: self.mock_write.assert_called_once_with( - distro_ca_certs_configs['ca_cert_config'], "", mode=0o644) + distro_conf['ca_cert_config'], "", mode=0o644) if distro in ['debian', 'ubuntu']: self.mock_subp.assert_called_once_with( ('debconf-set-selections', '-'), "ca-certificates ca-certificates/trust_new_crts select no") -## vi: ts=4 expandtab +# vi: ts=4 expandtab From f8943343be51207e382976e4e304fde2e59211af Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Tue, 8 Dec 2020 00:41:11 +0900 Subject: [PATCH 11/16] Update cc_ca_certs.py --- cloudinit/config/cc_ca_certs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index 53fa8a02a0d..415ecb539f1 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -117,7 +117,7 @@ def update_cert_config(distro_name): @param distro: String providing the distro class name. """ distro_cfg = _distro_ca_certs_configs(distro_name) - if distro_cfg['ca_cert_config'] is None: + if distro_cfg['ca_cert_config'] is None: return if os.stat(distro_cfg['ca_cert_config']).st_size == 0: # If the CA_CERT_CONFIG file is empty (i.e. all existing From dd13794c36777b66f4415b3590904a0a79eedd7b Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Wed, 16 Dec 2020 15:14:03 +0900 Subject: [PATCH 12/16] Update test_handler_ca_certs.py --- .../test_handler/test_handler_ca_certs.py | 269 +++++++++--------- 1 file changed, 127 insertions(+), 142 deletions(-) diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index c464d9a97b1..e392fe83609 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -7,10 +7,11 @@ from cloudinit import subp from cloudinit import util -from cloudinit.tests.helpers import FilesystemMockingTestCase +from cloudinit.tests.helpers import TestCase -import os import logging +import shutil +import tempfile import unittest from contextlib import ExitStack from unittest import mock @@ -42,24 +43,22 @@ def test_no_config(self): self.assertEqual(certs_mock.call_count, 0) -class TestConfig(FilesystemMockingTestCase): +class TestConfig(TestCase): def setUp(self): super(TestConfig, self).setUp() - self.new_root = self.tmp_dir() self.name = "ca-certs" - self.log = logging.getLogger("TestConfig") + self.paths = None + self.log = logging.getLogger("TestNoConfig") self.args = [] - def _get_cloud(self, distro, sys_cfg=None): - self.new_root = self.reRoot(root=self.new_root) - paths = helpers.Paths({'cloud_dir': self.new_root}) - cls = distros.fetch(distro) - if not sys_cfg: - sys_cfg = {} - mydist = cls(distro, sys_cfg, paths) + def _fetch_distro(self, kind): + cls = distros.fetch(kind) + paths = helpers.Paths({}) + return cls(kind, {}, paths) - self._mock_init() - return cloud.Cloud(None, paths, sys_cfg, mydist, None) + def _get_cloud(self, kind): + distro = self._fetch_distro(kind) + return cloud.Cloud(None, self.paths, None, distro, None) def _mock_init(self): self.mocks = ExitStack() @@ -68,10 +67,8 @@ def _mock_init(self): # Mock out the functions that actually modify the system self.mock_add = self.mocks.enter_context( mock.patch.object(cc_ca_certs, 'add_ca_certs')) - self.mock_update_certs = self.mocks.enter_context( + self.mock_update = self.mocks.enter_context( mock.patch.object(cc_ca_certs, 'update_ca_certs')) - self.mock_update_config = self.mocks.enter_context( - mock.patch.object(cc_ca_certs, 'update_cert_config')) self.mock_remove = self.mocks.enter_context( mock.patch.object(cc_ca_certs, 'remove_default_ca_certs')) @@ -81,36 +78,40 @@ def test_no_trusted_list(self): present. """ config = {"ca-certs": {}} + for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self._mock_init() + cloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) + self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update_certs.call_count, 1) - self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) def test_empty_trusted_list(self): """Test that no certificate are written if 'trusted' list is empty.""" config = {"ca-certs": {"trusted": []}} + for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self._mock_init() + cloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update_certs.call_count, 1) - self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) def test_single_trusted(self): """Test that a single cert gets passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1"]}} + for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self._mock_init() + cloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.mock_add.assert_called_once_with(distro, ['CERT1']) - self.assertEqual(self.mock_update_certs.call_count, 1) - self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) def test_multiple_trusted(self): @@ -118,12 +119,12 @@ def test_multiple_trusted(self): config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self._mock_init() + cloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.mock_add.assert_called_once_with(distro, ['CERT1', 'CERT2']) - self.assertEqual(self.mock_update_certs.call_count, 1) - self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) def test_remove_default_ca_certs(self): @@ -131,12 +132,12 @@ def test_remove_default_ca_certs(self): config = {"ca-certs": {"remove-defaults": True}} for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self._mock_init() + cloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update_certs.call_count, 1) - self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 1) def test_no_remove_defaults_if_false(self): @@ -144,12 +145,12 @@ def test_no_remove_defaults_if_false(self): config = {"ca-certs": {"remove-defaults": False}} for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self._mock_init() + cloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update_certs.call_count, 1) - self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) def test_correct_order_for_remove_then_add(self): @@ -157,41 +158,29 @@ def test_correct_order_for_remove_then_add(self): config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} for distro in cc_ca_certs.distros: - mycloud = self._get_cloud(distro) - cc_ca_certs.handle(self.name, config, mycloud, self.log, self.args) + self._mock_init() + cloud = self._get_cloud(distro) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.mock_add.assert_called_once_with(distro, ['CERT1']) - self.assertEqual(self.mock_update_certs.call_count, 1) - self.assertEqual(self.mock_update_config.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 1) -class TestAddCaCerts(FilesystemMockingTestCase): +class TestAddCaCerts(TestCase): + def setUp(self): super(TestAddCaCerts, self).setUp() - self.new_root = self.tmp_dir() - self.name = "ca-certs" - self.log = logging.getLogger("TestAddCaCerts") - self.args = [] + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) + self.paths = helpers.Paths({ + 'cloud_dir': tmpdir, + }) - def _get_cloud(self, distro, sys_cfg=None): - self.new_root = self.reRoot(root=self.new_root) - paths = helpers.Paths({'cloud_dir': self.new_root}) - cls = distros.fetch(distro) - if not sys_cfg: - sys_cfg = {} - mydist = cls(distro, sys_cfg, paths) - - return cloud.Cloud(None, paths, sys_cfg, mydist, None) - - def _generate_file(self, path=None, content=None): - self.new_root = self.reRoot(root=self.new_root) - if not path: - return - conf_path = os.path.join(self.new_root, path) - if not os.path.isfile(conf_path): - util.write_file(conf_path, content=content) - return + def _fetch_distro(self, kind): + cls = distros.fetch(kind) + paths = helpers.Paths({}) + return cls(kind, {}, paths) def test_no_certs_in_list(self): """Test that no certificate are written if not provided.""" @@ -209,9 +198,7 @@ def test_single_cert_trailing_cr(self): expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" for distro in cc_ca_certs.distros: - distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) - self._generate_file(distro_conf['ca_cert_config'], - ca_certs_content) + conf = cc_ca_certs._distro_ca_certs_configs(distro) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -219,18 +206,21 @@ def test_single_cert_trailing_cr(self): mock_load = mocks.enter_context( mock.patch.object(util, 'load_file', return_value=ca_certs_content)) + mock_stat = mocks.enter_context( + mock.patch("cloudinit.config.cc_ca_certs.os.stat") + ) + mock_stat.return_value.st_size = 1 cc_ca_certs.add_ca_certs(distro, [cert]) + mock_write.assert_has_calls([ + mock.call(conf['ca_cert_full_path'], + cert, mode=0o644)]) + if conf['ca_cert_config'] is not None: mock_write.assert_has_calls([ - mock.call(distro_conf['ca_cert_full_path'], - cert, mode=0o644)]) - if distro_conf['ca_cert_config'] is not None: - mock_write.assert_has_calls([ - mock.call(distro_conf['ca_cert_config'], - expected, omode="wb")]) - mock_load.assert_called_once_with( - distro_conf['ca_cert_config']) + mock.call(conf['ca_cert_config'], + expected, omode="wb")]) + mock_load.assert_called_once_with(conf['ca_cert_config']) def test_single_cert_no_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -240,9 +230,7 @@ def test_single_cert_no_trailing_cr(self): ca_certs_content = "line1\nline2\nline3" for distro in cc_ca_certs.distros: - distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) - self._generate_file(distro_conf['ca_cert_config'], - ca_certs_content) + conf = cc_ca_certs._distro_ca_certs_configs(distro) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -250,22 +238,24 @@ def test_single_cert_no_trailing_cr(self): mock_load = mocks.enter_context( mock.patch.object(util, 'load_file', return_value=ca_certs_content)) + mock_stat = mocks.enter_context( + mock.patch("cloudinit.config.cc_ca_certs.os.stat") + ) + mock_stat.return_value.st_size = 1 cc_ca_certs.add_ca_certs(distro, [cert]) mock_write.assert_has_calls([ - mock.call(distro_conf['ca_cert_full_path'], + mock.call(conf['ca_cert_full_path'], cert, mode=0o644)]) - - if distro_conf['ca_cert_config'] is not None: + if conf['ca_cert_config'] is not None: mock_write.assert_has_calls([ - mock.call( - distro_conf['ca_cert_config'], - "%s\n%s\n" % (ca_certs_content, - distro_conf['ca_cert_filename']), - omode="wb")]) - mock_load.assert_called_once_with( - distro_conf['ca_cert_config']) + mock.call(conf['ca_cert_config'], + "%s\n%s\n" % (ca_certs_content, + conf['ca_cert_filename']), + omode="wb")]) + + mock_load.assert_called_once_with(conf['ca_cert_config']) def test_single_cert_to_empty_existing_ca_file(self): """Test adding a single certificate to the trusted CAs @@ -275,8 +265,7 @@ def test_single_cert_to_empty_existing_ca_file(self): expected = "cloud-init-ca-certs.crt\n" for distro in cc_ca_certs.distros: - distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) - self._generate_file(distro_conf['ca_cert_config'], '') + conf = cc_ca_certs._distro_ca_certs_configs(distro) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -289,11 +278,11 @@ def test_single_cert_to_empty_existing_ca_file(self): cc_ca_certs.add_ca_certs(distro, [cert]) mock_write.assert_has_calls([ - mock.call(distro_conf['ca_cert_full_path'], + mock.call(conf['ca_cert_full_path'], cert, mode=0o644)]) - if distro_conf['ca_cert_config'] is not None: + if conf['ca_cert_config'] is not None: mock_write.assert_has_calls([ - mock.call(distro_conf['ca_cert_config'], + mock.call(conf['ca_cert_config'], expected, omode="wb")]) def test_multiple_certs(self): @@ -303,9 +292,7 @@ def test_multiple_certs(self): ca_certs_content = "line1\nline2\nline3" for distro in cc_ca_certs.distros: - distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) - self._generate_file(distro_conf['ca_cert_config'], - ca_certs_content) + conf = cc_ca_certs._distro_ca_certs_configs(distro) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -313,74 +300,72 @@ def test_multiple_certs(self): mock_load = mocks.enter_context( mock.patch.object(util, 'load_file', return_value=ca_certs_content)) + mock_stat = mocks.enter_context( + mock.patch("cloudinit.config.cc_ca_certs.os.stat") + ) + mock_stat.return_value.st_size = 1 cc_ca_certs.add_ca_certs(distro, certs) mock_write.assert_has_calls([ - mock.call(distro_conf['ca_cert_full_path'], + mock.call(conf['ca_cert_full_path'], expected_cert_file, mode=0o644)]) - - if distro_conf['ca_cert_config'] is not None: + if conf['ca_cert_config'] is not None: mock_write.assert_has_calls([ - mock.call( - distro_conf['ca_cert_config'], - "%s\n%s\n" % (ca_certs_content, - distro_conf['ca_cert_filename']), - omode='wb')]) + mock.call(conf['ca_cert_config'], + "%s\n%s\n" % (ca_certs_content, + conf['ca_cert_filename']), + omode='wb')]) - mock_load.assert_called_once_with( - distro_conf['ca_cert_config']) + mock_load.assert_called_once_with(conf['ca_cert_config']) class TestUpdateCaCerts(unittest.TestCase): def test_commands(self): for distro in cc_ca_certs.distros: - distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) + conf = cc_ca_certs._distro_ca_certs_configs(distro) with mock.patch.object(subp, 'subp') as mockobj: cc_ca_certs.update_ca_certs(distro) mockobj.assert_called_once_with( - distro_conf['ca_cert_update_cmd'], capture=False) + conf['ca_cert_update_cmd'], capture=False) -class TestRemoveDefaultCaCerts(FilesystemMockingTestCase): +class TestRemoveDefaultCaCerts(TestCase): def setUp(self): super(TestRemoveDefaultCaCerts, self).setUp() - self.new_root = self.tmp_dir() - self.name = "ca-certs" - self.log = logging.getLogger("TestRemoveDefaultCaCerts") - self.args = [] - self.reRoot(root=self.new_root) - - def _mock_init(self): - self.mocks = ExitStack() - self.addCleanup(self.mocks.close) - - self.mock_delete = self.mocks.enter_context( - mock.patch.object(util, 'delete_dir_contents')) - self.mock_write = self.mocks.enter_context( - mock.patch.object(util, 'write_file')) - self.mock_subp = self.mocks.enter_context( - mock.patch.object(subp, 'subp')) + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) + self.paths = helpers.Paths({ + 'cloud_dir': tmpdir, + }) def test_commands(self): for distro in cc_ca_certs.distros: - self._mock_init() - distro_conf = cc_ca_certs._distro_ca_certs_configs(distro) + conf = cc_ca_certs._distro_ca_certs_configs(distro) + + with ExitStack() as mocks: + mock_delete = mocks.enter_context( + mock.patch.object(util, 'delete_dir_contents')) + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_subp = mocks.enter_context( + mock.patch.object(subp, 'subp')) - cc_ca_certs.remove_default_ca_certs(distro) + cc_ca_certs.remove_default_ca_certs(distro) - self.mock_delete.assert_has_calls([ - mock.call(distro_conf['ca_cert_path']), - mock.call(distro_conf['ca_cert_system_path'])]) + mock_delete.assert_has_calls([ + mock.call(conf['ca_cert_path']), + mock.call(conf['ca_cert_system_path'])]) - if distro_conf['ca_cert_config'] is not None: - self.mock_write.assert_called_once_with( - distro_conf['ca_cert_config'], "", mode=0o644) + if conf['ca_cert_config'] is not None: + mock_write.assert_called_once_with( + conf['ca_cert_config'], "", mode=0o644) - if distro in ['debian', 'ubuntu']: - self.mock_subp.assert_called_once_with( - ('debconf-set-selections', '-'), - "ca-certificates ca-certificates/trust_new_crts select no") + if distro in ['debian', 'ubuntu']: + mock_subp.assert_called_once_with( + ('debconf-set-selections', '-'), + "ca-certificates \ +ca-certificates/trust_new_crts select no") # vi: ts=4 expandtab From 76faf46f9615001993a3579c7e18c3a9baf24883 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Wed, 16 Dec 2020 16:13:13 +0900 Subject: [PATCH 13/16] Update test_handler_ca_certs.py --- .../test_handler/test_handler_ca_certs.py | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index a7a6ebfcff7..0259c4de1ce 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -198,6 +198,8 @@ def test_single_cert_trailing_cr(self): ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n" expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" + self.m_stat.return_value.st_size = 1 + for distro in cc_ca_certs.distros: conf = cc_ca_certs._distro_ca_certs_configs(distro) @@ -207,21 +209,17 @@ def test_single_cert_trailing_cr(self): mock_load = mocks.enter_context( mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - mock_stat = mocks.enter_context( - mock.patch("cloudinit.config.cc_ca_certs.os.stat") - ) - mock_stat.return_value.st_size = 1 cc_ca_certs.add_ca_certs(distro, [cert]) - mock_write.assert_has_calls([ - mock.call(conf['ca_cert_full_path'], - cert, mode=0o644)]) - if conf['ca_cert_config'] is not None: mock_write.assert_has_calls([ - mock.call(conf['ca_cert_config'], - expected, omode="wb")]) - mock_load.assert_called_once_with(conf['ca_cert_config']) + mock.call(conf['ca_cert_full_path'], + cert, mode=0o644)]) + if conf['ca_cert_config'] is not None: + mock_write.assert_has_calls([ + mock.call(conf['ca_cert_config'], + expected, omode="wb")]) + mock_load.assert_called_once_with(conf['ca_cert_config']) def test_single_cert_no_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -230,6 +228,8 @@ def test_single_cert_no_trailing_cr(self): ca_certs_content = "line1\nline2\nline3" + self.m_stat.return_value.st_size = 1 + for distro in cc_ca_certs.distros: conf = cc_ca_certs._distro_ca_certs_configs(distro) @@ -239,10 +239,6 @@ def test_single_cert_no_trailing_cr(self): mock_load = mocks.enter_context( mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - mock_stat = mocks.enter_context( - mock.patch("cloudinit.config.cc_ca_certs.os.stat") - ) - mock_stat.return_value.st_size = 1 cc_ca_certs.add_ca_certs(distro, [cert]) @@ -265,10 +261,12 @@ def test_single_cert_to_empty_existing_ca_file(self): expected = "cloud-init-ca-certs.crt\n" + self.m_stat.return_value.st_size = 0 + for distro in cc_ca_certs.distros: conf = cc_ca_certs._distro_ca_certs_configs(distro) - with mock.patch.object(util, 'write_file', autospec=True) as m_write: - self.m_stat.return_value.st_size = 0 + with mock.patch.object(util, 'write_file', + autospec=True) as m_write: cc_ca_certs.add_ca_certs(distro, [cert]) @@ -286,6 +284,8 @@ def test_multiple_certs(self): expected_cert_file = "\n".join(certs) ca_certs_content = "line1\nline2\nline3" + self.m_stat.return_value.st_size = 1 + for distro in cc_ca_certs.distros: conf = cc_ca_certs._distro_ca_certs_configs(distro) @@ -295,10 +295,6 @@ def test_multiple_certs(self): mock_load = mocks.enter_context( mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - mock_stat = mocks.enter_context( - mock.patch("cloudinit.config.cc_ca_certs.os.stat") - ) - mock_stat.return_value.st_size = 1 cc_ca_certs.add_ca_certs(distro, certs) From a68366605ca1f62ba6643c76b068ca8c46ec22d3 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 17 Dec 2020 12:26:46 +0900 Subject: [PATCH 14/16] Update cc_ca_certs.py --- cloudinit/config/cc_ca_certs.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index 415ecb539f1..a8cdf4dd9bc 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -65,17 +65,13 @@ distros = ['alpine', 'debian', 'ubuntu', 'rhel'] -def _distro_ca_certs_configs(distro): +def _distro_ca_certs_configs(distro_name): """Return a distro-specific ca_certs config dictionary - @param distro: String providing the distro class name. + @param distro_name: String providing the distro class name. @returns: Dict of distro configurations for ca-cert. """ - cfg = DEFAULT_CONFIG - dcfg = DISTRO_OVERRIDES - if distro in dcfg: - cfg = dcfg[distro] - + cfg = DISTRO_OVERRIDES.get(distro_name, DEFAULT_CONFIG) cfg['ca_cert_full_path'] = os.path.join(cfg['ca_cert_path'], cfg['ca_cert_filename']) return cfg @@ -85,7 +81,7 @@ def update_ca_certs(distro_name): """ Updates the CA certificate cache on the current machine. - @param distro: String providing the distro class name. + @param distro_name: String providing the distro class name. """ distro_cfg = _distro_ca_certs_configs(distro_name) subp.subp(distro_cfg['ca_cert_update_cmd'], capture=False) @@ -96,7 +92,7 @@ def add_ca_certs(distro_name, certs): Adds certificates to the system. To actually apply the new certificates you must also call L{update_ca_certs}. - @param distro: String providing the distro class name. + @param distro_name: String providing the distro class name. @param certs: A list of certificate strings. """ distro_cfg = _distro_ca_certs_configs(distro_name) @@ -114,7 +110,7 @@ def update_cert_config(distro_name): """ Update Certificate config file to add the file path managed cloud-init - @param distro: String providing the distro class name. + @param distro_name: String providing the distro class name. """ distro_cfg = _distro_ca_certs_configs(distro_name) if distro_cfg['ca_cert_config'] is None: From c137f003f06dd80eecae4cb801d50ded5b300376 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 17 Dec 2020 13:03:02 +0900 Subject: [PATCH 15/16] Update cc_ca_certs.py --- cloudinit/config/cc_ca_certs.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/cloudinit/config/cc_ca_certs.py b/cloudinit/config/cc_ca_certs.py index a8cdf4dd9bc..bd7bead9410 100644 --- a/cloudinit/config/cc_ca_certs.py +++ b/cloudinit/config/cc_ca_certs.py @@ -77,25 +77,23 @@ def _distro_ca_certs_configs(distro_name): return cfg -def update_ca_certs(distro_name): +def update_ca_certs(distro_cfg): """ Updates the CA certificate cache on the current machine. - @param distro_name: String providing the distro class name. + @param distro_cfg: A hash providing _distro_ca_certs_configs function. """ - distro_cfg = _distro_ca_certs_configs(distro_name) subp.subp(distro_cfg['ca_cert_update_cmd'], capture=False) -def add_ca_certs(distro_name, certs): +def add_ca_certs(distro_cfg, certs): """ Adds certificates to the system. To actually apply the new certificates you must also call L{update_ca_certs}. - @param distro_name: String providing the distro class name. + @param distro_cfg: A hash providing _distro_ca_certs_configs function. @param certs: A list of certificate strings. """ - distro_cfg = _distro_ca_certs_configs(distro_name) if not certs: return # First ensure they are strings... @@ -103,16 +101,15 @@ def add_ca_certs(distro_name, certs): util.write_file(distro_cfg['ca_cert_full_path'], cert_file_contents, mode=0o644) - update_cert_config(distro_name) + update_cert_config(distro_cfg) -def update_cert_config(distro_name): +def update_cert_config(distro_cfg): """ Update Certificate config file to add the file path managed cloud-init - @param distro_name: String providing the distro class name. + @param distro_cfg: A hash providing _distro_ca_certs_configs function. """ - distro_cfg = _distro_ca_certs_configs(distro_name) if distro_cfg['ca_cert_config'] is None: return if os.stat(distro_cfg['ca_cert_config']).st_size == 0: @@ -132,12 +129,14 @@ def update_cert_config(distro_name): util.write_file(distro_cfg['ca_cert_config'], out, omode="wb") -def remove_default_ca_certs(distro_name): +def remove_default_ca_certs(distro_name, distro_cfg): """ Removes all default trusted CA certificates from the system. To actually apply the change you must also call L{update_ca_certs}. + + @param distro_name: String providing the distro class name. + @param distro_cfg: A hash providing _distro_ca_certs_configs function. """ - distro_cfg = _distro_ca_certs_configs(distro_name) util.delete_dir_contents(distro_cfg['ca_cert_path']) util.delete_dir_contents(distro_cfg['ca_cert_system_path']) util.write_file(distro_cfg['ca_cert_config'], "", mode=0o644) @@ -165,22 +164,23 @@ def handle(name, cfg, cloud, log, _args): return ca_cert_cfg = cfg['ca-certs'] + distro_cfg = _distro_ca_certs_configs(cloud.distro.name) # If there is a remove-defaults option set to true, remove the system # default trusted CA certs first. if ca_cert_cfg.get("remove-defaults", False): log.debug("Removing default certificates") - remove_default_ca_certs(cloud.distro.name) + remove_default_ca_certs(cloud.distro.name, distro_cfg) # If we are given any new trusted CA certs to add, add them. if "trusted" in ca_cert_cfg: trusted_certs = util.get_cfg_option_list(ca_cert_cfg, "trusted") if trusted_certs: log.debug("Adding %d certificates" % len(trusted_certs)) - add_ca_certs(cloud.distro.name, trusted_certs) + add_ca_certs(distro_cfg, trusted_certs) # Update the system with the new cert configuration. log.debug("Updating certificates") - update_ca_certs(cloud.distro.name) + update_ca_certs(distro_cfg) # vi: ts=4 expandtab From 082a67a87852373b7d22308f7c31edff7b2e7440 Mon Sep 17 00:00:00 2001 From: cawamata <1749824+cawamata@users.noreply.github.com> Date: Thu, 17 Dec 2020 13:04:06 +0900 Subject: [PATCH 16/16] Update test_handler_ca_certs.py --- .../test_handler/test_handler_ca_certs.py | 80 ++++++++++--------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index 0259c4de1ce..6e3831ed851 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -79,9 +79,9 @@ def test_no_trusted_list(self): """ config = {"ca-certs": {}} - for distro in cc_ca_certs.distros: + for distro_name in cc_ca_certs.distros: self._mock_init() - cloud = self._get_cloud(distro) + cloud = self._get_cloud(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.assertEqual(self.mock_add.call_count, 0) @@ -92,9 +92,9 @@ def test_empty_trusted_list(self): """Test that no certificate are written if 'trusted' list is empty.""" config = {"ca-certs": {"trusted": []}} - for distro in cc_ca_certs.distros: + for distro_name in cc_ca_certs.distros: self._mock_init() - cloud = self._get_cloud(distro) + cloud = self._get_cloud(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.assertEqual(self.mock_add.call_count, 0) @@ -105,12 +105,13 @@ def test_single_trusted(self): """Test that a single cert gets passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1"]}} - for distro in cc_ca_certs.distros: + for distro_name in cc_ca_certs.distros: self._mock_init() - cloud = self._get_cloud(distro) + cloud = self._get_cloud(distro_name) + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(distro, ['CERT1']) + self.mock_add.assert_called_once_with(conf, ['CERT1']) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) @@ -118,12 +119,13 @@ def test_multiple_trusted(self): """Test that multiple certs get passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} - for distro in cc_ca_certs.distros: + for distro_name in cc_ca_certs.distros: self._mock_init() - cloud = self._get_cloud(distro) + cloud = self._get_cloud(distro_name) + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(distro, ['CERT1', 'CERT2']) + self.mock_add.assert_called_once_with(conf, ['CERT1', 'CERT2']) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 0) @@ -131,9 +133,9 @@ def test_remove_default_ca_certs(self): """Test remove_defaults works as expected.""" config = {"ca-certs": {"remove-defaults": True}} - for distro in cc_ca_certs.distros: + for distro_name in cc_ca_certs.distros: self._mock_init() - cloud = self._get_cloud(distro) + cloud = self._get_cloud(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.assertEqual(self.mock_add.call_count, 0) @@ -144,9 +146,9 @@ def test_no_remove_defaults_if_false(self): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": False}} - for distro in cc_ca_certs.distros: + for distro_name in cc_ca_certs.distros: self._mock_init() - cloud = self._get_cloud(distro) + cloud = self._get_cloud(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) self.assertEqual(self.mock_add.call_count, 0) @@ -157,12 +159,13 @@ def test_correct_order_for_remove_then_add(self): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} - for distro in cc_ca_certs.distros: + for distro_name in cc_ca_certs.distros: self._mock_init() - cloud = self._get_cloud(distro) + cloud = self._get_cloud(distro_name) + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(distro, ['CERT1']) + self.mock_add.assert_called_once_with(conf, ['CERT1']) self.assertEqual(self.mock_update.call_count, 1) self.assertEqual(self.mock_remove.call_count, 1) @@ -185,9 +188,10 @@ def _fetch_distro(self, kind): def test_no_certs_in_list(self): """Test that no certificate are written if not provided.""" - for distro in cc_ca_certs.distros: + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) with mock.patch.object(util, 'write_file') as mockobj: - cc_ca_certs.add_ca_certs(distro, []) + cc_ca_certs.add_ca_certs(conf, []) self.assertEqual(mockobj.call_count, 0) def test_single_cert_trailing_cr(self): @@ -200,8 +204,8 @@ def test_single_cert_trailing_cr(self): self.m_stat.return_value.st_size = 1 - for distro in cc_ca_certs.distros: - conf = cc_ca_certs._distro_ca_certs_configs(distro) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -210,7 +214,7 @@ def test_single_cert_trailing_cr(self): mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - cc_ca_certs.add_ca_certs(distro, [cert]) + cc_ca_certs.add_ca_certs(conf, [cert]) mock_write.assert_has_calls([ mock.call(conf['ca_cert_full_path'], @@ -230,8 +234,8 @@ def test_single_cert_no_trailing_cr(self): self.m_stat.return_value.st_size = 1 - for distro in cc_ca_certs.distros: - conf = cc_ca_certs._distro_ca_certs_configs(distro) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -240,7 +244,7 @@ def test_single_cert_no_trailing_cr(self): mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - cc_ca_certs.add_ca_certs(distro, [cert]) + cc_ca_certs.add_ca_certs(conf, [cert]) mock_write.assert_has_calls([ mock.call(conf['ca_cert_full_path'], @@ -263,12 +267,12 @@ def test_single_cert_to_empty_existing_ca_file(self): self.m_stat.return_value.st_size = 0 - for distro in cc_ca_certs.distros: - conf = cc_ca_certs._distro_ca_certs_configs(distro) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) with mock.patch.object(util, 'write_file', autospec=True) as m_write: - cc_ca_certs.add_ca_certs(distro, [cert]) + cc_ca_certs.add_ca_certs(conf, [cert]) m_write.assert_has_calls([ mock.call(conf['ca_cert_full_path'], @@ -286,8 +290,8 @@ def test_multiple_certs(self): self.m_stat.return_value.st_size = 1 - for distro in cc_ca_certs.distros: - conf = cc_ca_certs._distro_ca_certs_configs(distro) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) with ExitStack() as mocks: mock_write = mocks.enter_context( @@ -296,7 +300,7 @@ def test_multiple_certs(self): mock.patch.object(util, 'load_file', return_value=ca_certs_content)) - cc_ca_certs.add_ca_certs(distro, certs) + cc_ca_certs.add_ca_certs(conf, certs) mock_write.assert_has_calls([ mock.call(conf['ca_cert_full_path'], @@ -313,10 +317,10 @@ def test_multiple_certs(self): class TestUpdateCaCerts(unittest.TestCase): def test_commands(self): - for distro in cc_ca_certs.distros: - conf = cc_ca_certs._distro_ca_certs_configs(distro) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) with mock.patch.object(subp, 'subp') as mockobj: - cc_ca_certs.update_ca_certs(distro) + cc_ca_certs.update_ca_certs(conf) mockobj.assert_called_once_with( conf['ca_cert_update_cmd'], capture=False) @@ -332,8 +336,8 @@ def setUp(self): }) def test_commands(self): - for distro in cc_ca_certs.distros: - conf = cc_ca_certs._distro_ca_certs_configs(distro) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) with ExitStack() as mocks: mock_delete = mocks.enter_context( @@ -343,7 +347,7 @@ def test_commands(self): mock_subp = mocks.enter_context( mock.patch.object(subp, 'subp')) - cc_ca_certs.remove_default_ca_certs(distro) + cc_ca_certs.remove_default_ca_certs(distro_name, conf) mock_delete.assert_has_calls([ mock.call(conf['ca_cert_path']), @@ -353,7 +357,7 @@ def test_commands(self): mock_write.assert_called_once_with( conf['ca_cert_config'], "", mode=0o644) - if distro in ['debian', 'ubuntu']: + if distro_name in ['debian', 'ubuntu']: mock_subp.assert_called_once_with( ('debconf-set-selections', '-'), "ca-certificates \