diff --git a/cloudinit/sources/DataSourceAzure.py b/cloudinit/sources/DataSourceAzure.py index 261993fe1ce..414619342f6 100644 --- a/cloudinit/sources/DataSourceAzure.py +++ b/cloudinit/sources/DataSourceAzure.py @@ -5,7 +5,6 @@ # This file is part of cloud-init. See LICENSE file for license information. import base64 -import functools import logging import os import os.path @@ -50,31 +49,6 @@ ) from cloudinit.url_helper import UrlError -try: - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=DeprecationWarning) - import crypt # pylint: disable=W4901 - - blowfish_hash: Any = functools.partial( - crypt.crypt, salt=f"$6${util.rand_str(strlen=16)}" - ) -except (ImportError, AttributeError): - try: - import passlib.hash - - blowfish_hash = passlib.hash.sha512_crypt.hash - except ImportError: - - def blowfish_hash(_): - """Raise when called so that importing this module doesn't throw - ImportError when ds_detect() returns false. In this case, crypt - and passlib are not needed. - """ - raise ImportError( - "crypt and passlib not found, missing dependency" - ) - - LOG = logging.getLogger(__name__) DS_NAME = "Azure" @@ -166,6 +140,35 @@ def find_dev_from_busdev(camcontrol_out: str, busdev: str) -> Optional[str]: return None +def hash_password(password: str) -> str: + """Hash a password using SHA-512 crypt. + + Try to use crypt, falling back to passlib. + + If neither are available, raise ReportableErrorImportError. + + :param password: plaintext password to hash. + :return: The hashed password string. + :raises ReportableErrorImportError: If crypt and passlib are unavailable. + """ + try: + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=DeprecationWarning) + import crypt # pylint: disable=W4901 + + salt = crypt.mksalt(crypt.METHOD_SHA512) + return crypt.crypt(password, salt) + except (ImportError, AttributeError): + pass + + try: + import passlib.hash + + return passlib.hash.sha512_crypt.hash(password) + except ImportError as error: + raise errors.ReportableErrorImportError(error=error) from error + + def normalize_mac_address(mac: str) -> str: """Normalize mac address with colons and lower-case.""" if len(mac) == 12: @@ -1984,7 +1987,7 @@ def read_azure_ovf(contents): if ovf_env.password: defuser["lock_passwd"] = False if DEF_PASSWD_REDACTION != ovf_env.password: - defuser["hashed_passwd"] = encrypt_pass(ovf_env.password) + defuser["hashed_passwd"] = hash_password(ovf_env.password) if defuser: cfg["system_info"] = {"default_user": defuser} @@ -2009,10 +2012,6 @@ def read_azure_ovf(contents): return (md, ud, cfg) -def encrypt_pass(password): - return blowfish_hash(password) - - def find_primary_nic(): candidate_nics = net.find_candidate_nics() if candidate_nics: diff --git a/cloudinit/sources/azure/errors.py b/cloudinit/sources/azure/errors.py index ae676a08d29..8ef9515304b 100644 --- a/cloudinit/sources/azure/errors.py +++ b/cloudinit/sources/azure/errors.py @@ -168,6 +168,13 @@ def __init__(self, *, exception: ValueError) -> None: self.supporting_data["exception"] = repr(exception) +class ReportableErrorImportError(ReportableError): + def __init__(self, *, error: ImportError) -> None: + super().__init__(f"error importing {error.name} library") + + self.supporting_data["error"] = repr(error) + + class ReportableErrorOsDiskPpsFailure(ReportableError): def __init__(self) -> None: super().__init__("error waiting for host shutdown") diff --git a/tests/unittests/sources/azure/test_errors.py b/tests/unittests/sources/azure/test_errors.py index d9dcf13d555..7b405368d39 100644 --- a/tests/unittests/sources/azure/test_errors.py +++ b/tests/unittests/sources/azure/test_errors.py @@ -211,6 +211,15 @@ def test_imds_metadata_parsing_exception(): assert error.supporting_data["exception"] == repr(exception) +def test_import_error(): + exception = ImportError("No module named 'foobar'", name="foobar") + + error = errors.ReportableErrorImportError(error=exception) + + assert error.reason == "error importing foobar library" + assert error.supporting_data["error"] == repr(exception) + + def test_ovf_parsing_exception(): error = None try: diff --git a/tests/unittests/sources/test_azure.py b/tests/unittests/sources/test_azure.py index 67373e1ef1e..3f552dab559 100644 --- a/tests/unittests/sources/test_azure.py +++ b/tests/unittests/sources/test_azure.py @@ -1,16 +1,21 @@ # This file is part of cloud-init. See LICENSE file for license information. # pylint: disable=attribute-defined-outside-init +import builtins import copy import datetime import json import logging import os import stat +import sys import xml.etree.ElementTree as ET from pathlib import Path -import passlib.hash +try: + import passlib.hash +except ImportError: + passlib = None # type: ignore import pytest import requests @@ -1732,12 +1737,13 @@ def test_username_used(self, get_ds): assert "ssh_pwauth" not in dsrc.cfg + @pytest.mark.skipif(passlib is None, reason="passlib not installed") def test_password_given(self, get_ds, mocker): # The crypt module has platform-specific behavior and the purpose of # this test isn't to verify the differences between crypt and passlib, # so hardcode passlib usage as crypt is deprecated. mocker.patch.object( - dsaz, "blowfish_hash", passlib.hash.sha512_crypt.hash + dsaz, "hash_password", passlib.hash.sha512_crypt.hash ) data = { "ovfcontent": construct_ovf_env( @@ -2434,6 +2440,19 @@ def test_wb_invalid_ovf_env_xml_calls_read_azure_ovf(self, tmp_path): == cm.value.reason ) + def test_import_error_from_failed_import(self): + """Attempt to import a module that is not present""" + try: + import nonexistent_module_that_will_never_exist # type: ignore[import-not-found] # noqa: F401 # isort:skip + except ImportError as error: + reportable_error = errors.ReportableErrorImportError(error=error) + + assert ( + reportable_error.reason == "error importing " + "nonexistent_module_that_will_never_exist library" + ) + assert reportable_error.supporting_data["error"] == repr(error) + class TestReadAzureOvf: def test_invalid_xml_raises_non_azure_ds(self): @@ -5630,14 +5649,6 @@ def test_missing_secondary( assert azure_ds.validate_imds_network_metadata(imds_md) is False -class TestDependencyFallback: - def test_dependency_fallback(self): - """Ensure that crypt/passlib import failover gets exercised on all - Python versions - """ - assert dsaz.encrypt_pass("`") - - class TestQueryVmId: @mock.patch.object( identity, "query_system_uuid", side_effect=["test-system-uuid"] @@ -5700,3 +5711,102 @@ def test_query_vm_id_vm_id_conversion_failure( mock_query_system_uuid.assert_called_once() mock_convert_uuid.assert_called_once_with("test-system-uuid") + + +class TestHashPassword: + """Tests for the hash_password function.""" + + def test_dependency_fallback(self): + """Ensure that crypt/passlib import failover gets exercised on all + Python versions + """ + result = dsaz.hash_password("`") + assert result + assert result.startswith("$6$") + + def test_crypt_working(self): + """Test that hash_password uses crypt when available.""" + mock_crypt = mock.MagicMock() + mock_crypt.METHOD_SHA512 = "sha512" + mock_crypt.mksalt.return_value = "$6$saltvalue" + mock_crypt.crypt.return_value = "$6$saltvalue$hashedpassword" + + with mock.patch.dict("sys.modules", {"crypt": mock_crypt}): + result = dsaz.hash_password("testpassword") + + mock_crypt.mksalt.assert_called_once_with("sha512") + mock_crypt.crypt.assert_called_once_with( + "testpassword", "$6$saltvalue" + ) + assert result == "$6$saltvalue$hashedpassword" + + def test_crypt_not_installed_passlib_fallback(self): + """Test that hash_password falls back to passlib when missing crypt.""" + real_import = builtins.__import__ + passlib_available = True + try: + import passlib.hash as _passlib_hash + except ImportError: + passlib_available = False + + if passlib_available: + # passlib is installed; block crypt and let passlib work normally + def mock_import(name, *args, **kwargs): + if name == "crypt": + raise ImportError("No module named 'crypt'") + return real_import(name, *args, **kwargs) + + with mock.patch.object( + builtins, "__import__", side_effect=mock_import + ): + result = dsaz.hash_password("testpassword") + + # Verify we got a valid SHA-512 hash from passlib + assert result.startswith("$6$") + assert _passlib_hash.sha512_crypt.verify("testpassword", result) + else: + # passlib is not installed; mock it to return a known hash + mock_passlib_hash = mock.MagicMock() + mock_passlib_hash.sha512_crypt.hash.return_value = ( + "$6$mocksalt$mockedhash" + ) + + def mock_import(name, *args, **kwargs): + if name == "crypt": + raise ImportError("No module named 'crypt'") + if name == "passlib.hash": + mod = mock.MagicMock() + mod.hash = mock_passlib_hash + sys.modules["passlib"] = mod + sys.modules["passlib.hash"] = mock_passlib_hash + return mod + return real_import(name, *args, **kwargs) + + with mock.patch.object( + builtins, "__import__", side_effect=mock_import + ): + result = dsaz.hash_password("testpassword") + + assert result == "$6$mocksalt$mockedhash" + mock_passlib_hash.sha512_crypt.hash.assert_called_once_with( + "testpassword" + ) + + def test_crypt_and_passlib_unavailable_raises_error(self): + """Test that hash_password raises ReportableErrorImportError.""" + real_import = builtins.__import__ + + def mock_import(name, *args, **kwargs): + if name == "crypt": + raise ImportError("No module named 'crypt'") + if name == "passlib.hash": + raise ImportError("No module named 'passlib'", name="passlib") + return real_import(name, *args, **kwargs) + + with mock.patch.object( + builtins, "__import__", side_effect=mock_import + ): + with pytest.raises(errors.ReportableErrorImportError) as exc_info: + dsaz.hash_password("testpassword") + + assert "passlib" in exc_info.value.reason