summaryrefslogtreecommitdiff
path: root/cloudinit/net
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/net')
-rw-r--r--cloudinit/net/__init__.py192
-rw-r--r--cloudinit/net/bsd.py167
-rwxr-xr-xcloudinit/net/cmdline.py52
-rw-r--r--cloudinit/net/dhcp.py50
-rw-r--r--cloudinit/net/eni.py15
-rw-r--r--cloudinit/net/freebsd.py176
-rw-r--r--cloudinit/net/netbsd.py44
-rw-r--r--cloudinit/net/netplan.py17
-rw-r--r--cloudinit/net/network_state.py32
-rw-r--r--cloudinit/net/openbsd.py46
-rw-r--r--cloudinit/net/renderers.py7
-rw-r--r--cloudinit/net/sysconfig.py21
-rw-r--r--cloudinit/net/tests/test_dhcp.py108
-rw-r--r--cloudinit/net/tests/test_init.py172
-rw-r--r--cloudinit/net/tests/test_network_state.py10
15 files changed, 661 insertions, 448 deletions
diff --git a/cloudinit/net/__init__.py b/cloudinit/net/__init__.py
index 1d5eb535..e233149a 100644
--- a/cloudinit/net/__init__.py
+++ b/cloudinit/net/__init__.py
@@ -6,13 +6,14 @@
# This file is part of cloud-init. See LICENSE file for license information.
import errno
+import ipaddress
import logging
import os
import re
-from functools import partial
-from cloudinit.net.network_state import mask_to_net_prefix
+from cloudinit import subp
from cloudinit import util
+from cloudinit.net.network_state import mask_to_net_prefix
from cloudinit.url_helper import UrlError, readurl
LOG = logging.getLogger(__name__)
@@ -97,10 +98,6 @@ def is_up(devname):
return read_sys_net_safe(devname, "operstate", translate=translate)
-def is_wireless(devname):
- return os.path.exists(sys_dev_path(devname, "wireless"))
-
-
def is_bridge(devname):
return os.path.exists(sys_dev_path(devname, "bridge"))
@@ -264,28 +261,6 @@ def is_vlan(devname):
return 'DEVTYPE=vlan' in uevent.splitlines()
-def is_connected(devname):
- # is_connected isn't really as simple as that. 2 is
- # 'physically connected'. 3 is 'not connected'. but a wlan interface will
- # always show 3.
- iflink = read_sys_net_safe(devname, "iflink")
- if iflink == "2":
- return True
- if not is_wireless(devname):
- return False
- LOG.debug("'%s' is wireless, basing 'connected' on carrier", devname)
- return read_sys_net_safe(devname, "carrier",
- translate={'0': False, '1': True})
-
-
-def is_physical(devname):
- return os.path.exists(sys_dev_path(devname, "device"))
-
-
-def is_present(devname):
- return os.path.exists(sys_dev_path(devname))
-
-
def device_driver(devname):
"""Return the device driver for net device named 'devname'."""
driver = None
@@ -334,10 +309,20 @@ def find_fallback_nic(blacklist_drivers=None):
"""Return the name of the 'fallback' network device."""
if util.is_FreeBSD():
return find_fallback_nic_on_freebsd(blacklist_drivers)
+ elif util.is_NetBSD() or util.is_OpenBSD():
+ return find_fallback_nic_on_netbsd_or_openbsd(blacklist_drivers)
else:
return find_fallback_nic_on_linux(blacklist_drivers)
+def find_fallback_nic_on_netbsd_or_openbsd(blacklist_drivers=None):
+ values = list(sorted(
+ get_interfaces_by_mac().values(),
+ key=natural_sort_key))
+ if values:
+ return values[0]
+
+
def find_fallback_nic_on_freebsd(blacklist_drivers=None):
"""Return the name of the 'fallback' network device on FreeBSD.
@@ -347,7 +332,7 @@ def find_fallback_nic_on_freebsd(blacklist_drivers=None):
we'll use the first interface from ``ifconfig -l -u ether``
"""
- stdout, _stderr = util.subp(['ifconfig', '-l', '-u', 'ether'])
+ stdout, _stderr = subp.subp(['ifconfig', '-l', '-u', 'ether'])
values = stdout.split()
if values:
return values[0]
@@ -508,43 +493,6 @@ def extract_physdevs(netcfg):
raise RuntimeError('Unknown network config version: %s' % version)
-def wait_for_physdevs(netcfg, strict=True):
- physdevs = extract_physdevs(netcfg)
-
- # set of expected iface names and mac addrs
- expected_ifaces = dict([(iface[0], iface[1]) for iface in physdevs])
- expected_macs = set(expected_ifaces.keys())
-
- # set of current macs
- present_macs = get_interfaces_by_mac().keys()
-
- # compare the set of expected mac address values to
- # the current macs present; we only check MAC as cloud-init
- # has not yet renamed interfaces and the netcfg may include
- # such renames.
- for _ in range(0, 5):
- if expected_macs.issubset(present_macs):
- LOG.debug('net: all expected physical devices present')
- return
-
- missing = expected_macs.difference(present_macs)
- LOG.debug('net: waiting for expected net devices: %s', missing)
- for mac in missing:
- # trigger a settle, unless this interface exists
- syspath = sys_dev_path(expected_ifaces[mac])
- settle = partial(util.udevadm_settle, exists=syspath)
- msg = 'Waiting for udev events to settle or %s exists' % syspath
- util.log_time(LOG.debug, msg, func=settle)
-
- # update present_macs after settles
- present_macs = get_interfaces_by_mac().keys()
-
- msg = 'Not all expected physical devices present: %s' % missing
- LOG.warning(msg)
- if strict:
- raise RuntimeError(msg)
-
-
def apply_network_config_names(netcfg, strict_present=True, strict_busy=True):
"""read the network config and rename devices accordingly.
if strict_present is false, then do not raise exception if no devices
@@ -558,7 +506,9 @@ def apply_network_config_names(netcfg, strict_present=True, strict_busy=True):
try:
_rename_interfaces(extract_physdevs(netcfg))
except RuntimeError as e:
- raise RuntimeError('Failed to apply network config names: %s' % e)
+ raise RuntimeError(
+ 'Failed to apply network config names: %s' % e
+ ) from e
def interface_has_own_mac(ifname, strict=False):
@@ -609,9 +559,9 @@ def _get_current_rename_info(check_downable=True):
if check_downable:
nmatch = re.compile(r"[0-9]+:\s+(\w+)[@:]")
- ipv6, _err = util.subp(['ip', '-6', 'addr', 'show', 'permanent',
+ ipv6, _err = subp.subp(['ip', '-6', 'addr', 'show', 'permanent',
'scope', 'global'], capture=True)
- ipv4, _err = util.subp(['ip', '-4', 'addr', 'show'], capture=True)
+ ipv4, _err = subp.subp(['ip', '-4', 'addr', 'show'], capture=True)
nics_with_addresses = set()
for bytes_out in (ipv6, ipv4):
@@ -647,13 +597,13 @@ def _rename_interfaces(renames, strict_present=True, strict_busy=True,
for data in cur_info.values())
def rename(cur, new):
- util.subp(["ip", "link", "set", cur, "name", new], capture=True)
+ subp.subp(["ip", "link", "set", cur, "name", new], capture=True)
def down(name):
- util.subp(["ip", "link", "set", name, "down"], capture=True)
+ subp.subp(["ip", "link", "set", name, "down"], capture=True)
def up(name):
- util.subp(["ip", "link", "set", name, "up"], capture=True)
+ subp.subp(["ip", "link", "set", name, "up"], capture=True)
ops = []
errors = []
@@ -799,23 +749,27 @@ def get_ib_interface_hwaddr(ifname, ethernet_format):
def get_interfaces_by_mac():
if util.is_FreeBSD():
return get_interfaces_by_mac_on_freebsd()
+ elif util.is_NetBSD():
+ return get_interfaces_by_mac_on_netbsd()
+ elif util.is_OpenBSD():
+ return get_interfaces_by_mac_on_openbsd()
else:
return get_interfaces_by_mac_on_linux()
def get_interfaces_by_mac_on_freebsd():
- (out, _) = util.subp(['ifconfig', '-a', 'ether'])
+ (out, _) = subp.subp(['ifconfig', '-a', 'ether'])
# flatten each interface block in a single line
def flatten(out):
curr_block = ''
- for l in out.split('\n'):
- if l.startswith('\t'):
- curr_block += l
+ for line in out.split('\n'):
+ if line.startswith('\t'):
+ curr_block += line
else:
if curr_block:
yield curr_block
- curr_block = l
+ curr_block = line
yield curr_block
# looks for interface and mac in a list of flatten block
@@ -830,6 +784,37 @@ def get_interfaces_by_mac_on_freebsd():
return results
+def get_interfaces_by_mac_on_netbsd():
+ ret = {}
+ re_field_match = (
+ r"(?P<ifname>\w+).*address:\s"
+ r"(?P<mac>([\da-f]{2}[:-]){5}([\da-f]{2})).*"
+ )
+ (out, _) = subp.subp(['ifconfig', '-a'])
+ if_lines = re.sub(r'\n\s+', ' ', out).splitlines()
+ for line in if_lines:
+ m = re.match(re_field_match, line)
+ if m:
+ fields = m.groupdict()
+ ret[fields['mac']] = fields['ifname']
+ return ret
+
+
+def get_interfaces_by_mac_on_openbsd():
+ ret = {}
+ re_field_match = (
+ r"(?P<ifname>\w+).*lladdr\s"
+ r"(?P<mac>([\da-f]{2}[:-]){5}([\da-f]{2})).*")
+ (out, _) = subp.subp(['ifconfig', '-a'])
+ if_lines = re.sub(r'\n\s+', ' ', out).splitlines()
+ for line in if_lines:
+ m = re.match(re_field_match, line)
+ if m:
+ fields = m.groupdict()
+ ret[fields['mac']] = fields['ifname']
+ return ret
+
+
def get_interfaces_by_mac_on_linux():
"""Build a dictionary of tuples {mac: name}.
@@ -917,6 +902,38 @@ def has_url_connectivity(url):
return True
+def is_ip_address(s: str) -> bool:
+ """Returns a bool indicating if ``s`` is an IP address.
+
+ :param s:
+ The string to test.
+
+ :return:
+ A bool indicating if the string contains an IP address or not.
+ """
+ try:
+ ipaddress.ip_address(s)
+ except ValueError:
+ return False
+ return True
+
+
+def is_ipv4_address(s: str) -> bool:
+ """Returns a bool indicating if ``s`` is an IPv4 address.
+
+ :param s:
+ The string to test.
+
+ :return:
+ A bool indicating if the string contains an IPv4 address or not.
+ """
+ try:
+ ipaddress.IPv4Address(s)
+ except ValueError:
+ return False
+ return True
+
+
class EphemeralIPv4Network(object):
"""Context manager which sets up temporary static network configuration.
@@ -950,7 +967,8 @@ class EphemeralIPv4Network(object):
self.prefix = mask_to_net_prefix(prefix_or_mask)
except ValueError as e:
raise ValueError(
- 'Cannot setup network: {0}'.format(e))
+ 'Cannot setup network: {0}'.format(e)
+ ) from e
self.connectivity_url = connectivity_url
self.interface = interface
@@ -990,11 +1008,11 @@ class EphemeralIPv4Network(object):
def __exit__(self, excp_type, excp_value, excp_traceback):
"""Teardown anything we set up."""
for cmd in self.cleanup_cmds:
- util.subp(cmd, capture=True)
+ subp.subp(cmd, capture=True)
def _delete_address(self, address, prefix):
"""Perform the ip command to remove the specified address."""
- util.subp(
+ subp.subp(
['ip', '-family', 'inet', 'addr', 'del',
'%s/%s' % (address, prefix), 'dev', self.interface],
capture=True)
@@ -1006,11 +1024,11 @@ class EphemeralIPv4Network(object):
'Attempting setup of ephemeral network on %s with %s brd %s',
self.interface, cidr, self.broadcast)
try:
- util.subp(
+ subp.subp(
['ip', '-family', 'inet', 'addr', 'add', cidr, 'broadcast',
self.broadcast, 'dev', self.interface],
capture=True, update_env={'LANG': 'C'})
- except util.ProcessExecutionError as e:
+ except subp.ProcessExecutionError as e:
if "File exists" not in e.stderr:
raise
LOG.debug(
@@ -1018,7 +1036,7 @@ class EphemeralIPv4Network(object):
self.interface, self.ip)
else:
# Address creation success, bring up device and queue cleanup
- util.subp(
+ subp.subp(
['ip', '-family', 'inet', 'link', 'set', 'dev', self.interface,
'up'], capture=True)
self.cleanup_cmds.append(
@@ -1035,7 +1053,7 @@ class EphemeralIPv4Network(object):
via_arg = []
if gateway != "0.0.0.0/0":
via_arg = ['via', gateway]
- util.subp(
+ subp.subp(
['ip', '-4', 'route', 'add', net_address] + via_arg +
['dev', self.interface], capture=True)
self.cleanup_cmds.insert(
@@ -1045,20 +1063,20 @@ class EphemeralIPv4Network(object):
def _bringup_router(self):
"""Perform the ip commands to fully setup the router if needed."""
# Check if a default route exists and exit if it does
- out, _ = util.subp(['ip', 'route', 'show', '0.0.0.0/0'], capture=True)
+ out, _ = subp.subp(['ip', 'route', 'show', '0.0.0.0/0'], capture=True)
if 'default' in out:
LOG.debug(
'Skip ephemeral route setup. %s already has default route: %s',
self.interface, out.strip())
return
- util.subp(
+ subp.subp(
['ip', '-4', 'route', 'add', self.router, 'dev', self.interface,
'src', self.ip], capture=True)
self.cleanup_cmds.insert(
0,
['ip', '-4', 'route', 'del', self.router, 'dev', self.interface,
'src', self.ip])
- util.subp(
+ subp.subp(
['ip', '-4', 'route', 'add', 'default', 'via', self.router,
'dev', self.interface], capture=True)
self.cleanup_cmds.insert(
diff --git a/cloudinit/net/bsd.py b/cloudinit/net/bsd.py
new file mode 100644
index 00000000..e34e0454
--- /dev/null
+++ b/cloudinit/net/bsd.py
@@ -0,0 +1,167 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import re
+
+from cloudinit import log as logging
+from cloudinit import net
+from cloudinit import util
+from cloudinit import subp
+from cloudinit.distros.parsers.resolv_conf import ResolvConf
+from cloudinit.distros import bsd_utils
+
+from . import renderer
+
+LOG = logging.getLogger(__name__)
+
+
+class BSDRenderer(renderer.Renderer):
+ resolv_conf_fn = 'etc/resolv.conf'
+ rc_conf_fn = 'etc/rc.conf'
+
+ def get_rc_config_value(self, key):
+ fn = subp.target_path(self.target, self.rc_conf_fn)
+ bsd_utils.get_rc_config_value(key, fn=fn)
+
+ def set_rc_config_value(self, key, value):
+ fn = subp.target_path(self.target, self.rc_conf_fn)
+ bsd_utils.set_rc_config_value(key, value, fn=fn)
+
+ def __init__(self, config=None):
+ if not config:
+ config = {}
+ self.target = None
+ self.interface_configurations = {}
+ self._postcmds = config.get('postcmds', True)
+
+ def _ifconfig_entries(self, settings, target=None):
+ ifname_by_mac = net.get_interfaces_by_mac()
+ for interface in settings.iter_interfaces():
+ device_name = interface.get("name")
+ device_mac = interface.get("mac_address")
+ if device_name and re.match(r'^lo\d+$', device_name):
+ continue
+ if device_mac not in ifname_by_mac:
+ LOG.info('Cannot find any device with MAC %s', device_mac)
+ elif device_mac and device_name:
+ cur_name = ifname_by_mac[device_mac]
+ if cur_name != device_name:
+ LOG.info('netif service will rename interface %s to %s',
+ cur_name, device_name)
+ try:
+ self.rename_interface(cur_name, device_name)
+ except NotImplementedError:
+ LOG.error((
+ 'Interface renaming is '
+ 'not supported on this OS'))
+ device_name = cur_name
+
+ else:
+ device_name = ifname_by_mac[device_mac]
+
+ LOG.info('Configuring interface %s', device_name)
+
+ self.interface_configurations[device_name] = 'DHCP'
+
+ for subnet in interface.get("subnets", []):
+ if subnet.get('type') == 'static':
+ if not subnet.get('netmask'):
+ LOG.debug(
+ 'Skipping IP %s, because there is no netmask',
+ subnet.get('address')
+ )
+ continue
+ LOG.debug('Configuring dev %s with %s / %s', device_name,
+ subnet.get('address'), subnet.get('netmask'))
+
+ self.interface_configurations[device_name] = {
+ 'address': subnet.get('address'),
+ 'netmask': subnet.get('netmask'),
+ }
+
+ def _route_entries(self, settings, target=None):
+ routes = list(settings.iter_routes())
+ for interface in settings.iter_interfaces():
+ subnets = interface.get("subnets", [])
+ for subnet in subnets:
+ if subnet.get('type') != 'static':
+ continue
+ gateway = subnet.get('gateway')
+ if gateway and len(gateway.split('.')) == 4:
+ routes.append({
+ 'network': '0.0.0.0',
+ 'netmask': '0.0.0.0',
+ 'gateway': gateway})
+ routes += subnet.get('routes', [])
+ for route in routes:
+ network = route.get('network')
+ if not network:
+ LOG.debug('Skipping a bad route entry')
+ continue
+ netmask = route.get('netmask')
+ gateway = route.get('gateway')
+ self.set_route(network, netmask, gateway)
+
+ def _resolve_conf(self, settings, target=None):
+ nameservers = settings.dns_nameservers
+ searchdomains = settings.dns_searchdomains
+ for interface in settings.iter_interfaces():
+ for subnet in interface.get("subnets", []):
+ if 'dns_nameservers' in subnet:
+ nameservers.extend(subnet['dns_nameservers'])
+ if 'dns_search' in subnet:
+ searchdomains.extend(subnet['dns_search'])
+ # Try to read the /etc/resolv.conf or just start from scratch if that
+ # fails.
+ try:
+ resolvconf = ResolvConf(util.load_file(subp.target_path(
+ target, self.resolv_conf_fn)))
+ resolvconf.parse()
+ except IOError:
+ util.logexc(LOG, "Failed to parse %s, use new empty file",
+ subp.target_path(target, self.resolv_conf_fn))
+ resolvconf = ResolvConf('')
+ resolvconf.parse()
+
+ # Add some nameservers
+ for server in nameservers:
+ try:
+ resolvconf.add_nameserver(server)
+ except ValueError:
+ util.logexc(LOG, "Failed to add nameserver %s", server)
+
+ # And add any searchdomains.
+ for domain in searchdomains:
+ try:
+ resolvconf.add_search_domain(domain)
+ except ValueError:
+ util.logexc(LOG, "Failed to add search domain %s", domain)
+ util.write_file(
+ subp.target_path(target, self.resolv_conf_fn),
+ str(resolvconf), 0o644)
+
+ def render_network_state(self, network_state, templates=None, target=None):
+ self._ifconfig_entries(settings=network_state)
+ self._route_entries(settings=network_state)
+ self._resolve_conf(settings=network_state)
+
+ self.write_config()
+ self.start_services(run=self._postcmds)
+
+ def dhcp_interfaces(self):
+ ic = self.interface_configurations.items
+ return [k for k, v in ic() if v == 'DHCP']
+
+ def start_services(self, run=False):
+ raise NotImplementedError()
+
+ def write_config(self, target=None):
+ raise NotImplementedError()
+
+ def set_gateway(self, gateway):
+ raise NotImplementedError()
+
+ def rename_interface(self, cur_name, device_name):
+ raise NotImplementedError()
+
+ def set_route(self, network, netmask, gateway):
+ raise NotImplementedError()
diff --git a/cloudinit/net/cmdline.py b/cloudinit/net/cmdline.py
index 64e1c699..cc8dc17b 100755
--- a/cloudinit/net/cmdline.py
+++ b/cloudinit/net/cmdline.py
@@ -10,6 +10,7 @@ import base64
import glob
import gzip
import io
+import logging
import os
from cloudinit import util
@@ -19,21 +20,19 @@ from . import read_sys_net_safe
_OPEN_ISCSI_INTERFACE_FILE = "/run/initramfs/open-iscsi.interface"
+KERNEL_CMDLINE_NETWORK_CONFIG_DISABLED = "disabled"
+
class InitramfsNetworkConfigSource(metaclass=abc.ABCMeta):
"""ABC for net config sources that read config written by initramfses"""
@abc.abstractmethod
- def is_applicable(self):
- # type: () -> bool
+ def is_applicable(self) -> bool:
"""Is this initramfs config source applicable to the current system?"""
- pass
@abc.abstractmethod
- def render_config(self):
- # type: () -> dict
+ def render_config(self) -> dict:
"""Render a v1 network config from the initramfs configuration"""
- pass
class KlibcNetworkConfigSource(InitramfsNetworkConfigSource):
@@ -62,8 +61,7 @@ class KlibcNetworkConfigSource(InitramfsNetworkConfigSource):
if mac_addr:
self._mac_addrs[k] = mac_addr
- def is_applicable(self):
- # type: () -> bool
+ def is_applicable(self) -> bool:
"""
Return whether this system has klibc initramfs network config or not
@@ -81,8 +79,7 @@ class KlibcNetworkConfigSource(InitramfsNetworkConfigSource):
return True
return False
- def render_config(self):
- # type: () -> dict
+ def render_config(self) -> dict:
return config_from_klibc_net_cfg(
files=self._files, mac_addrs=self._mac_addrs,
)
@@ -115,8 +112,8 @@ def _klibc_to_config_entry(content, mac_addrs=None):
data = util.load_shell_content(content)
try:
name = data['DEVICE'] if 'DEVICE' in data else data['DEVICE6']
- except KeyError:
- raise ValueError("no 'DEVICE' or 'DEVICE6' entry in data")
+ except KeyError as e:
+ raise ValueError("no 'DEVICE' or 'DEVICE6' entry in data") from e
# ipconfig on precise does not write PROTO
# IPv6 config gives us IPV6PROTO, not PROTO.
@@ -233,34 +230,35 @@ def read_initramfs_config():
return None
-def _decomp_gzip(blob, strict=True):
- # decompress blob. raise exception if not compressed unless strict=False.
+def _decomp_gzip(blob):
+ # decompress blob or return original blob
with io.BytesIO(blob) as iobuf:
gzfp = None
try:
gzfp = gzip.GzipFile(mode="rb", fileobj=iobuf)
return gzfp.read()
except IOError:
- if strict:
- raise
return blob
finally:
if gzfp:
gzfp.close()
-def _b64dgz(b64str, gzipped="try"):
- # decode a base64 string. If gzipped is true, transparently uncompresss
- # if gzipped is 'try', then try gunzip, returning the original on fail.
- try:
- blob = base64.b64decode(b64str)
- except TypeError:
- raise ValueError("Invalid base64 text: %s" % b64str)
+def _b64dgz(data):
+ """Decode a string base64 encoding, if gzipped, uncompress as well
- if not gzipped:
- return blob
+ :return: decompressed unencoded string of the data or empty string on
+ unencoded data.
+ """
+ try:
+ blob = base64.b64decode(data)
+ except (TypeError, ValueError):
+ logging.error(
+ "Expected base64 encoded kernel commandline parameter"
+ " network-config. Ignoring network-config=%s.", data)
+ return ''
- return _decomp_gzip(blob, strict=gzipped != "try")
+ return _decomp_gzip(blob)
def read_kernel_cmdline_config(cmdline=None):
@@ -273,6 +271,8 @@ def read_kernel_cmdline_config(cmdline=None):
if tok.startswith("network-config="):
data64 = tok.split("=", 1)[1]
if data64:
+ if data64 == KERNEL_CMDLINE_NETWORK_CONFIG_DISABLED:
+ return {"config": "disabled"}
return util.load_yaml(_b64dgz(data64))
return None
diff --git a/cloudinit/net/dhcp.py b/cloudinit/net/dhcp.py
index 19d0199c..4394c68b 100644
--- a/cloudinit/net/dhcp.py
+++ b/cloudinit/net/dhcp.py
@@ -17,6 +17,7 @@ from cloudinit.net import (
has_url_connectivity)
from cloudinit.net.network_state import mask_and_ipv4_to_bcast_addr as bcip
from cloudinit import temp_utils
+from cloudinit import subp
from cloudinit import util
LOG = logging.getLogger(__name__)
@@ -30,19 +31,18 @@ class InvalidDHCPLeaseFileError(Exception):
Current uses are DataSourceAzure and DataSourceEc2 during ephemeral
boot to scrape metadata.
"""
- pass
class NoDHCPLeaseError(Exception):
"""Raised when unable to get a DHCP lease."""
- pass
class EphemeralDHCPv4(object):
- def __init__(self, iface=None, connectivity_url=None):
+ def __init__(self, iface=None, connectivity_url=None, dhcp_log_func=None):
self.iface = iface
self._ephipv4 = None
self.lease = None
+ self.dhcp_log_func = dhcp_log_func
self.connectivity_url = connectivity_url
def __enter__(self):
@@ -80,9 +80,10 @@ class EphemeralDHCPv4(object):
if self.lease:
return self.lease
try:
- leases = maybe_perform_dhcp_discovery(self.iface)
- except InvalidDHCPLeaseFileError:
- raise NoDHCPLeaseError()
+ leases = maybe_perform_dhcp_discovery(
+ self.iface, self.dhcp_log_func)
+ except InvalidDHCPLeaseFileError as e:
+ raise NoDHCPLeaseError() from e
if not leases:
raise NoDHCPLeaseError()
self.lease = leases[-1]
@@ -130,13 +131,15 @@ class EphemeralDHCPv4(object):
result[internal_mapping] = self.lease.get(different_names)
-def maybe_perform_dhcp_discovery(nic=None):
+def maybe_perform_dhcp_discovery(nic=None, dhcp_log_func=None):
"""Perform dhcp discovery if nic valid and dhclient command exists.
If the nic is invalid or undiscoverable or dhclient command is not found,
skip dhcp_discovery and return an empty dict.
@param nic: Name of the network interface we want to run dhclient on.
+ @param dhcp_log_func: A callable accepting the dhclient output and error
+ streams.
@return: A list of dicts representing dhcp options for each lease obtained
from the dhclient discovery if run, otherwise an empty list is
returned.
@@ -150,7 +153,7 @@ def maybe_perform_dhcp_discovery(nic=None):
LOG.debug(
'Skip dhcp_discovery: nic %s not found in get_devicelist.', nic)
return []
- dhclient_path = util.which('dhclient')
+ dhclient_path = subp.which('dhclient')
if not dhclient_path:
LOG.debug('Skip dhclient configuration: No dhclient command found.')
return []
@@ -158,7 +161,7 @@ def maybe_perform_dhcp_discovery(nic=None):
prefix='cloud-init-dhcp-',
needs_exe=True) as tdir:
# Use /var/tmp because /run/cloud-init/tmp is mounted noexec
- return dhcp_discovery(dhclient_path, nic, tdir)
+ return dhcp_discovery(dhclient_path, nic, tdir, dhcp_log_func)
def parse_dhcp_lease_file(lease_file):
@@ -192,13 +195,15 @@ def parse_dhcp_lease_file(lease_file):
return dhcp_leases
-def dhcp_discovery(dhclient_cmd_path, interface, cleandir):
+def dhcp_discovery(dhclient_cmd_path, interface, cleandir, dhcp_log_func=None):
"""Run dhclient on the interface without scripts or filesystem artifacts.
@param dhclient_cmd_path: Full path to the dhclient used.
@param interface: Name of the network inteface on which to dhclient.
@param cleandir: The directory from which to run dhclient as well as store
dhcp leases.
+ @param dhcp_log_func: A callable accepting the dhclient output and error
+ streams.
@return: A list of dicts of representing the dhcp leases parsed from the
dhcp.leases file or empty list.
@@ -215,14 +220,20 @@ def dhcp_discovery(dhclient_cmd_path, interface, cleandir):
pid_file = os.path.join(cleandir, 'dhclient.pid')
lease_file = os.path.join(cleandir, 'dhcp.leases')
+ # In some cases files in /var/tmp may not be executable, launching dhclient
+ # from there will certainly raise 'Permission denied' error. Try launching
+ # the original dhclient instead.
+ if not os.access(sandbox_dhclient_cmd, os.X_OK):
+ sandbox_dhclient_cmd = dhclient_cmd_path
+
# ISC dhclient needs the interface up to send initial discovery packets.
# Generally dhclient relies on dhclient-script PREINIT action to bring the
# link up before attempting discovery. Since we are using -sf /bin/true,
# we need to do that "link up" ourselves first.
- util.subp(['ip', 'link', 'set', 'dev', interface, 'up'], capture=True)
+ subp.subp(['ip', 'link', 'set', 'dev', interface, 'up'], capture=True)
cmd = [sandbox_dhclient_cmd, '-1', '-v', '-lf', lease_file,
'-pf', pid_file, interface, '-sf', '/bin/true']
- util.subp(cmd, capture=True)
+ out, err = subp.subp(cmd, capture=True)
# Wait for pid file and lease file to appear, and for the process
# named by the pid file to daemonize (have pid 1 as its parent). If we
@@ -239,6 +250,7 @@ def dhcp_discovery(dhclient_cmd_path, interface, cleandir):
return []
ppid = 'unknown'
+ daemonized = False
for _ in range(0, 1000):
pid_content = util.load_file(pid_file).strip()
try:
@@ -250,13 +262,17 @@ def dhcp_discovery(dhclient_cmd_path, interface, cleandir):
if ppid == 1:
LOG.debug('killing dhclient with pid=%s', pid)
os.kill(pid, signal.SIGKILL)
- return parse_dhcp_lease_file(lease_file)
+ daemonized = True
+ break
time.sleep(0.01)
- LOG.error(
- 'dhclient(pid=%s, parentpid=%s) failed to daemonize after %s seconds',
- pid_content, ppid, 0.01 * 1000
- )
+ if not daemonized:
+ LOG.error(
+ 'dhclient(pid=%s, parentpid=%s) failed to daemonize after %s '
+ 'seconds', pid_content, ppid, 0.01 * 1000
+ )
+ if dhcp_log_func is not None:
+ dhcp_log_func(out, err)
return parse_dhcp_lease_file(lease_file)
diff --git a/cloudinit/net/eni.py b/cloudinit/net/eni.py
index 2f714563..13c041f3 100644
--- a/cloudinit/net/eni.py
+++ b/cloudinit/net/eni.py
@@ -11,6 +11,7 @@ from . import renderer
from .network_state import subnet_is_ipv6
from cloudinit import log as logging
+from cloudinit import subp
from cloudinit import util
@@ -482,10 +483,8 @@ class Renderer(renderer.Renderer):
if searchdomains:
lo['subnets'][0]["dns_search"] = (" ".join(searchdomains))
- ''' Apply a sort order to ensure that we write out
- the physical interfaces first; this is critical for
- bonding
- '''
+ # Apply a sort order to ensure that we write out the physical
+ # interfaces first; this is critical for bonding
order = {
'loopback': 0,
'physical': 1,
@@ -511,13 +510,13 @@ class Renderer(renderer.Renderer):
return '\n\n'.join(['\n'.join(s) for s in sections]) + "\n"
def render_network_state(self, network_state, templates=None, target=None):
- fpeni = util.target_path(target, self.eni_path)
+ fpeni = subp.target_path(target, self.eni_path)
util.ensure_dir(os.path.dirname(fpeni))
header = self.eni_header if self.eni_header else ""
util.write_file(fpeni, header + self._render_interfaces(network_state))
if self.netrules_path:
- netrules = util.target_path(target, self.netrules_path)
+ netrules = subp.target_path(target, self.netrules_path)
util.ensure_dir(os.path.dirname(netrules))
util.write_file(netrules,
self._render_persistent_net(network_state))
@@ -544,9 +543,9 @@ def available(target=None):
expected = ['ifquery', 'ifup', 'ifdown']
search = ['/sbin', '/usr/sbin']
for p in expected:
- if not util.which(p, search=search, target=target):
+ if not subp.which(p, search=search, target=target):
return False
- eni = util.target_path(target, 'etc/network/interfaces')
+ eni = subp.target_path(target, 'etc/network/interfaces')
if not os.path.isfile(eni):
return False
diff --git a/cloudinit/net/freebsd.py b/cloudinit/net/freebsd.py
index d6f61da3..0285dfec 100644
--- a/cloudinit/net/freebsd.py
+++ b/cloudinit/net/freebsd.py
@@ -1,175 +1,59 @@
# This file is part of cloud-init. See LICENSE file for license information.
-import re
-
from cloudinit import log as logging
-from cloudinit import net
+import cloudinit.net.bsd
+from cloudinit import subp
from cloudinit import util
-from cloudinit.distros import rhel_util
-from cloudinit.distros.parsers.resolv_conf import ResolvConf
-
-from . import renderer
LOG = logging.getLogger(__name__)
-class Renderer(renderer.Renderer):
- resolv_conf_fn = 'etc/resolv.conf'
- rc_conf_fn = 'etc/rc.conf'
+class Renderer(cloudinit.net.bsd.BSDRenderer):
def __init__(self, config=None):
- if not config:
- config = {}
- self.dhcp_interfaces = []
- self._postcmds = config.get('postcmds', True)
-
- def _update_rc_conf(self, settings, target=None):
- fn = util.target_path(target, self.rc_conf_fn)
- rhel_util.update_sysconfig_file(fn, settings)
-
- def _write_ifconfig_entries(self, settings, target=None):
- ifname_by_mac = net.get_interfaces_by_mac()
- for interface in settings.iter_interfaces():
- device_name = interface.get("name")
- device_mac = interface.get("mac_address")
- if device_name and re.match(r'^lo\d+$', device_name):
- continue
- if device_mac not in ifname_by_mac:
- LOG.info('Cannot find any device with MAC %s', device_mac)
- elif device_mac and device_name:
- cur_name = ifname_by_mac[device_mac]
- if cur_name != device_name:
- LOG.info('netif service will rename interface %s to %s',
- cur_name, device_name)
- self._update_rc_conf(
- {'ifconfig_%s_name' % cur_name: device_name},
- target=target)
- else:
- device_name = ifname_by_mac[device_mac]
-
- LOG.info('Configuring interface %s', device_name)
- ifconfig = 'DHCP' # default
-
- for subnet in interface.get("subnets", []):
- if ifconfig != 'DHCP':
- LOG.info('The FreeBSD provider only set the first subnet.')
- break
- if subnet.get('type') == 'static':
- if not subnet.get('netmask'):
- LOG.debug(
- 'Skipping IP %s, because there is no netmask',
- subnet.get('address'))
- continue
- LOG.debug('Configuring dev %s with %s / %s', device_name,
- subnet.get('address'), subnet.get('netmask'))
- # Configure an ipv4 address.
- ifconfig = (
- subnet.get('address') + ' netmask ' +
- subnet.get('netmask'))
-
- if ifconfig == 'DHCP':
- self.dhcp_interfaces.append(device_name)
- self._update_rc_conf(
- {'ifconfig_' + device_name: ifconfig},
- target=target)
-
- def _write_route_entries(self, settings, target=None):
- routes = list(settings.iter_routes())
- for interface in settings.iter_interfaces():
- subnets = interface.get("subnets", [])
- for subnet in subnets:
- if subnet.get('type') != 'static':
- continue
- gateway = subnet.get('gateway')
- if gateway and len(gateway.split('.')) == 4:
- routes.append({
- 'network': '0.0.0.0',
- 'netmask': '0.0.0.0',
- 'gateway': gateway})
- routes += subnet.get('routes', [])
- route_cpt = 0
- for route in routes:
- network = route.get('network')
- if not network:
- LOG.debug('Skipping a bad route entry')
- continue
- netmask = route.get('netmask')
- gateway = route.get('gateway')
- route_cmd = "-route %s/%s %s" % (network, netmask, gateway)
- if network == '0.0.0.0':
- self._update_rc_conf(
- {'defaultrouter': gateway}, target=target)
+ self._route_cpt = 0
+ super(Renderer, self).__init__()
+
+ def rename_interface(self, cur_name, device_name):
+ self.set_rc_config_value('ifconfig_%s_name' % cur_name, device_name)
+
+ def write_config(self):
+ for device_name, v in self.interface_configurations.items():
+ if isinstance(v, dict):
+ self.set_rc_config_value(
+ 'ifconfig_' + device_name,
+ v.get('address') + ' netmask ' + v.get('netmask'))
else:
- self._update_rc_conf(
- {'route_net%d' % route_cpt: route_cmd}, target=target)
- route_cpt += 1
-
- def _write_resolve_conf(self, settings, target=None):
- nameservers = settings.dns_nameservers
- searchdomains = settings.dns_searchdomains
- for interface in settings.iter_interfaces():
- for subnet in interface.get("subnets", []):
- if 'dns_nameservers' in subnet:
- nameservers.extend(subnet['dns_nameservers'])
- if 'dns_search' in subnet:
- searchdomains.extend(subnet['dns_search'])
- # Try to read the /etc/resolv.conf or just start from scratch if that
- # fails.
- try:
- resolvconf = ResolvConf(util.load_file(util.target_path(
- target, self.resolv_conf_fn)))
- resolvconf.parse()
- except IOError:
- util.logexc(LOG, "Failed to parse %s, use new empty file",
- util.target_path(target, self.resolv_conf_fn))
- resolvconf = ResolvConf('')
- resolvconf.parse()
-
- # Add some nameservers
- for server in nameservers:
- try:
- resolvconf.add_nameserver(server)
- except ValueError:
- util.logexc(LOG, "Failed to add nameserver %s", server)
-
- # And add any searchdomains.
- for domain in searchdomains:
- try:
- resolvconf.add_search_domain(domain)
- except ValueError:
- util.logexc(LOG, "Failed to add search domain %s", domain)
- util.write_file(
- util.target_path(target, self.resolv_conf_fn),
- str(resolvconf), 0o644)
-
- def _write_network(self, settings, target=None):
- self._write_ifconfig_entries(settings, target=target)
- self._write_route_entries(settings, target=target)
- self._write_resolve_conf(settings, target=target)
-
- self.start_services(run=self._postcmds)
-
- def render_network_state(self, network_state, templates=None, target=None):
- self._write_network(network_state, target=target)
+ self.set_rc_config_value('ifconfig_' + device_name, 'DHCP')
def start_services(self, run=False):
if not run:
LOG.debug("freebsd generate postcmd disabled")
return
- util.subp(['service', 'netif', 'restart'], capture=True)
+ subp.subp(['service', 'netif', 'restart'], capture=True)
# On FreeBSD 10, the restart of routing and dhclient is likely to fail
# because
# - routing: it cannot remove the loopback route, but it will still set
# up the default route as expected.
# - dhclient: it cannot stop the dhclient started by the netif service.
# In both case, the situation is ok, and we can proceed.
- util.subp(['service', 'routing', 'restart'], capture=True, rcs=[0, 1])
- for dhcp_interface in self.dhcp_interfaces:
- util.subp(['service', 'dhclient', 'restart', dhcp_interface],
+ subp.subp(['service', 'routing', 'restart'], capture=True, rcs=[0, 1])
+
+ for dhcp_interface in self.dhcp_interfaces():
+ subp.subp(['service', 'dhclient', 'restart', dhcp_interface],
rcs=[0, 1],
capture=True)
+ def set_route(self, network, netmask, gateway):
+ if network == '0.0.0.0':
+ self.set_rc_config_value('defaultrouter', gateway)
+ else:
+ route_name = 'route_net%d' % self._route_cpt
+ route_cmd = "-route %s/%s %s" % (network, netmask, gateway)
+ self.set_rc_config_value(route_name, route_cmd)
+ self._route_cpt += 1
+
def available(target=None):
return util.is_FreeBSD()
diff --git a/cloudinit/net/netbsd.py b/cloudinit/net/netbsd.py
new file mode 100644
index 00000000..71b38ee6
--- /dev/null
+++ b/cloudinit/net/netbsd.py
@@ -0,0 +1,44 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import log as logging
+from cloudinit import subp
+from cloudinit import util
+import cloudinit.net.bsd
+
+LOG = logging.getLogger(__name__)
+
+
+class Renderer(cloudinit.net.bsd.BSDRenderer):
+
+ def __init__(self, config=None):
+ super(Renderer, self).__init__()
+
+ def write_config(self):
+ if self.dhcp_interfaces():
+ self.set_rc_config_value('dhcpcd', 'YES')
+ self.set_rc_config_value(
+ 'dhcpcd_flags',
+ ' '.join(self.dhcp_interfaces())
+ )
+ for device_name, v in self.interface_configurations.items():
+ if isinstance(v, dict):
+ self.set_rc_config_value(
+ 'ifconfig_' + device_name,
+ v.get('address') + ' netmask ' + v.get('netmask'))
+
+ def start_services(self, run=False):
+ if not run:
+ LOG.debug("netbsd generate postcmd disabled")
+ return
+
+ subp.subp(['service', 'network', 'restart'], capture=True)
+ if self.dhcp_interfaces():
+ subp.subp(['service', 'dhcpcd', 'restart'], capture=True)
+
+ def set_route(self, network, netmask, gateway):
+ if network == '0.0.0.0':
+ self.set_rc_config_value('defaultroute', gateway)
+
+
+def available(target=None):
+ return util.is_NetBSD()
diff --git a/cloudinit/net/netplan.py b/cloudinit/net/netplan.py
index 89855270..53347c83 100644
--- a/cloudinit/net/netplan.py
+++ b/cloudinit/net/netplan.py
@@ -8,6 +8,7 @@ from .network_state import subnet_is_ipv6, NET_CONFIG_TO_V2, IPV6_DYNAMIC_TYPES
from cloudinit import log as logging
from cloudinit import util
+from cloudinit import subp
from cloudinit import safeyaml
from cloudinit.net import SYS_CLASS_NET, get_devicelist
@@ -164,14 +165,14 @@ def _extract_bond_slaves_by_name(interfaces, entry, bond_master):
def _clean_default(target=None):
# clean out any known default files and derived files in target
# LP: #1675576
- tpath = util.target_path(target, "etc/netplan/00-snapd-config.yaml")
+ tpath = subp.target_path(target, "etc/netplan/00-snapd-config.yaml")
if not os.path.isfile(tpath):
return
content = util.load_file(tpath, decode=False)
if content != KNOWN_SNAPD_CONFIG:
return
- derived = [util.target_path(target, f) for f in (
+ derived = [subp.target_path(target, f) for f in (
'run/systemd/network/10-netplan-all-en.network',
'run/systemd/network/10-netplan-all-eth.network',
'run/systemd/generator/netplan.stamp')]
@@ -203,10 +204,10 @@ class Renderer(renderer.Renderer):
def features(self):
if self._features is None:
try:
- info_blob, _err = util.subp(self.NETPLAN_INFO, capture=True)
+ info_blob, _err = subp.subp(self.NETPLAN_INFO, capture=True)
info = util.load_yaml(info_blob)
self._features = info['netplan.io']['features']
- except util.ProcessExecutionError:
+ except subp.ProcessExecutionError:
# if the info subcommand is not present then we don't have any
# new features
pass
@@ -218,7 +219,7 @@ class Renderer(renderer.Renderer):
# check network state for version
# if v2, then extract network_state.config
# else render_v2_from_state
- fpnplan = os.path.join(util.target_path(target), self.netplan_path)
+ fpnplan = os.path.join(subp.target_path(target), self.netplan_path)
util.ensure_dir(os.path.dirname(fpnplan))
header = self.netplan_header if self.netplan_header else ""
@@ -239,7 +240,7 @@ class Renderer(renderer.Renderer):
if not run:
LOG.debug("netplan generate postcmd disabled")
return
- util.subp(self.NETPLAN_GENERATE, capture=True)
+ subp.subp(self.NETPLAN_GENERATE, capture=True)
def _net_setup_link(self, run=False):
"""To ensure device link properties are applied, we poke
@@ -253,7 +254,7 @@ class Renderer(renderer.Renderer):
for cmd in [setup_lnk + [SYS_CLASS_NET + iface]
for iface in get_devicelist() if
os.path.islink(SYS_CLASS_NET + iface)]:
- util.subp(cmd, capture=True)
+ subp.subp(cmd, capture=True)
def _render_content(self, network_state):
@@ -406,7 +407,7 @@ def available(target=None):
expected = ['netplan']
search = ['/usr/sbin', '/sbin']
for p in expected:
- if not util.which(p, search=search, target=target):
+ if not subp.which(p, search=search, target=target):
return False
return True
diff --git a/cloudinit/net/network_state.py b/cloudinit/net/network_state.py
index 63d6e291..b2f7d31e 100644
--- a/cloudinit/net/network_state.py
+++ b/cloudinit/net/network_state.py
@@ -215,7 +215,7 @@ class NetworkState(object):
return (
route.get('prefix') == 0
and route.get('network') in default_nets
- )
+ )
class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
@@ -297,9 +297,10 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
command_type = command['type']
try:
handler = self.command_handlers[command_type]
- except KeyError:
- raise RuntimeError("No handler found for"
- " command '%s'" % command_type)
+ except KeyError as e:
+ raise RuntimeError(
+ "No handler found for command '%s'" % command_type
+ ) from e
try:
handler(self, command)
except InvalidCommand:
@@ -312,13 +313,14 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
def parse_config_v2(self, skip_broken=True):
for command_type, command in self._config.items():
- if command_type == 'version':
+ if command_type in ['version', 'renderer']:
continue
try:
handler = self.command_handlers[command_type]
- except KeyError:
- raise RuntimeError("No handler found for"
- " command '%s'" % command_type)
+ except KeyError as e:
+ raise RuntimeError(
+ "No handler found for command '%s'" % command_type
+ ) from e
try:
handler(self, command)
self._v2_common(command)
@@ -696,7 +698,7 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
def handle_wifis(self, command):
LOG.warning('Wifi configuration is only available to distros with'
- 'netplan rendering support.')
+ ' netplan rendering support.')
def _v2_common(self, cfg):
LOG.debug('v2_common: handling config:\n%s', cfg)
@@ -722,10 +724,10 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
item_params = dict((key, value) for (key, value) in
item_cfg.items() if key not in
NETWORK_V2_KEY_FILTER)
- # we accept the fixed spelling, but write the old for compatability
+ # we accept the fixed spelling, but write the old for compatibility
# Xenial does not have an updated netplan which supports the
# correct spelling. LP: #1756701
- params = item_params['parameters']
+ params = item_params.get('parameters', {})
grat_value = params.pop('gratuitous-arp', None)
if grat_value:
params['gratuitious-arp'] = grat_value
@@ -734,8 +736,7 @@ class NetworkStateInterpreter(metaclass=CommandHandlerMeta):
'type': cmd_type,
'name': item_name,
cmd_type + '_interfaces': item_cfg.get('interfaces'),
- 'params': dict((v2key_to_v1[k], v) for k, v in
- item_params.get('parameters', {}).items())
+ 'params': dict((v2key_to_v1[k], v) for k, v in params.items())
}
if 'mtu' in item_cfg:
v1_cmd['mtu'] = item_cfg['mtu']
@@ -915,9 +916,10 @@ def _normalize_route(route):
if metric:
try:
normal_route['metric'] = int(metric)
- except ValueError:
+ except ValueError as e:
raise TypeError(
- 'Route config metric {} is not an integer'.format(metric))
+ 'Route config metric {} is not an integer'.format(metric)
+ ) from e
return normal_route
diff --git a/cloudinit/net/openbsd.py b/cloudinit/net/openbsd.py
new file mode 100644
index 00000000..166d77e6
--- /dev/null
+++ b/cloudinit/net/openbsd.py
@@ -0,0 +1,46 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit import log as logging
+from cloudinit import subp
+from cloudinit import util
+import cloudinit.net.bsd
+
+LOG = logging.getLogger(__name__)
+
+
+class Renderer(cloudinit.net.bsd.BSDRenderer):
+
+ def write_config(self):
+ for device_name, v in self.interface_configurations.items():
+ if_file = 'etc/hostname.{}'.format(device_name)
+ fn = subp.target_path(self.target, if_file)
+ if device_name in self.dhcp_interfaces():
+ content = 'dhcp\n'
+ elif isinstance(v, dict):
+ try:
+ content = "inet {address} {netmask}\n".format(
+ address=v['address'],
+ netmask=v['netmask']
+ )
+ except KeyError:
+ LOG.error(
+ "Invalid static configuration for %s",
+ device_name)
+ util.write_file(fn, content)
+
+ def start_services(self, run=False):
+ if not self._postcmds:
+ LOG.debug("openbsd generate postcmd disabled")
+ return
+ subp.subp(['sh', '/etc/netstart'], capture=True)
+
+ def set_route(self, network, netmask, gateway):
+ if network == '0.0.0.0':
+ if_file = 'etc/mygate'
+ fn = subp.target_path(self.target, if_file)
+ content = gateway + '\n'
+ util.write_file(fn, content)
+
+
+def available(target=None):
+ return util.is_OpenBSD()
diff --git a/cloudinit/net/renderers.py b/cloudinit/net/renderers.py
index b98dbbe3..e2de4d55 100644
--- a/cloudinit/net/renderers.py
+++ b/cloudinit/net/renderers.py
@@ -2,18 +2,23 @@
from . import eni
from . import freebsd
+from . import netbsd
from . import netplan
from . import RendererNotFoundError
+from . import openbsd
from . import sysconfig
NAME_TO_RENDERER = {
"eni": eni,
"freebsd": freebsd,
+ "netbsd": netbsd,
"netplan": netplan,
+ "openbsd": openbsd,
"sysconfig": sysconfig,
}
-DEFAULT_PRIORITY = ["eni", "sysconfig", "netplan", "freebsd"]
+DEFAULT_PRIORITY = ["eni", "sysconfig", "netplan", "freebsd",
+ "netbsd", "openbsd"]
def search(priority=None, target=None, first=False):
diff --git a/cloudinit/net/sysconfig.py b/cloudinit/net/sysconfig.py
index 0a387377..0a5d481d 100644
--- a/cloudinit/net/sysconfig.py
+++ b/cloudinit/net/sysconfig.py
@@ -9,6 +9,7 @@ from configobj import ConfigObj
from cloudinit import log as logging
from cloudinit import util
+from cloudinit import subp
from cloudinit.distros.parsers import networkmanager_conf
from cloudinit.distros.parsers import resolv_conf
@@ -504,7 +505,7 @@ class Renderer(renderer.Renderer):
iface_cfg['IPADDR6_%d' % ipv6_index] = ipv6_cidr
else:
iface_cfg['IPV6ADDR_SECONDARIES'] += \
- " " + ipv6_cidr
+ " " + ipv6_cidr
else:
ipv4_index = ipv4_index + 1
suff = "" if ipv4_index == 0 else str(ipv4_index)
@@ -858,19 +859,19 @@ class Renderer(renderer.Renderer):
if not templates:
templates = self.templates
file_mode = 0o644
- base_sysconf_dir = util.target_path(target, self.sysconf_dir)
+ base_sysconf_dir = subp.target_path(target, self.sysconf_dir)
for path, data in self._render_sysconfig(base_sysconf_dir,
network_state, self.flavor,
templates=templates).items():
util.write_file(path, data, file_mode)
if self.dns_path:
- dns_path = util.target_path(target, self.dns_path)
+ dns_path = subp.target_path(target, self.dns_path)
resolv_content = self._render_dns(network_state,
existing_dns_path=dns_path)
if resolv_content:
util.write_file(dns_path, resolv_content, file_mode)
if self.networkmanager_conf_path:
- nm_conf_path = util.target_path(target,
+ nm_conf_path = subp.target_path(target,
self.networkmanager_conf_path)
nm_conf_content = self._render_networkmanager_conf(network_state,
templates)
@@ -878,12 +879,12 @@ class Renderer(renderer.Renderer):
util.write_file(nm_conf_path, nm_conf_content, file_mode)
if self.netrules_path:
netrules_content = self._render_persistent_net(network_state)
- netrules_path = util.target_path(target, self.netrules_path)
+ netrules_path = subp.target_path(target, self.netrules_path)
util.write_file(netrules_path, netrules_content, file_mode)
if available_nm(target=target):
- enable_ifcfg_rh(util.target_path(target, path=NM_CFG_FILE))
+ enable_ifcfg_rh(subp.target_path(target, path=NM_CFG_FILE))
- sysconfig_path = util.target_path(target, templates.get('control'))
+ sysconfig_path = subp.target_path(target, templates.get('control'))
# Distros configuring /etc/sysconfig/network as a file e.g. Centos
if sysconfig_path.endswith('network'):
util.ensure_dir(os.path.dirname(sysconfig_path))
@@ -906,20 +907,20 @@ def available_sysconfig(target=None):
expected = ['ifup', 'ifdown']
search = ['/sbin', '/usr/sbin']
for p in expected:
- if not util.which(p, search=search, target=target):
+ if not subp.which(p, search=search, target=target):
return False
expected_paths = [
'etc/sysconfig/network-scripts/network-functions',
'etc/sysconfig/config']
for p in expected_paths:
- if os.path.isfile(util.target_path(target, p)):
+ if os.path.isfile(subp.target_path(target, p)):
return True
return False
def available_nm(target=None):
- if not os.path.isfile(util.target_path(target, path=NM_CFG_FILE)):
+ if not os.path.isfile(subp.target_path(target, path=NM_CFG_FILE)):
return False
return True
diff --git a/cloudinit/net/tests/test_dhcp.py b/cloudinit/net/tests/test_dhcp.py
index c3fa1e04..74cf4b94 100644
--- a/cloudinit/net/tests/test_dhcp.py
+++ b/cloudinit/net/tests/test_dhcp.py
@@ -62,7 +62,7 @@ class TestParseDHCPLeasesFile(CiTestCase):
{'interface': 'wlp3s0', 'fixed-address': '192.168.2.74',
'subnet-mask': '255.255.255.0', 'routers': '192.168.2.1'}]
write_file(lease_file, content)
- self.assertItemsEqual(expected, parse_dhcp_lease_file(lease_file))
+ self.assertCountEqual(expected, parse_dhcp_lease_file(lease_file))
class TestDHCPRFC3442(CiTestCase):
@@ -88,7 +88,7 @@ class TestDHCPRFC3442(CiTestCase):
'renew': '4 2017/07/27 18:02:30',
'expire': '5 2017/07/28 07:08:15'}]
write_file(lease_file, content)
- self.assertItemsEqual(expected, parse_dhcp_lease_file(lease_file))
+ self.assertCountEqual(expected, parse_dhcp_lease_file(lease_file))
def test_parse_lease_finds_classless_static_routes(self):
"""
@@ -114,7 +114,7 @@ class TestDHCPRFC3442(CiTestCase):
'renew': '4 2017/07/27 18:02:30',
'expire': '5 2017/07/28 07:08:15'}]
write_file(lease_file, content)
- self.assertItemsEqual(expected, parse_dhcp_lease_file(lease_file))
+ self.assertCountEqual(expected, parse_dhcp_lease_file(lease_file))
@mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
@@ -211,7 +211,7 @@ class TestDHCPParseStaticRoutes(CiTestCase):
"class_b": "16,172,16,10",
"class_a": "8,10,10",
"gateway": "0,0",
- "netlen": "33,0",
+ "netlen": "33,0",
}
for rfc3442 in bad_rfc3442.values():
self.assertEqual([], parse_static_routes(rfc3442))
@@ -266,7 +266,7 @@ class TestDHCPDiscoveryClean(CiTestCase):
'Skip dhcp_discovery: nic idontexist not found in get_devicelist.',
self.logs.getvalue())
- @mock.patch('cloudinit.net.dhcp.util.which')
+ @mock.patch('cloudinit.net.dhcp.subp.which')
@mock.patch('cloudinit.net.dhcp.find_fallback_nic')
def test_absent_dhclient_command(self, m_fallback, m_which):
"""When dhclient doesn't exist in the OS, log the issue and no-op."""
@@ -279,7 +279,7 @@ class TestDHCPDiscoveryClean(CiTestCase):
@mock.patch('cloudinit.temp_utils.os.getuid')
@mock.patch('cloudinit.net.dhcp.dhcp_discovery')
- @mock.patch('cloudinit.net.dhcp.util.which')
+ @mock.patch('cloudinit.net.dhcp.subp.which')
@mock.patch('cloudinit.net.dhcp.find_fallback_nic')
def test_dhclient_run_with_tmpdir(self, m_fback, m_which, m_dhcp, m_uid):
"""maybe_perform_dhcp_discovery passes tmpdir to dhcp_discovery."""
@@ -302,13 +302,14 @@ class TestDHCPDiscoveryClean(CiTestCase):
@mock.patch('time.sleep', mock.MagicMock())
@mock.patch('cloudinit.net.dhcp.os.kill')
- @mock.patch('cloudinit.net.dhcp.util.subp')
+ @mock.patch('cloudinit.net.dhcp.subp.subp')
def test_dhcp_discovery_run_in_sandbox_warns_invalid_pid(self, m_subp,
m_kill):
"""dhcp_discovery logs a warning when pidfile contains invalid content.
Lease processing still occurs and no proc kill is attempted.
"""
+ m_subp.return_value = ('', '')
tmpdir = self.tmp_dir()
dhclient_script = os.path.join(tmpdir, 'dhclient.orig')
script_content = '#!/bin/bash\necho fake-dhclient'
@@ -324,7 +325,7 @@ class TestDHCPDiscoveryClean(CiTestCase):
""")
write_file(self.tmp_path('dhcp.leases', tmpdir), lease_content)
- self.assertItemsEqual(
+ self.assertCountEqual(
[{'interface': 'eth9', 'fixed-address': '192.168.2.74',
'subnet-mask': '255.255.255.0', 'routers': '192.168.2.1'}],
dhcp_discovery(dhclient_script, 'eth9', tmpdir))
@@ -337,13 +338,14 @@ class TestDHCPDiscoveryClean(CiTestCase):
@mock.patch('cloudinit.net.dhcp.util.get_proc_ppid')
@mock.patch('cloudinit.net.dhcp.os.kill')
@mock.patch('cloudinit.net.dhcp.util.wait_for_files')
- @mock.patch('cloudinit.net.dhcp.util.subp')
+ @mock.patch('cloudinit.net.dhcp.subp.subp')
def test_dhcp_discovery_run_in_sandbox_waits_on_lease_and_pid(self,
m_subp,
m_wait,
m_kill,
m_getppid):
"""dhcp_discovery waits for the presence of pidfile and dhcp.leases."""
+ m_subp.return_value = ('', '')
tmpdir = self.tmp_dir()
dhclient_script = os.path.join(tmpdir, 'dhclient.orig')
script_content = '#!/bin/bash\necho fake-dhclient'
@@ -364,12 +366,13 @@ class TestDHCPDiscoveryClean(CiTestCase):
@mock.patch('cloudinit.net.dhcp.util.get_proc_ppid')
@mock.patch('cloudinit.net.dhcp.os.kill')
- @mock.patch('cloudinit.net.dhcp.util.subp')
+ @mock.patch('cloudinit.net.dhcp.subp.subp')
def test_dhcp_discovery_run_in_sandbox(self, m_subp, m_kill, m_getppid):
"""dhcp_discovery brings up the interface and runs dhclient.
It also returns the parsed dhcp.leases file generated in the sandbox.
"""
+ m_subp.return_value = ('', '')
tmpdir = self.tmp_dir()
dhclient_script = os.path.join(tmpdir, 'dhclient.orig')
script_content = '#!/bin/bash\necho fake-dhclient'
@@ -389,7 +392,7 @@ class TestDHCPDiscoveryClean(CiTestCase):
write_file(pid_file, "%d\n" % my_pid)
m_getppid.return_value = 1 # Indicate that dhclient has daemonized
- self.assertItemsEqual(
+ self.assertCountEqual(
[{'interface': 'eth9', 'fixed-address': '192.168.2.74',
'subnet-mask': '255.255.255.0', 'routers': '192.168.2.1'}],
dhcp_discovery(dhclient_script, 'eth9', tmpdir))
@@ -406,6 +409,87 @@ class TestDHCPDiscoveryClean(CiTestCase):
'eth9', '-sf', '/bin/true'], capture=True)])
m_kill.assert_has_calls([mock.call(my_pid, signal.SIGKILL)])
+ @mock.patch('cloudinit.net.dhcp.util.get_proc_ppid')
+ @mock.patch('cloudinit.net.dhcp.os.kill')
+ @mock.patch('cloudinit.net.dhcp.subp.subp')
+ def test_dhcp_discovery_outside_sandbox(self, m_subp, m_kill, m_getppid):
+ """dhcp_discovery brings up the interface and runs dhclient.
+
+ It also returns the parsed dhcp.leases file generated in the sandbox.
+ """
+ m_subp.return_value = ('', '')
+ tmpdir = self.tmp_dir()
+ dhclient_script = os.path.join(tmpdir, 'dhclient.orig')
+ script_content = '#!/bin/bash\necho fake-dhclient'
+ write_file(dhclient_script, script_content, mode=0o755)
+ lease_content = dedent("""
+ lease {
+ interface "eth9";
+ fixed-address 192.168.2.74;
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ }
+ """)
+ lease_file = os.path.join(tmpdir, 'dhcp.leases')
+ write_file(lease_file, lease_content)
+ pid_file = os.path.join(tmpdir, 'dhclient.pid')
+ my_pid = 1
+ write_file(pid_file, "%d\n" % my_pid)
+ m_getppid.return_value = 1 # Indicate that dhclient has daemonized
+
+ with mock.patch('os.access', return_value=False):
+ self.assertCountEqual(
+ [{'interface': 'eth9', 'fixed-address': '192.168.2.74',
+ 'subnet-mask': '255.255.255.0', 'routers': '192.168.2.1'}],
+ dhcp_discovery(dhclient_script, 'eth9', tmpdir))
+ # dhclient script got copied
+ with open(os.path.join(tmpdir, 'dhclient.orig')) as stream:
+ self.assertEqual(script_content, stream.read())
+ # Interface was brought up before dhclient called from sandbox
+ m_subp.assert_has_calls([
+ mock.call(
+ ['ip', 'link', 'set', 'dev', 'eth9', 'up'], capture=True),
+ mock.call(
+ [os.path.join(tmpdir, 'dhclient.orig'), '-1', '-v', '-lf',
+ lease_file, '-pf', os.path.join(tmpdir, 'dhclient.pid'),
+ 'eth9', '-sf', '/bin/true'], capture=True)])
+ m_kill.assert_has_calls([mock.call(my_pid, signal.SIGKILL)])
+
+ @mock.patch('cloudinit.net.dhcp.util.get_proc_ppid')
+ @mock.patch('cloudinit.net.dhcp.os.kill')
+ @mock.patch('cloudinit.net.dhcp.subp.subp')
+ def test_dhcp_output_error_stream(self, m_subp, m_kill, m_getppid):
+ """"dhcp_log_func is called with the output and error streams of
+ dhclinet when the callable is passed."""
+ dhclient_err = 'FAKE DHCLIENT ERROR'
+ dhclient_out = 'FAKE DHCLIENT OUT'
+ m_subp.return_value = (dhclient_out, dhclient_err)
+ tmpdir = self.tmp_dir()
+ dhclient_script = os.path.join(tmpdir, 'dhclient.orig')
+ script_content = '#!/bin/bash\necho fake-dhclient'
+ write_file(dhclient_script, script_content, mode=0o755)
+ lease_content = dedent("""
+ lease {
+ interface "eth9";
+ fixed-address 192.168.2.74;
+ option subnet-mask 255.255.255.0;
+ option routers 192.168.2.1;
+ }
+ """)
+ lease_file = os.path.join(tmpdir, 'dhcp.leases')
+ write_file(lease_file, lease_content)
+ pid_file = os.path.join(tmpdir, 'dhclient.pid')
+ my_pid = 1
+ write_file(pid_file, "%d\n" % my_pid)
+ m_getppid.return_value = 1 # Indicate that dhclient has daemonized
+
+ def dhcp_log_func(out, err):
+ self.assertEqual(out, dhclient_out)
+ self.assertEqual(err, dhclient_err)
+
+ dhcp_discovery(
+ dhclient_script, 'eth9', tmpdir, dhcp_log_func=dhcp_log_func)
+
class TestSystemdParseLeases(CiTestCase):
@@ -529,7 +613,7 @@ class TestEphemeralDhcpNoNetworkSetup(HttprettyTestCase):
# Ensure that no teardown happens:
m_dhcp.assert_not_called()
- @mock.patch('cloudinit.net.dhcp.util.subp')
+ @mock.patch('cloudinit.net.dhcp.subp.subp')
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
def test_ephemeral_dhcp_setup_network_if_url_connectivity(
self, m_dhcp, m_subp):
diff --git a/cloudinit/net/tests/test_init.py b/cloudinit/net/tests/test_init.py
index 5081a337..311ab6f8 100644
--- a/cloudinit/net/tests/test_init.py
+++ b/cloudinit/net/tests/test_init.py
@@ -2,16 +2,20 @@
import copy
import errno
-import httpretty
+import ipaddress
import os
-import requests
import textwrap
from unittest import mock
+import httpretty
+import pytest
+import requests
+
import cloudinit.net as net
-from cloudinit.util import ensure_file, write_file, ProcessExecutionError
-from cloudinit.tests.helpers import CiTestCase, HttprettyTestCase
from cloudinit import safeyaml as yaml
+from cloudinit.tests.helpers import CiTestCase, HttprettyTestCase
+from cloudinit.subp import ProcessExecutionError
+from cloudinit.util import ensure_file, write_file
class TestSysDevPath(CiTestCase):
@@ -139,12 +143,6 @@ class TestReadSysNet(CiTestCase):
write_file(os.path.join(self.sysdir, 'eth0', 'operstate'), state)
self.assertFalse(net.is_up('eth0'))
- def test_is_wireless(self):
- """is_wireless is True when /sys/net/devname/wireless exists."""
- self.assertFalse(net.is_wireless('eth0'))
- ensure_file(os.path.join(self.sysdir, 'eth0', 'wireless'))
- self.assertTrue(net.is_wireless('eth0'))
-
def test_is_bridge(self):
"""is_bridge is True when /sys/net/devname/bridge exists."""
self.assertFalse(net.is_bridge('eth0'))
@@ -200,32 +198,6 @@ class TestReadSysNet(CiTestCase):
write_file(os.path.join(self.sysdir, 'eth0', 'uevent'), content)
self.assertTrue(net.is_vlan('eth0'))
- def test_is_connected_when_physically_connected(self):
- """is_connected is True when /sys/net/devname/iflink reports 2."""
- self.assertFalse(net.is_connected('eth0'))
- write_file(os.path.join(self.sysdir, 'eth0', 'iflink'), "2")
- self.assertTrue(net.is_connected('eth0'))
-
- def test_is_connected_when_wireless_and_carrier_active(self):
- """is_connected is True if wireless /sys/net/devname/carrier is 1."""
- self.assertFalse(net.is_connected('eth0'))
- ensure_file(os.path.join(self.sysdir, 'eth0', 'wireless'))
- self.assertFalse(net.is_connected('eth0'))
- write_file(os.path.join(self.sysdir, 'eth0', 'carrier'), "1")
- self.assertTrue(net.is_connected('eth0'))
-
- def test_is_physical(self):
- """is_physical is True when /sys/net/devname/device exists."""
- self.assertFalse(net.is_physical('eth0'))
- ensure_file(os.path.join(self.sysdir, 'eth0', 'device'))
- self.assertTrue(net.is_physical('eth0'))
-
- def test_is_present(self):
- """is_present is True when /sys/net/devname exists."""
- self.assertFalse(net.is_present('eth0'))
- ensure_file(os.path.join(self.sysdir, 'eth0', 'device'))
- self.assertTrue(net.is_present('eth0'))
-
class TestGenerateFallbackConfig(CiTestCase):
@@ -341,8 +313,6 @@ class TestGenerateFallbackConfig(CiTestCase):
class TestNetFindFallBackNic(CiTestCase):
- with_logs = True
-
def setUp(self):
super(TestNetFindFallBackNic, self).setUp()
sys_mock = mock.patch('cloudinit.net.get_sys_class_path')
@@ -396,7 +366,7 @@ class TestGetDeviceList(CiTestCase):
"""get_devicelist returns a directory listing for SYS_CLASS_NET."""
write_file(os.path.join(self.sysdir, 'eth0', 'operstate'), 'up')
write_file(os.path.join(self.sysdir, 'eth1', 'operstate'), 'up')
- self.assertItemsEqual(['eth0', 'eth1'], net.get_devicelist())
+ self.assertCountEqual(['eth0', 'eth1'], net.get_devicelist())
class TestGetInterfaceMAC(CiTestCase):
@@ -540,7 +510,7 @@ class TestInterfaceHasOwnMAC(CiTestCase):
net.interface_has_own_mac('eth1', strict=True)
-@mock.patch('cloudinit.net.util.subp')
+@mock.patch('cloudinit.net.subp.subp')
class TestEphemeralIPV4Network(CiTestCase):
with_logs = True
@@ -993,86 +963,8 @@ class TestExtractPhysdevs(CiTestCase):
net.extract_physdevs({'version': 3, 'awesome_config': []})
-class TestWaitForPhysdevs(CiTestCase):
-
- with_logs = True
-
- def setUp(self):
- super(TestWaitForPhysdevs, self).setUp()
- self.add_patch('cloudinit.net.get_interfaces_by_mac',
- 'm_get_iface_mac')
- self.add_patch('cloudinit.util.udevadm_settle', 'm_udev_settle')
-
- def test_wait_for_physdevs_skips_settle_if_all_present(self):
- physdevs = [
- ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
- ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
- ]
- netcfg = {
- 'version': 2,
- 'ethernets': {args[1]: _mk_v2_phys(*args)
- for args in physdevs},
- }
- self.m_get_iface_mac.side_effect = iter([
- {'aa:bb:cc:dd:ee:ff': 'eth0',
- '00:11:22:33:44:55': 'ens3'},
- ])
- net.wait_for_physdevs(netcfg)
- self.assertEqual(0, self.m_udev_settle.call_count)
-
- def test_wait_for_physdevs_calls_udev_settle_on_missing(self):
- physdevs = [
- ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
- ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
- ]
- netcfg = {
- 'version': 2,
- 'ethernets': {args[1]: _mk_v2_phys(*args)
- for args in physdevs},
- }
- self.m_get_iface_mac.side_effect = iter([
- {'aa:bb:cc:dd:ee:ff': 'eth0'}, # first call ens3 is missing
- {'aa:bb:cc:dd:ee:ff': 'eth0',
- '00:11:22:33:44:55': 'ens3'}, # second call has both
- ])
- net.wait_for_physdevs(netcfg)
- self.m_udev_settle.assert_called_with(exists=net.sys_dev_path('ens3'))
-
- def test_wait_for_physdevs_raise_runtime_error_if_missing_and_strict(self):
- physdevs = [
- ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
- ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
- ]
- netcfg = {
- 'version': 2,
- 'ethernets': {args[1]: _mk_v2_phys(*args)
- for args in physdevs},
- }
- self.m_get_iface_mac.return_value = {}
- with self.assertRaises(RuntimeError):
- net.wait_for_physdevs(netcfg)
-
- self.assertEqual(5 * len(physdevs), self.m_udev_settle.call_count)
-
- def test_wait_for_physdevs_no_raise_if_not_strict(self):
- physdevs = [
- ['aa:bb:cc:dd:ee:ff', 'eth0', 'virtio', '0x1000'],
- ['00:11:22:33:44:55', 'ens3', 'e1000', '0x1643'],
- ]
- netcfg = {
- 'version': 2,
- 'ethernets': {args[1]: _mk_v2_phys(*args)
- for args in physdevs},
- }
- self.m_get_iface_mac.return_value = {}
- net.wait_for_physdevs(netcfg, strict=False)
- self.assertEqual(5 * len(physdevs), self.m_udev_settle.call_count)
-
-
class TestNetFailOver(CiTestCase):
- with_logs = True
-
def setUp(self):
super(TestNetFailOver, self).setUp()
self.add_patch('cloudinit.net.util', 'm_util')
@@ -1297,4 +1189,48 @@ class TestNetFailOver(CiTestCase):
m_standby.return_value = False
self.assertFalse(net.is_netfailover(devname, driver))
+
+class TestIsIpAddress:
+ """Tests for net.is_ip_address.
+
+ Instead of testing with values we rely on the ipaddress stdlib module to
+ handle all values correctly, so simply test that is_ip_address defers to
+ the ipaddress module correctly.
+ """
+
+ @pytest.mark.parametrize('ip_address_side_effect,expected_return', (
+ (ValueError, False),
+ (lambda _: ipaddress.IPv4Address('192.168.0.1'), True),
+ (lambda _: ipaddress.IPv6Address('2001:db8::'), True),
+ ))
+ def test_is_ip_address(self, ip_address_side_effect, expected_return):
+ with mock.patch('cloudinit.net.ipaddress.ip_address',
+ side_effect=ip_address_side_effect) as m_ip_address:
+ ret = net.is_ip_address(mock.sentinel.ip_address_in)
+ assert expected_return == ret
+ expected_call = mock.call(mock.sentinel.ip_address_in)
+ assert [expected_call] == m_ip_address.call_args_list
+
+
+class TestIsIpv4Address:
+ """Tests for net.is_ipv4_address.
+
+ Instead of testing with values we rely on the ipaddress stdlib module to
+ handle all values correctly, so simply test that is_ipv4_address defers to
+ the ipaddress module correctly.
+ """
+
+ @pytest.mark.parametrize('ipv4address_mock,expected_return', (
+ (mock.Mock(side_effect=ValueError), False),
+ (mock.Mock(return_value=ipaddress.IPv4Address('192.168.0.1')), True),
+ ))
+ def test_is_ip_address(self, ipv4address_mock, expected_return):
+ with mock.patch('cloudinit.net.ipaddress.IPv4Address',
+ ipv4address_mock) as m_ipv4address:
+ ret = net.is_ipv4_address(mock.sentinel.ip_address_in)
+ assert expected_return == ret
+ expected_call = mock.call(mock.sentinel.ip_address_in)
+ assert [expected_call] == m_ipv4address.call_args_list
+
+
# vi: ts=4 expandtab
diff --git a/cloudinit/net/tests/test_network_state.py b/cloudinit/net/tests/test_network_state.py
index 55880852..07d726e2 100644
--- a/cloudinit/net/tests/test_network_state.py
+++ b/cloudinit/net/tests/test_network_state.py
@@ -45,4 +45,14 @@ class TestNetworkStateParseConfig(CiTestCase):
self.assertNotEqual(None, result)
+class TestNetworkStateParseConfigV2(CiTestCase):
+
+ def test_version_2_ignores_renderer_key(self):
+ ncfg = {'version': 2, 'renderer': 'networkd', 'ethernets': {}}
+ nsi = network_state.NetworkStateInterpreter(version=ncfg['version'],
+ config=ncfg)
+ nsi.parse_config(skip_broken=False)
+ self.assertEqual(ncfg, nsi.as_dict()['config'])
+
+
# vi: ts=4 expandtab