diff --git a/cloudinit/util.py b/cloudinit/util.py index cac5926f10f..fa258456b81 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -1300,6 +1300,10 @@ def is_resolvable(url) -> bool: with suppress(ValueError): if net.is_ip_address(parsed_url.netloc.strip("[]")): return True + try: + hostname_result = socket.getaddrinfo(name, None) + except (socket.gaierror, socket.error): + return False if _DNS_REDIRECT_IP is None: badips = set() @@ -1324,13 +1328,11 @@ def is_resolvable(url) -> bool: if badresults: LOG.debug("detected dns redirection: %s", badresults) - try: - result = socket.getaddrinfo(name, None) - # check first result's sockaddr field - addr = result[0][4][0] - return addr not in _DNS_REDIRECT_IP - except (socket.gaierror, socket.error): + # check first result's sockaddr field + addr = hostname_result[0][4][0] + if addr in _DNS_REDIRECT_IP: return False + return True def get_hostname(): diff --git a/tests/unittests/config/test_apt_source_v3.py b/tests/unittests/config/test_apt_source_v3.py index 2e766d77ed5..b7c76ded350 100644 --- a/tests/unittests/config/test_apt_source_v3.py +++ b/tests/unittests/config/test_apt_source_v3.py @@ -927,20 +927,32 @@ def test_apt_v3_url_resolvable(self): # former tests can leave this set (or not if the test is ran directly) # do a hard reset to ensure a stable result util._DNS_REDIRECT_IP = None + badnames = ( + "does-not-exist.example.com.", + "example.invalid.", + "__cloud_init_expected_not_found__", + ) bad = [(None, None, None, "badname", ["10.3.2.1"])] good = [(None, None, None, "goodname", ["10.2.3.4"])] with mock.patch.object( - socket, "getaddrinfo", side_effect=[bad, bad, bad, good, good] + socket, "getaddrinfo", side_effect=[good, bad, bad, bad] ) as mocksock: ret = util.is_resolvable_url("http://us.archive.ubuntu.com/ubuntu") - ret2 = util.is_resolvable_url("http://1.2.3.4/ubuntu") - mocksock.assert_any_call( - "does-not-exist.example.com.", None, 0, 0, 1, 2 - ) - mocksock.assert_any_call("example.invalid.", None, 0, 0, 1, 2) + for badname in badnames: + mocksock.assert_any_call(badname, None, 0, 0, 1, 2) mocksock.assert_any_call("us.archive.ubuntu.com", None) - assert ret is True + + # IP addresses skip DNS checks entirely + with mock.patch.object(socket, "getaddrinfo") as mocksock: + ret2 = util.is_resolvable_url("http://1.2.3.4/ubuntu") + mocksock.assert_not_called() + # Verify badnames were NOT checked for IP addresses + for badname in badnames: + assert ( + mock.call(badname, None, 0, 0, 1, 2) + not in mocksock.call_args_list + ) assert ret2 is True # side effect need only bad ret after initial call @@ -952,6 +964,15 @@ def test_apt_v3_url_resolvable(self): mocksock.assert_has_calls(calls) assert ret3 is False + # Test unresolvable hostname + with mock.patch.object( + socket, "getaddrinfo", side_effect=[bad] + ) as mocksock: + ret4 = util.is_resolvable_url("http://instance.:3336") + calls = [call("instance.", None)] + mocksock.assert_has_calls(calls) + assert ret4 is False + def test_apt_v3_disable_suites(self): """test_disable_suites - disable_suites with many configurations""" release = "xenial" diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 90206489987..789897504ab 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -3173,6 +3173,12 @@ def test_from_str(self, str_ver, cls_ver): @pytest.mark.allow_dns_lookup class TestResolvable: + @pytest.fixture(autouse=True) + def reset_dns_redirect_ip(self): + util._DNS_REDIRECT_IP = None + yield # Test runs here + util._DNS_REDIRECT_IP = None + @mock.patch.object(util, "_DNS_REDIRECT_IP", return_value=True) @mock.patch.object(util.socket, "getaddrinfo") def test_ips_need_not_be_resolved(self, m_getaddr, m_dns): @@ -3185,6 +3191,40 @@ def test_ips_need_not_be_resolved(self, m_getaddr, m_dns): assert util.is_resolvable("http://[fd00:ec2::254]/") is True assert not m_getaddr.called + @mock.patch.object(util.net, "is_ip_address") + @mock.patch.object(util.socket, "getaddrinfo") + def test_hostnames_require_dns_resolution(self, m_getaddr, m_is_ip): + """Hostnames should go through DNS resolution.""" + m_is_ip.return_value = False + + def mock_getaddrinfo(host, port, *args, **kwargs): + badnames = ( + "does-not-exist.example.com.", + "example.invalid.", + "__cloud_init_expected_not_found__", + ) + if host in badnames: + return [(None, None, None, "badname", ("192.0.2.1", 0))] + return [(None, None, None, "example.com", ("10.2.3.4", 0))] + + m_getaddr.side_effect = mock_getaddrinfo + + assert util.is_resolvable("http://example.com/") is True + assert m_getaddr.called + + assert m_getaddr.call_args_list[0] == mock.call("example.com", None) + + badnames = ( + "does-not-exist.example.com.", + "example.invalid.", + "__cloud_init_expected_not_found__", + ) + called_hosts = [call[0][0] for call in m_getaddr.call_args_list[1:]] + for badname in badnames: + assert ( + badname in called_hosts + ), f"Expected badname {badname} to be checked" + class TestMaybeB64Decode: """Test the maybe_b64decode helper function."""