Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 92 additions & 46 deletions cloudinit/net/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import logging
import os
import re
import socket
import struct
from typing import Any, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urlparse

from cloudinit import subp, util
from cloudinit.url_helper import UrlError, readurl
Expand Down Expand Up @@ -1112,12 +1111,16 @@ def has_url_connectivity(url_data: Dict[str, Any]) -> bool:
)
return False
url = url_data["url"]
if not any([url.startswith("http://"), url.startswith("https://")]):
LOG.warning(
"Ignoring connectivity check. Expected URL beginning with http*://"
" received '%s'",
url,
)
try:
result = urlparse(url)
if not any([result.scheme == "http", result.scheme == "https"]):
LOG.warning(
"Ignoring connectivity check. Invalid URL scheme %s",
url.scheme,
)
return False
except ValueError as err:
LOG.warning("Ignoring connectivity check. Invalid URL %s", err)
return False
if "timeout" not in url_data:
url_data["timeout"] = 5
Expand All @@ -1128,69 +1131,118 @@ def has_url_connectivity(url_data: Dict[str, Any]) -> bool:
return True


def is_ip_address(s: str) -> bool:
"""Returns a bool indicating if ``s`` is an IP address.
def network_validator(check_cb: Callable, address: str, **kwargs) -> bool:
"""Use a function to determine whether address meets criteria.

:param s:
:param check_cb:
Test function, must return a truthy value
:param address:
The string to test.

:return:
A bool indicating if the string contains an IP address or not.
A bool indicating if the string passed the test.

"""
try:
ipaddress.ip_address(s)
return bool(check_cb(address, **kwargs))
except ValueError:
return False
return True


def is_ipv4_address(s: str) -> bool:
def is_ip_address(address: str) -> bool:
"""Returns a bool indicating if ``s`` is an IP address.

:param address:
The string to test.

:return:
A bool indicating if the string is an IP address or not.
"""
return network_validator(ipaddress.ip_address, address)


def is_ipv4_address(address: str) -> bool:
"""Returns a bool indicating if ``s`` is an IPv4 address.

:param s:
:param address:
The string to test.

:return:
A bool indicating if the string contains an IPv4 address or not.
A bool indicating if the string is an IPv4 address or not.
"""
try:
ipaddress.IPv4Address(s)
except ValueError:
return False
return True
return network_validator(ipaddress.IPv4Address, address)


def is_ipv6_addr(address):
if not address:
return False
return ":" in str(address)
def is_ipv6_address(address: str) -> bool:
"""Returns a bool indicating if ``s`` is an IPv6 address.

:param address:
The string to test.

:return:
A bool indicating if the string is an IPv4 address or not.
"""
return network_validator(ipaddress.IPv6Address, address)


def is_ip_network(address: str) -> bool:
"""Returns a bool indicating if ``s`` is an IPv4 or IPv6 network.

:param address:
The string to test.

:return:
A bool indicating if the string is an IPv4 address or not.
"""
return network_validator(ipaddress.ip_network, address, strict=False)


def is_ipv4_network(address: str) -> bool:
"""Returns a bool indicating if ``s`` is an IPv4 network.

:param address:
The string to test.

:return:
A bool indicating if the string is an IPv4 address or not.
"""
return network_validator(ipaddress.IPv4Network, address, strict=False)


def is_ipv6_network(address: str) -> bool:
"""Returns a bool indicating if ``s`` is an IPv6 network.

:param address:
The string to test.

def subnet_is_ipv6(subnet):
:return:
A bool indicating if the string is an IPv4 address or not.
"""
return network_validator(ipaddress.IPv6Network, address, strict=False)


def subnet_is_ipv6(subnet) -> bool:
"""Common helper for checking network_state subnets for ipv6."""
# 'static6', 'dhcp6', 'ipv6_dhcpv6-stateful', 'ipv6_dhcpv6-stateless' or
# 'ipv6_slaac'
if subnet["type"].endswith("6") or subnet["type"] in IPV6_DYNAMIC_TYPES:
# This is a request either static6 type or DHCPv6.
return True
elif subnet["type"] == "static" and is_ipv6_addr(subnet.get("address")):
elif subnet["type"] == "static" and is_ipv6_address(subnet.get("address")):
return True
return False


def net_prefix_to_ipv4_mask(prefix):
def net_prefix_to_ipv4_mask(prefix) -> str:
"""Convert a network prefix to an ipv4 netmask.

This is the inverse of ipv4_mask_to_net_prefix.
24 -> "255.255.255.0"
Also supports input as a string."""
mask = socket.inet_ntoa(
struct.pack(">I", (0xFFFFFFFF << (32 - int(prefix)) & 0xFFFFFFFF))
)
return mask
return str(ipaddress.IPv4Network(f"0.0.0.0/{prefix}").netmask)


def ipv4_mask_to_net_prefix(mask):
def ipv4_mask_to_net_prefix(mask) -> int:
"""Convert an ipv4 netmask into a network prefix length.

