Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 84 additions & 39 deletions cloudinit/config/cc_ca_certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

**Module frequency:** per instance

**Supported distros:** alpine, debian, ubuntu
**Supported distros:** alpine, debian, ubuntu, rhel

**Config keys**::

Expand All @@ -44,60 +44,104 @@
from cloudinit import subp
from cloudinit import util

CA_CERT_PATH = "/usr/share/ca-certificates/"
CA_CERT_FILENAME = "cloud-init-ca-certs.crt"
CA_CERT_CONFIG = "/etc/ca-certificates.conf"
CA_CERT_SYSTEM_PATH = "/etc/ssl/certs/"
CA_CERT_FULL_PATH = os.path.join(CA_CERT_PATH, CA_CERT_FILENAME)
DEFAULT_CONFIG = {
'ca_cert_path': '/usr/share/ca-certificates/',
'ca_cert_filename': 'cloud-init-ca-certs.crt',
'ca_cert_config': '/etc/ca-certificates.conf',
'ca_cert_system_path': '/etc/ssl/certs/',
'ca_cert_update_cmd': ['update-ca-certificates']
}
DISTRO_OVERRIDES = {
'rhel': {
'ca_cert_path': '/usr/share/pki/ca-trust-source/',
'ca_cert_filename': 'anchors/cloud-init-ca-certs.crt',
'ca_cert_config': None,
'ca_cert_system_path': '/etc/pki/ca-trust/',
'ca_cert_update_cmd': ['update-ca-trust']
}
}

distros = ['alpine', 'debian', 'ubuntu']

distros = ['alpine', 'debian', 'ubuntu', 'rhel']

def update_ca_certs():

def _distro_ca_certs_configs(distro_name):
"""Return a distro-specific ca_certs config dictionary

@param distro_name: String providing the distro class name.
@returns: Dict of distro configurations for ca-cert.
"""
cfg = DISTRO_OVERRIDES.get(distro_name, DEFAULT_CONFIG)
cfg['ca_cert_full_path'] = os.path.join(cfg['ca_cert_path'],
cfg['ca_cert_filename'])
return cfg


def update_ca_certs(distro_cfg):
"""
Updates the CA certificate cache on the current machine.

@param distro_cfg: A hash providing _distro_ca_certs_configs function.
"""
subp.subp(["update-ca-certificates"], capture=False)
subp.subp(distro_cfg['ca_cert_update_cmd'], capture=False)


def add_ca_certs(certs):
def add_ca_certs(distro_cfg, certs):
"""
Adds certificates to the system. To actually apply the new certificates
you must also call L{update_ca_certs}.

@param distro_cfg: A hash providing _distro_ca_certs_configs function.
@param certs: A list of certificate strings.
"""
if certs:
# First ensure they are strings...
cert_file_contents = "\n".join([str(c) for c in certs])
util.write_file(CA_CERT_FULL_PATH, cert_file_contents, mode=0o644)

if os.stat(CA_CERT_CONFIG).st_size == 0:
# If the CA_CERT_CONFIG file is empty (i.e. all existing
# CA certs have been deleted) then simply output a single
# line with the cloud-init cert filename.
out = "%s\n" % CA_CERT_FILENAME
else:
# Append cert filename to CA_CERT_CONFIG file.
# We have to strip the content because blank lines in the file
# causes subsequent entries to be ignored. (LP: #1077020)
orig = util.load_file(CA_CERT_CONFIG)
cur_cont = '\n'.join([line for line in orig.splitlines()
if line != CA_CERT_FILENAME])
out = "%s\n%s\n" % (cur_cont.rstrip(), CA_CERT_FILENAME)
util.write_file(CA_CERT_CONFIG, out, omode="wb")


def remove_default_ca_certs(distro_name):
if not certs:
return
# First ensure they are strings...
cert_file_contents = "\n".join([str(c) for c in certs])
util.write_file(distro_cfg['ca_cert_full_path'],
cert_file_contents,
mode=0o644)
update_cert_config(distro_cfg)


def update_cert_config(distro_cfg):
"""
Update Certificate config file to add the file path managed cloud-init

@param distro_cfg: A hash providing _distro_ca_certs_configs function.
"""
if distro_cfg['ca_cert_config'] is None:
return
if os.stat(distro_cfg['ca_cert_config']).st_size == 0:
# If the CA_CERT_CONFIG file is empty (i.e. all existing
# CA certs have been deleted) then simply output a single
# line with the cloud-init cert filename.
out = "%s\n" % distro_cfg['ca_cert_filename']
else:
# Append cert filename to CA_CERT_CONFIG file.
# We have to strip the content because blank lines in the file
# causes subsequent entries to be ignored. (LP: #1077020)
orig = util.load_file(distro_cfg['ca_cert_config'])
cr_cont = '\n'.join([line for line in orig.splitlines()
if line != distro_cfg['ca_cert_filename']])
out = "%s\n%s\n" % (cr_cont.rstrip(),
distro_cfg['ca_cert_filename'])
util.write_file(distro_cfg['ca_cert_config'], out, omode="wb")


def remove_default_ca_certs(distro_name, distro_cfg):
"""
Removes all default trusted CA certificates from the system. To actually
apply the change you must also call L{update_ca_certs}.

@param distro_name: String providing the distro class name.
@param distro_cfg: A hash providing _distro_ca_certs_configs function.
"""
util.delete_dir_contents(CA_CERT_PATH)
util.delete_dir_contents(CA_CERT_SYSTEM_PATH)
util.write_file(CA_CERT_CONFIG, "", mode=0o644)
util.delete_dir_contents(distro_cfg['ca_cert_path'])
util.delete_dir_contents(distro_cfg['ca_cert_system_path'])
util.write_file(distro_cfg['ca_cert_config'], "", mode=0o644)

if distro_name != 'alpine':
if distro_name in ['debian', 'ubuntu']:
debconf_sel = (
"ca-certificates ca-certificates/trust_new_crts " + "select no")
subp.subp(('debconf-set-selections', '-'), debconf_sel)
Expand All @@ -120,22 +164,23 @@ def handle(name, cfg, cloud, log, _args):
return

ca_cert_cfg = cfg['ca-certs']
distro_cfg = _distro_ca_certs_configs(cloud.distro.name)

# If there is a remove-defaults option set to true, remove the system
# default trusted CA certs first.
if ca_cert_cfg.get("remove-defaults", False):
log.debug("Removing default certificates")
remove_default_ca_certs(cloud.distro.name)
remove_default_ca_certs(cloud.distro.name, distro_cfg)

# If we are given any new trusted CA certs to add, add them.
if "trusted" in ca_cert_cfg:
trusted_certs = util.get_cfg_option_list(ca_cert_cfg, "trusted")
if trusted_certs:
log.debug("Adding %d certificates" % len(trusted_certs))
add_ca_certs(trusted_certs)
add_ca_certs(distro_cfg, trusted_certs)

# Update the system with the new cert configuration.
log.debug("Updating certificates")
update_ca_certs()
update_ca_certs(distro_cfg)

# vi: ts=4 expandtab
Loading