diff --git a/cloudinit/net/__init__.py b/cloudinit/net/__init__.py index 21ed872dd0a..f02bae2ed0b 100644 --- a/cloudinit/net/__init__.py +++ b/cloudinit/net/__init__.py @@ -11,9 +11,8 @@ import logging import os import re -import socket -import struct -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional +from urllib.parse import urlparse from cloudinit import subp, util from cloudinit.url_helper import UrlError, readurl @@ -1112,12 +1111,16 @@ def has_url_connectivity(url_data: Dict[str, Any]) -> bool: ) return False url = url_data["url"] - if not any([url.startswith("http://"), url.startswith("https://")]): - LOG.warning( - "Ignoring connectivity check. Expected URL beginning with http*://" - " received '%s'", - url, - ) + try: + result = urlparse(url) + if not any([result.scheme == "http", result.scheme == "https"]): + LOG.warning( + "Ignoring connectivity check. Invalid URL scheme %s", + url.scheme, + ) + return False + except ValueError as err: + LOG.warning("Ignoring connectivity check. Invalid URL %s", err) return False if "timeout" not in url_data: url_data["timeout"] = 5 @@ -1128,69 +1131,118 @@ def has_url_connectivity(url_data: Dict[str, Any]) -> bool: return True -def is_ip_address(s: str) -> bool: - """Returns a bool indicating if ``s`` is an IP address. +def network_validator(check_cb: Callable, address: str, **kwargs) -> bool: + """Use a function to determine whether address meets criteria. - :param s: + :param check_cb: + Test function, must return a truthy value + :param address: The string to test. :return: - A bool indicating if the string contains an IP address or not. + A bool indicating if the string passed the test. + """ try: - ipaddress.ip_address(s) + return bool(check_cb(address, **kwargs)) except ValueError: return False - return True -def is_ipv4_address(s: str) -> bool: +def is_ip_address(address: str) -> bool: + """Returns a bool indicating if ``s`` is an IP address. + + :param address: + The string to test. + + :return: + A bool indicating if the string is an IP address or not. + """ + return network_validator(ipaddress.ip_address, address) + + +def is_ipv4_address(address: str) -> bool: """Returns a bool indicating if ``s`` is an IPv4 address. - :param s: + :param address: The string to test. :return: - A bool indicating if the string contains an IPv4 address or not. + A bool indicating if the string is an IPv4 address or not. """ - try: - ipaddress.IPv4Address(s) - except ValueError: - return False - return True + return network_validator(ipaddress.IPv4Address, address) -def is_ipv6_addr(address): - if not address: - return False - return ":" in str(address) +def is_ipv6_address(address: str) -> bool: + """Returns a bool indicating if ``s`` is an IPv6 address. + + :param address: + The string to test. + + :return: + A bool indicating if the string is an IPv4 address or not. + """ + return network_validator(ipaddress.IPv6Address, address) + + +def is_ip_network(address: str) -> bool: + """Returns a bool indicating if ``s`` is an IPv4 or IPv6 network. + + :param address: + The string to test. + + :return: + A bool indicating if the string is an IPv4 address or not. + """ + return network_validator(ipaddress.ip_network, address, strict=False) + + +def is_ipv4_network(address: str) -> bool: + """Returns a bool indicating if ``s`` is an IPv4 network. + + :param address: + The string to test. + :return: + A bool indicating if the string is an IPv4 address or not. + """ + return network_validator(ipaddress.IPv4Network, address, strict=False) + + +def is_ipv6_network(address: str) -> bool: + """Returns a bool indicating if ``s`` is an IPv6 network. + + :param address: + The string to test. -def subnet_is_ipv6(subnet): + :return: + A bool indicating if the string is an IPv4 address or not. + """ + return network_validator(ipaddress.IPv6Network, address, strict=False) + + +def subnet_is_ipv6(subnet) -> bool: """Common helper for checking network_state subnets for ipv6.""" # 'static6', 'dhcp6', 'ipv6_dhcpv6-stateful', 'ipv6_dhcpv6-stateless' or # 'ipv6_slaac' if subnet["type"].endswith("6") or subnet["type"] in IPV6_DYNAMIC_TYPES: # This is a request either static6 type or DHCPv6. return True - elif subnet["type"] == "static" and is_ipv6_addr(subnet.get("address")): + elif subnet["type"] == "static" and is_ipv6_address(subnet.get("address")): return True return False -def net_prefix_to_ipv4_mask(prefix): +def net_prefix_to_ipv4_mask(prefix) -> str: """Convert a network prefix to an ipv4 netmask. This is the inverse of ipv4_mask_to_net_prefix. 24 -> "255.255.255.0" Also supports input as a string.""" - mask = socket.inet_ntoa( - struct.pack(">I", (0xFFFFFFFF << (32 - int(prefix)) & 0xFFFFFFFF)) - ) - return mask + return str(ipaddress.IPv4Network(f"0.0.0.0/{prefix}").netmask) -def ipv4_mask_to_net_prefix(mask): +def ipv4_mask_to_net_prefix(mask) -> int: """Convert an ipv4 netmask into a network prefix length. If the input is already an integer or a string representation of @@ -1202,7 +1254,7 @@ def ipv4_mask_to_net_prefix(mask): return ipaddress.ip_network(f"0.0.0.0/{mask}").prefixlen -def ipv6_mask_to_net_prefix(mask): +def ipv6_mask_to_net_prefix(mask) -> int: """Convert an ipv6 netmask (very uncommon) or prefix (64) to prefix. If the input is already an integer or a string representation of @@ -1237,17 +1289,11 @@ def ipv6_mask_to_net_prefix(mask): return prefixlen -def mask_and_ipv4_to_bcast_addr(mask, ip): - """Calculate the broadcast address from the subnet mask and ip addr. - - Supports ipv4 only.""" - ip_bin = int("".join([bin(int(x) + 256)[3:] for x in ip.split(".")]), 2) - mask_dec = ipv4_mask_to_net_prefix(mask) - bcast_bin = ip_bin | (2 ** (32 - mask_dec) - 1) - bcast_str = ".".join( - [str(bcast_bin >> (i << 3) & 0xFF) for i in range(4)[::-1]] +def mask_and_ipv4_to_bcast_addr(mask: str, ip: str) -> str: + """Get string representation of broadcast address from an ip/mask pair""" + return str( + ipaddress.IPv4Network(f"{ip}/{mask}", strict=False).broadcast_address ) - return bcast_str class EphemeralIPv4Network(object): diff --git a/cloudinit/net/dhcp.py b/cloudinit/net/dhcp.py index beb54957af4..53f8c6864f3 100644 --- a/cloudinit/net/dhcp.py +++ b/cloudinit/net/dhcp.py @@ -20,8 +20,8 @@ find_fallback_nic, get_devicelist, has_url_connectivity, + mask_and_ipv4_to_bcast_addr, ) -from cloudinit.net import mask_and_ipv4_to_bcast_addr as bcip LOG = logging.getLogger(__name__) @@ -120,7 +120,9 @@ def obtain_lease(self): } kwargs = self.extract_dhcp_options_mapping(nmap) if not kwargs["broadcast"]: - kwargs["broadcast"] = bcip(kwargs["prefix_or_mask"], kwargs["ip"]) + kwargs["broadcast"] = mask_and_ipv4_to_bcast_addr( + kwargs["prefix_or_mask"], kwargs["ip"] + ) if kwargs["static_routes"]: kwargs["static_routes"] = parse_static_routes( kwargs["static_routes"] diff --git a/cloudinit/net/network_manager.py b/cloudinit/net/network_manager.py index b108f61fa08..dadd2ec23a0 100644 --- a/cloudinit/net/network_manager.py +++ b/cloudinit/net/network_manager.py @@ -14,7 +14,7 @@ from cloudinit import log as logging from cloudinit import subp, util -from cloudinit.net import is_ipv6_addr, subnet_is_ipv6 +from cloudinit.net import is_ipv6_address, subnet_is_ipv6 from . import renderer @@ -137,7 +137,7 @@ def _add_nameserver(self, dns): # together. We might be getting an IPv6 name server while # we're dealing with an IPv4 subnet. Sort this out by figuring # out the correct family and making sure a valid section exist. - family = "ipv6" if is_ipv6_addr(dns) else "ipv4" + family = "ipv6" if is_ipv6_address(dns) else "ipv4" self._set_default(family, "method", "disabled") self._set_default(family, "dns", "") diff --git a/cloudinit/net/network_state.py b/cloudinit/net/network_state.py index 98b1e5db5b1..4e49a41a01d 100644 --- a/cloudinit/net/network_state.py +++ b/cloudinit/net/network_state.py @@ -13,7 +13,10 @@ get_interfaces_by_mac, ipv4_mask_to_net_prefix, ipv6_mask_to_net_prefix, - is_ipv6_addr, + is_ip_network, + is_ipv4_network, + is_ipv6_address, + is_ipv6_network, net_prefix_to_ipv4_mask, ) @@ -361,7 +364,7 @@ def handle_physical(self, command): # automatically set 'use_ipv6' if any addresses are ipv6 if not self.use_ipv6: for subnet in subnets: - if subnet.get("type").endswith("6") or is_ipv6_addr( + if subnet.get("type").endswith("6") or is_ipv6_address( subnet.get("address") ): self.use_ipv6 = True @@ -944,24 +947,32 @@ def _normalize_net_keys(network, address_keys=()): LOG.error(message) raise ValueError(message) - addr = net.get(addr_key) - ipv6 = is_ipv6_addr(addr) + addr = str(net.get(addr_key)) + if not is_ip_network(addr): + LOG.error("Address %s is not a valid ip network", addr) + raise ValueError(f"Address {addr} is not a valid ip address") + + ipv6 = is_ipv6_network(addr) + ipv4 = is_ipv4_network(addr) + netmask = net.get("netmask") if "/" in addr: addr_part, _, maybe_prefix = addr.partition("/") net[addr_key] = addr_part - try: - prefix = int(maybe_prefix) - except ValueError: - if ipv6: - # this supports input of ffff:ffff:ffff:: - prefix = ipv6_mask_to_net_prefix(maybe_prefix) - else: - # this supports input of 255.255.255.0 - prefix = ipv4_mask_to_net_prefix(maybe_prefix) + if ipv6: + # this supports input of ffff:ffff:ffff:: + prefix = ipv6_mask_to_net_prefix(maybe_prefix) + elif ipv4: + # this supports input of 255.255.255.0 + prefix = ipv4_mask_to_net_prefix(maybe_prefix) + else: + # In theory this never happens, is_ip_network() should catch all + # invalid networks + LOG.error("Address %s is not a valid ip network", addr) + raise ValueError(f"Address {addr} is not a valid ip address") elif "prefix" in net: prefix = int(net["prefix"]) - elif netmask and not ipv6: + elif netmask and ipv4: prefix = ipv4_mask_to_net_prefix(netmask) elif netmask and ipv6: prefix = ipv6_mask_to_net_prefix(netmask) @@ -981,7 +992,7 @@ def _normalize_net_keys(network, address_keys=()): # 'netmask' for ipv6. We need a 'net_prefix_to_ipv6_mask' for that. if "netmask" in net: del net["netmask"] - else: + elif ipv4: net["netmask"] = net_prefix_to_ipv4_mask(net["prefix"]) return net diff --git a/cloudinit/net/sysconfig.py b/cloudinit/net/sysconfig.py index 945218edfd1..6d769f12ada 100644 --- a/cloudinit/net/sysconfig.py +++ b/cloudinit/net/sysconfig.py @@ -11,7 +11,7 @@ from cloudinit.distros.parsers import networkmanager_conf, resolv_conf from cloudinit.net import ( IPV6_DYNAMIC_TYPES, - is_ipv6_addr, + is_ipv6_address, net_prefix_to_ipv4_mask, subnet_is_ipv6, ) @@ -589,7 +589,7 @@ def _render_subnets(cls, iface_cfg, subnets, has_default_route, flavor): if "gateway" in subnet and flavor != "suse": iface_cfg["DEFROUTE"] = True - if is_ipv6_addr(subnet["gateway"]): + if is_ipv6_address(subnet["gateway"]): iface_cfg["IPV6_DEFAULTGW"] = subnet["gateway"] else: iface_cfg["GATEWAY"] = subnet["gateway"] @@ -619,7 +619,9 @@ def _render_subnet_routes(cls, iface_cfg, route_cfg, subnets, flavor): for _, subnet in enumerate(subnets, start=len(iface_cfg.children)): subnet_type = subnet.get("type") for route in subnet.get("routes", []): - is_ipv6 = subnet.get("ipv6") or is_ipv6_addr(route["gateway"]) + is_ipv6 = subnet.get("ipv6") or is_ipv6_address( + route["gateway"] + ) # Any dynamic configuration method, slaac, dhcpv6-stateful/ # stateless should get router information from router RA's. diff --git a/tests/unittests/net/test_init.py b/tests/unittests/net/test_init.py index f3ad5292b85..d03186e9da7 100644 --- a/tests/unittests/net/test_init.py +++ b/tests/unittests/net/test_init.py @@ -1848,7 +1848,9 @@ class TestIsIpAddress: ( (ValueError, False), (lambda _: ipaddress.IPv4Address("192.168.0.1"), True), + (lambda _: ipaddress.IPv4Address("192.168.0.1/24"), False), (lambda _: ipaddress.IPv6Address("2001:db8::"), True), + (lambda _: ipaddress.IPv6Address("2001:db8::/48"), False), ), ) def test_is_ip_address(self, ip_address_side_effect, expected_return): @@ -1890,4 +1892,33 @@ def test_is_ip_address(self, ipv4address_mock, expected_return): assert [expected_call] == m_ipv4address.call_args_list -# vi: ts=4 expandtab +class TestIsIpNetwork: + """Tests for net.is_ip_network() and related functions.""" + + @pytest.mark.parametrize( + "func,arg,expected_return", + ( + (net.is_ip_network, "192.168.1.1", True), + (net.is_ip_network, "192.168.1.1/24", True), + (net.is_ip_network, "192.168.1.1/32", True), + (net.is_ip_network, "192.168.1.1/33", False), + (net.is_ip_network, "2001:67c:1", False), + (net.is_ip_network, "2001:67c:1/32", False), + (net.is_ip_network, "2001:67c::", True), + (net.is_ip_network, "2001:67c::/32", True), + (net.is_ipv4_network, "192.168.1.1", True), + (net.is_ipv4_network, "192.168.1.1/24", True), + (net.is_ipv4_network, "2001:67c::", False), + (net.is_ipv4_network, "2001:67c::/32", False), + (net.is_ipv6_network, "192.168.1.1", False), + (net.is_ipv6_network, "192.168.1.1/24", False), + (net.is_ipv6_network, "2001:67c:1", False), + (net.is_ipv6_network, "2001:67c:1/32", False), + (net.is_ipv6_network, "2001:67c::", True), + (net.is_ipv6_network, "2001:67c::/32", True), + (net.is_ipv6_network, "2001:67c::/129", False), + (net.is_ipv6_network, "2001:67c::/128", True), + ), + ) + def test_is_ip_network(self, func, arg, expected_return): + assert func(arg) == expected_return diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py index c5105898511..ecf33070906 100644 --- a/tests/unittests/test_net.py +++ b/tests/unittests/test_net.py @@ -2951,10 +2951,10 @@ address: 2001:1::1/92 routes: - gateway: 2001:67c:1562::1 - network: 2001:67c:1 + network: "2001:67c::" netmask: "ffff:ffff::" - gateway: 3001:67c:15::1 - network: 3001:67c:1 + network: "3001:67c::" netmask: "ffff:ffff::" metric: 10000 """ @@ -2997,10 +2997,10 @@ routes: - to: 10.1.3.0/24 via: 192.168.0.3 - - to: 2001:67c:1/32 + - to: 2001:67c::/32 via: 2001:67c:1562::1 - metric: 10000 - to: 3001:67c:1/32 + to: 3001:67c::/32 via: 3001:67c:15::1 """ ), @@ -3061,11 +3061,11 @@ # control-alias bond0 iface bond0 inet6 static address 2001:1::1/92 - post-up route add -A inet6 2001:67c:1/32 gw 2001:67c:1562::1 || true - pre-down route del -A inet6 2001:67c:1/32 gw 2001:67c:1562::1 || true - post-up route add -A inet6 3001:67c:1/32 gw 3001:67c:15::1 metric 10000 \ + post-up route add -A inet6 2001:67c::/32 gw 2001:67c:1562::1 || true + pre-down route del -A inet6 2001:67c::/32 gw 2001:67c:1562::1 || true + post-up route add -A inet6 3001:67c::/32 gw 3001:67c:15::1 metric 10000 \ || true - pre-down route del -A inet6 3001:67c:1/32 gw 3001:67c:15::1 metric 10000 \ + pre-down route del -A inet6 3001:67c::/32 gw 3001:67c:15::1 metric 10000 \ || true """ ), @@ -3243,8 +3243,8 @@ """\ # Created by cloud-init on instance boot automatically, do not edit. # - 2001:67c:1/32 via 2001:67c:1562::1 dev bond0 - 3001:67c:1/32 via 3001:67c:15::1 metric 10000 dev bond0 + 2001:67c::/32 via 2001:67c:1562::1 dev bond0 + 3001:67c::/32 via 3001:67c:15::1 metric 10000 dev bond0 """ ), "route-bond0": textwrap.dedent( @@ -3344,8 +3344,8 @@ may-fail=false addr-gen-mode=stable-privacy address1=2001:1::1/92 - route1=2001:67c:1/32,2001:67c:1562::1 - route2=3001:67c:1/32,3001:67c:15::1 + route1=2001:67c::/32,2001:67c:1562::1 + route2=3001:67c::/32,3001:67c:15::1 """ ),