From 245980d6b2d4d948b80fd9e967ad1413b1b4e9e9 Mon Sep 17 00:00:00 2001 From: akutz Date: Tue, 15 Mar 2022 13:23:40 -0500 Subject: [PATCH] Refactoring helper funcs out of NetworkState This patch refactors several network, helper functions out of the network_state.py file and into cloudinit.net. This is in relation to LP #1855945 and github.com/canonical/cloud-init/pull/1327. The aforementioned issue requires some refactoring to avoid circular imports, but it was not proper to do that refactoring in *that* pull request. Thus this PR is specific to the refactoring and can be evaluated on its own merits. --- cloudinit/distros/net_util.py | 5 +- cloudinit/net/__init__.py | 99 ++++++++++++++++++++++++++++- cloudinit/net/dhcp.py | 2 +- cloudinit/net/eni.py | 2 +- cloudinit/net/netplan.py | 12 ++-- cloudinit/net/network_manager.py | 2 +- cloudinit/net/network_state.py | 105 ++----------------------------- cloudinit/net/sysconfig.py | 12 ++-- tests/unittests/test_net.py | 3 +- 9 files changed, 121 insertions(+), 121 deletions(-) diff --git a/cloudinit/distros/net_util.py b/cloudinit/distros/net_util.py index e37fb19b89a..8242c8f2bcf 100644 --- a/cloudinit/distros/net_util.py +++ b/cloudinit/distros/net_util.py @@ -67,10 +67,7 @@ # } # } -from cloudinit.net.network_state import ( - mask_and_ipv4_to_bcast_addr, - net_prefix_to_ipv4_mask, -) +from cloudinit.net import mask_and_ipv4_to_bcast_addr, net_prefix_to_ipv4_mask def translate_network(settings): diff --git a/cloudinit/net/__init__.py b/cloudinit/net/__init__.py index dad7b364705..21ed872dd0a 100644 --- a/cloudinit/net/__init__.py +++ b/cloudinit/net/__init__.py @@ -11,15 +11,22 @@ import logging import os import re +import socket +import struct from typing import Any, Dict, List, Optional from cloudinit import subp, util -from cloudinit.net.network_state import ipv4_mask_to_net_prefix from cloudinit.url_helper import UrlError, readurl LOG = logging.getLogger(__name__) SYS_CLASS_NET = "/sys/class/net/" DEFAULT_PRIMARY_INTERFACE = "eth0" +IPV6_DYNAMIC_TYPES = [ + "dhcp6", + "ipv6_slaac", + "ipv6_dhcpv6-stateless", + "ipv6_dhcpv6-stateful", +] OVS_INTERNAL_INTERFACE_LOOKUP_CMD = [ "ovs-vsctl", "--format", @@ -1153,6 +1160,96 @@ def is_ipv4_address(s: str) -> bool: return True +def is_ipv6_addr(address): + if not address: + return False + return ":" in str(address) + + +def subnet_is_ipv6(subnet): + """Common helper for checking network_state subnets for ipv6.""" + # 'static6', 'dhcp6', 'ipv6_dhcpv6-stateful', 'ipv6_dhcpv6-stateless' or + # 'ipv6_slaac' + if subnet["type"].endswith("6") or subnet["type"] in IPV6_DYNAMIC_TYPES: + # This is a request either static6 type or DHCPv6. + return True + elif subnet["type"] == "static" and is_ipv6_addr(subnet.get("address")): + return True + return False + + +def net_prefix_to_ipv4_mask(prefix): + """Convert a network prefix to an ipv4 netmask. + + This is the inverse of ipv4_mask_to_net_prefix. + 24 -> "255.255.255.0" + Also supports input as a string.""" + mask = socket.inet_ntoa( + struct.pack(">I", (0xFFFFFFFF << (32 - int(prefix)) & 0xFFFFFFFF)) + ) + return mask + + +def ipv4_mask_to_net_prefix(mask): + """Convert an ipv4 netmask into a network prefix length. + + If the input is already an integer or a string representation of + an integer, then int(mask) will be returned. + "255.255.255.0" => 24 + str(24) => 24 + "24" => 24 + """ + return ipaddress.ip_network(f"0.0.0.0/{mask}").prefixlen + + +def ipv6_mask_to_net_prefix(mask): + """Convert an ipv6 netmask (very uncommon) or prefix (64) to prefix. + + If the input is already an integer or a string representation of + an integer, then int(mask) will be returned. + "ffff:ffff:ffff::" => 48 + "48" => 48 + """ + try: + # In the case the mask is already a prefix + prefixlen = ipaddress.ip_network(f"::/{mask}").prefixlen + return prefixlen + except ValueError: + # ValueError means mask is an IPv6 address representation and need + # conversion. + pass + + netmask = ipaddress.ip_address(mask) + mask_int = int(netmask) + # If the mask is all zeroes, just return it + if mask_int == 0: + return mask_int + + trailing_zeroes = min( + ipaddress.IPV6LENGTH, (~mask_int & (mask_int - 1)).bit_length() + ) + leading_ones = mask_int >> trailing_zeroes + prefixlen = ipaddress.IPV6LENGTH - trailing_zeroes + all_ones = (1 << prefixlen) - 1 + if leading_ones != all_ones: + raise ValueError("Invalid network mask '%s'" % mask) + + return prefixlen + + +def mask_and_ipv4_to_bcast_addr(mask, ip): + """Calculate the broadcast address from the subnet mask and ip addr. + + Supports ipv4 only.""" + ip_bin = int("".join([bin(int(x) + 256)[3:] for x in ip.split(".")]), 2) + mask_dec = ipv4_mask_to_net_prefix(mask) + bcast_bin = ip_bin | (2 ** (32 - mask_dec) - 1) + bcast_str = ".".join( + [str(bcast_bin >> (i << 3) & 0xFF) for i in range(4)[::-1]] + ) + return bcast_str + + class EphemeralIPv4Network(object): """Context manager which sets up temporary static network configuration. diff --git a/cloudinit/net/dhcp.py b/cloudinit/net/dhcp.py index cf7517a7cde..beb54957af4 100644 --- a/cloudinit/net/dhcp.py +++ b/cloudinit/net/dhcp.py @@ -21,7 +21,7 @@ get_devicelist, has_url_connectivity, ) -from cloudinit.net.network_state import mask_and_ipv4_to_bcast_addr as bcip +from cloudinit.net import mask_and_ipv4_to_bcast_addr as bcip LOG = logging.getLogger(__name__) diff --git a/cloudinit/net/eni.py b/cloudinit/net/eni.py index 99e3fbb08fb..b0ec67bd5e2 100644 --- a/cloudinit/net/eni.py +++ b/cloudinit/net/eni.py @@ -7,9 +7,9 @@ from cloudinit import log as logging from cloudinit import subp, util +from cloudinit.net import subnet_is_ipv6 from . import ParserError, renderer -from .network_state import subnet_is_ipv6 LOG = logging.getLogger(__name__) diff --git a/cloudinit/net/netplan.py b/cloudinit/net/netplan.py index 57ba2d9a429..2af0ee9ba88 100644 --- a/cloudinit/net/netplan.py +++ b/cloudinit/net/netplan.py @@ -5,16 +5,16 @@ from cloudinit import log as logging from cloudinit import safeyaml, subp, util -from cloudinit.net import SYS_CLASS_NET, get_devicelist - -from . import renderer -from .network_state import ( +from cloudinit.net import ( IPV6_DYNAMIC_TYPES, - NET_CONFIG_TO_V2, - NetworkState, + SYS_CLASS_NET, + get_devicelist, subnet_is_ipv6, ) +from . import renderer +from .network_state import NET_CONFIG_TO_V2, NetworkState + KNOWN_SNAPD_CONFIG = b"""\ # This is the initial network config. # It can be overwritten by cloud-init or console-conf. diff --git a/cloudinit/net/network_manager.py b/cloudinit/net/network_manager.py index 79b0fe0bf6a..b108f61fa08 100644 --- a/cloudinit/net/network_manager.py +++ b/cloudinit/net/network_manager.py @@ -14,9 +14,9 @@ from cloudinit import log as logging from cloudinit import subp, util +from cloudinit.net import is_ipv6_addr, subnet_is_ipv6 from . import renderer -from .network_state import is_ipv6_addr, subnet_is_ipv6 NM_RUN_DIR = "/etc/NetworkManager" NM_LIB_DIR = "/usr/lib/NetworkManager" diff --git a/cloudinit/net/network_state.py b/cloudinit/net/network_state.py index 7bac8adfbd4..880ca462b78 100644 --- a/cloudinit/net/network_state.py +++ b/cloudinit/net/network_state.py @@ -6,22 +6,19 @@ import copy import functools -import ipaddress import logging -import socket -import struct from cloudinit import safeyaml, util +from cloudinit.net import ( + ipv4_mask_to_net_prefix, + ipv6_mask_to_net_prefix, + is_ipv6_addr, + net_prefix_to_ipv4_mask, +) LOG = logging.getLogger(__name__) NETWORK_STATE_VERSION = 1 -IPV6_DYNAMIC_TYPES = [ - "dhcp6", - "ipv6_slaac", - "ipv6_dhcpv6-stateless", - "ipv6_dhcpv6-stateful", -] NETWORK_STATE_REQUIRED_KEYS = { 1: ["version", "config", "network_state"], } @@ -1003,96 +1000,6 @@ def _normalize_subnets(subnets): return [_normalize_subnet(s) for s in subnets] -def is_ipv6_addr(address): - if not address: - return False - return ":" in str(address) - - -def subnet_is_ipv6(subnet): - """Common helper for checking network_state subnets for ipv6.""" - # 'static6', 'dhcp6', 'ipv6_dhcpv6-stateful', 'ipv6_dhcpv6-stateless' or - # 'ipv6_slaac' - if subnet["type"].endswith("6") or subnet["type"] in IPV6_DYNAMIC_TYPES: - # This is a request either static6 type or DHCPv6. - return True - elif subnet["type"] == "static" and is_ipv6_addr(subnet.get("address")): - return True - return False - - -def net_prefix_to_ipv4_mask(prefix): - """Convert a network prefix to an ipv4 netmask. - - This is the inverse of ipv4_mask_to_net_prefix. - 24 -> "255.255.255.0" - Also supports input as a string.""" - mask = socket.inet_ntoa( - struct.pack(">I", (0xFFFFFFFF << (32 - int(prefix)) & 0xFFFFFFFF)) - ) - return mask - - -def ipv4_mask_to_net_prefix(mask): - """Convert an ipv4 netmask into a network prefix length. - - If the input is already an integer or a string representation of - an integer, then int(mask) will be returned. - "255.255.255.0" => 24 - str(24) => 24 - "24" => 24 - """ - return ipaddress.ip_network(f"0.0.0.0/{mask}").prefixlen - - -def ipv6_mask_to_net_prefix(mask): - """Convert an ipv6 netmask (very uncommon) or prefix (64) to prefix. - - If the input is already an integer or a string representation of - an integer, then int(mask) will be returned. - "ffff:ffff:ffff::" => 48 - "48" => 48 - """ - try: - # In the case the mask is already a prefix - prefixlen = ipaddress.ip_network(f"::/{mask}").prefixlen - return prefixlen - except ValueError: - # ValueError means mask is an IPv6 address representation and need - # conversion. - pass - - netmask = ipaddress.ip_address(mask) - mask_int = int(netmask) - # If the mask is all zeroes, just return it - if mask_int == 0: - return mask_int - - trailing_zeroes = min( - ipaddress.IPV6LENGTH, (~mask_int & (mask_int - 1)).bit_length() - ) - leading_ones = mask_int >> trailing_zeroes - prefixlen = ipaddress.IPV6LENGTH - trailing_zeroes - all_ones = (1 << prefixlen) - 1 - if leading_ones != all_ones: - raise ValueError("Invalid network mask '%s'" % mask) - - return prefixlen - - -def mask_and_ipv4_to_bcast_addr(mask, ip): - """Calculate the broadcast address from the subnet mask and ip addr. - - Supports ipv4 only.""" - ip_bin = int("".join([bin(int(x) + 256)[3:] for x in ip.split(".")]), 2) - mask_dec = ipv4_mask_to_net_prefix(mask) - bcast_bin = ip_bin | (2 ** (32 - mask_dec) - 1) - bcast_str = ".".join( - [str(bcast_bin >> (i << 3) & 0xFF) for i in range(4)[::-1]] - ) - return bcast_str - - def parse_net_config_data(net_config, skip_broken=True) -> NetworkState: """Parses the config, returns NetworkState object diff --git a/cloudinit/net/sysconfig.py b/cloudinit/net/sysconfig.py index d866c9aa791..883692f05ab 100644 --- a/cloudinit/net/sysconfig.py +++ b/cloudinit/net/sysconfig.py @@ -8,16 +8,16 @@ from cloudinit import log as logging from cloudinit import subp, util from cloudinit.distros.parsers import networkmanager_conf, resolv_conf -from cloudinit.net import network_state - -from . import renderer -from .network_state import ( +from cloudinit.net import ( IPV6_DYNAMIC_TYPES, + ipv6_mask_to_net_prefix, is_ipv6_addr, net_prefix_to_ipv4_mask, subnet_is_ipv6, ) +from . import renderer + LOG = logging.getLogger(__name__) KNOWN_DISTROS = [ "almalinux", @@ -208,9 +208,7 @@ def to_string(self, proto="ipv4"): % ("METRIC" + str(reindex), _quote_value(metric_value)) ) elif proto == "ipv6" and self.is_ipv6_route(address_value): - prefix_value = network_state.ipv6_mask_to_net_prefix( - netmask_value - ) + prefix_value = ipv6_mask_to_net_prefix(netmask_value) metric_value = ( "metric " + str(self._conf["METRIC" + index]) if "METRIC" + index in self._conf diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py index 9552ac12f7a..9f73c5c61b9 100644 --- a/tests/unittests/test_net.py +++ b/tests/unittests/test_net.py @@ -19,6 +19,7 @@ cmdline, eni, interface_has_own_mac, + mask_and_ipv4_to_bcast_addr, natural_sort_key, netplan, network_manager, @@ -8076,7 +8077,7 @@ def test_rename_macs_case_insensitive(self, mock_subp): class TestNetworkState(CiTestCase): def test_bcast_addr(self): """Test mask_and_ipv4_to_bcast_addr proper execution.""" - bcast_addr = network_state.mask_and_ipv4_to_bcast_addr + bcast_addr = mask_and_ipv4_to_bcast_addr self.assertEqual( "192.168.1.255", bcast_addr("255.255.255.0", "192.168.1.1") )