If the input is already an integer or a string representation of
Expand All @@ -1202,7 +1254,7 @@ def ipv4_mask_to_net_prefix(mask):
return ipaddress.ip_network(f"0.0.0.0/{mask}").prefixlen


def ipv6_mask_to_net_prefix(mask):
def ipv6_mask_to_net_prefix(mask) -> int:
"""Convert an ipv6 netmask (very uncommon) or prefix (64) to prefix.

If the input is already an integer or a string representation of
Expand Down Expand Up @@ -1237,17 +1289,11 @@ def ipv6_mask_to_net_prefix(mask):
return prefixlen


def mask_and_ipv4_to_bcast_addr(mask, ip):
"""Calculate the broadcast address from the subnet mask and ip addr.

Supports ipv4 only."""
ip_bin = int("".join([bin(int(x) + 256)[3:] for x in ip.split(".")]), 2)
mask_dec = ipv4_mask_to_net_prefix(mask)
bcast_bin = ip_bin | (2 ** (32 - mask_dec) - 1)
bcast_str = ".".join(
[str(bcast_bin >> (i << 3) & 0xFF) for i in range(4)[::-1]]
def mask_and_ipv4_to_bcast_addr(mask: str, ip: str) -> str:
"""Get string representation of broadcast address from an ip/mask pair"""
return str(
ipaddress.IPv4Network(f"{ip}/{mask}", strict=False).broadcast_address
)
return bcast_str


class EphemeralIPv4Network(object):
Expand Down
6 changes: 4 additions & 2 deletions cloudinit/net/dhcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
find_fallback_nic,
get_devicelist,
has_url_connectivity,
mask_and_ipv4_to_bcast_addr,
)
from cloudinit.net import mask_and_ipv4_to_bcast_addr as bcip

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -120,7 +120,9 @@ def obtain_lease(self):
}
kwargs = self.extract_dhcp_options_mapping(nmap)
if not kwargs["broadcast"]:
kwargs["broadcast"] = bcip(kwargs["prefix_or_mask"], kwargs["ip"])
kwargs["broadcast"] = mask_and_ipv4_to_bcast_addr(
kwargs["prefix_or_mask"], kwargs["ip"]
)
if kwargs["static_routes"]:
kwargs["static_routes"] = parse_static_routes(
kwargs["static_routes"]
Expand Down
4 changes: 2 additions & 2 deletions cloudinit/net/network_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from cloudinit import log as logging
from cloudinit import subp, util
from cloudinit.net import is_ipv6_addr, subnet_is_ipv6
from cloudinit.net import is_ipv6_address, subnet_is_ipv6

from . import renderer

Expand Down Expand Up @@ -137,7 +137,7 @@ def _add_nameserver(self, dns):
# together. We might be getting an IPv6 name server while
# we're dealing with an IPv4 subnet. Sort this out by figuring
# out the correct family and making sure a valid section exist.
family = "ipv6" if is_ipv6_addr(dns) else "ipv4"
family = "ipv6" if is_ipv6_address(dns) else "ipv4"
self._set_default(family, "method", "disabled")

self._set_default(family, "dns", "")
Expand Down
41 changes: 26 additions & 15 deletions cloudinit/net/network_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
get_interfaces_by_mac,
ipv4_mask_to_net_prefix,
ipv6_mask_to_net_prefix,
is_ipv6_addr,
is_ip_network,
is_ipv4_network,
is_ipv6_address,
is_ipv6_network,
net_prefix_to_ipv4_mask,
)

Expand Down Expand Up @@ -361,7 +364,7 @@ def handle_physical(self, command):
# automatically set 'use_ipv6' if any addresses are ipv6
if not self.use_ipv6:
for subnet in subnets:
if subnet.get("type").endswith("6") or is_ipv6_addr(
if subnet.get("type").endswith("6") or is_ipv6_address(
subnet.get("address")
):
self.use_ipv6 = True
Expand Down Expand Up @@ -944,24 +947,32 @@ def _normalize_net_keys(network, address_keys=()):
LOG.error(message)
raise ValueError(message)

addr = net.get(addr_key)
ipv6 = is_ipv6_addr(addr)
addr = str(net.get(addr_key))
if not is_ip_network(addr):
LOG.error("Address %s is not a valid ip network", addr)
raise ValueError(f"Address {addr} is not a valid ip address")

ipv6 = is_ipv6_network(addr)
ipv4 = is_ipv4_network(addr)

netmask = net.get("netmask")
if "/" in addr:
addr_part, _, maybe_prefix = addr.partition("/")
net[addr_key] = addr_part
try:
prefix = int(maybe_prefix)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this try/except was redundant: ipv{4,6}_mask_to_net_prefix() can handle an argument already in prefix form.

except ValueError:
if ipv6:
# this supports input of ffff:ffff:ffff::
prefix = ipv6_mask_to_net_prefix(maybe_prefix)
else:
# this supports input of 255.255.255.0
prefix = ipv4_mask_to_net_prefix(maybe_prefix)
if ipv6:
# this supports input of ffff:ffff:ffff::
prefix = ipv6_mask_to_net_prefix(maybe_prefix)
elif ipv4:
# this supports input of 255.255.255.0
prefix = ipv4_mask_to_net_prefix(maybe_prefix)
else:
# In theory this never happens, is_ip_network() should catch all
# invalid networks
LOG.error("Address %s is not a valid ip network", addr)
raise ValueError(f"Address {addr} is not a valid ip address")
elif "prefix" in net:
prefix = int(net["prefix"])
elif netmask and not ipv6:
elif netmask and ipv4:
prefix = ipv4_mask_to_net_prefix(netmask)
elif netmask and ipv6:
prefix = ipv6_mask_to_net_prefix(netmask)
Expand All @@ -981,7 +992,7 @@ def _normalize_net_keys(network, address_keys=()):
# 'netmask' for ipv6. We need a 'net_prefix_to_ipv6_mask' for that.
if "netmask" in net:
del net["netmask"]
else:
elif ipv4:
net["netmask"] = net_prefix_to_ipv4_mask(net["prefix"])

return net
Expand Down
8 changes: 5 additions & 3 deletions cloudinit/net/sysconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from cloudinit.distros.parsers import networkmanager_conf, resolv_conf
from cloudinit.net import (
IPV6_DYNAMIC_TYPES,
is_ipv6_addr,
is_ipv6_address,
net_prefix_to_ipv4_mask,
subnet_is_ipv6,
)
Expand Down Expand Up @@ -589,7 +589,7 @@ def _render_subnets(cls, iface_cfg, subnets, has_default_route, flavor):

if "gateway" in subnet and flavor != "suse":
iface_cfg["DEFROUTE"] = True
if is_ipv6_addr(subnet["gateway"]):
if is_ipv6_address(subnet["gateway"]):
iface_cfg["IPV6_DEFAULTGW"] = subnet["gateway"]
else:
iface_cfg["GATEWAY"] = subnet["gateway"]
Expand Down Expand Up @@ -619,7 +619,9 @@ def _render_subnet_routes(cls, iface_cfg, route_cfg, subnets, flavor):
for _, subnet in enumerate(subnets, start=len(iface_cfg.children)):
subnet_type = subnet.get("type")
for route in subnet.get("routes", []):
is_ipv6 = subnet.get("ipv6") or is_ipv6_addr(route["gateway"])
is_ipv6 = subnet.get("ipv6") or is_ipv6_address(
route["gateway"]
)

# Any dynamic configuration method, slaac, dhcpv6-stateful/
# stateless should get router information from router RA's.
Expand Down
33 changes: 32 additions & 1 deletion tests/unittests/net/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -1848,7 +1848,9 @@ class TestIsIpAddress:
(
(ValueError, False),
(lambda _: ipaddress.IPv4Address("192.168.0.1"), True),
(lambda _: ipaddress.IPv4Address("192.168.0.1/24"), False),
(lambda _: ipaddress.IPv6Address("2001:db8::"), True),
(lambda _: ipaddress.IPv6Address("2001:db8::/48"), False),
),
)
def test_is_ip_address(self, ip_address_side_effect, expected_return):
Expand Down Expand Up @@ -1890,4 +1892,33 @@ def test_is_ip_address(self, ipv4address_mock, expected_return):
assert [expected_call] == m_ipv4address.call_args_list


# vi: ts=4 expandtab
class TestIsIpNetwork:
"""Tests for net.is_ip_network() and related functions."""

@pytest.mark.parametrize(
"func,arg,expected_return",
(
(net.is_ip_network, "192.168.1.1", True),
(net.is_ip_network, "192.168.1.1/24", True),
(net.is_ip_network, "192.168.1.1/32", True),
(net.is_ip_network, "192.168.1.1/33", False),
(net.is_ip_network, "2001:67c:1", False),
(net.is_ip_network, "2001:67c:1/32", False),
(net.is_ip_network, "2001:67c::", True),
(net.is_ip_network, "2001:67c::/32", True),
(net.is_ipv4_network, "192.168.1.1", True),
(net.is_ipv4_network, "192.168.1.1/24", True),
(net.is_ipv4_network, "2001:67c::", False),
(net.is_ipv4_network, "2001:67c::/32", False),
(net.is_ipv6_network, "192.168.1.1", False),
(net.is_ipv6_network, "192.168.1.1/24", False),
(net.is_ipv6_network, "2001:67c:1", False),
(net.is_ipv6_network, "2001:67c:1/32", False),
(net.is_ipv6_network, "2001:67c::", True),
(net.is_ipv6_network, "2001:67c::/32", True),
(net.is_ipv6_network, "2001:67c::/129", False),
(net.is_ipv6_network, "2001:67c::/128", True),
),
)
def test_is_ip_network(self, func, arg, expected_return):
assert func(arg) == expected_return
Loading